Home >> Kaldi gop-compute
Kaldi gop-compute
2025-12-26 09:53 AtmosphereMao
发音质量计算 (Goodness of Pronunciation / GOP computation)
-
Download
- kaldi
- Librispeech ASR model - Librispeech ASR Chain 1d & Librispeech i-vector extractor
-
Run
# Place the downloaded Librispeech ASR Chain 1d model and the
# Librispeech i-vector extractor under egs/librispeech/s5,
# then configure the GOP recipe:
# configuration
cd egs/gop_speechocean762/s5
vim run.sh
librispeech_eg=../../librispeech/s5
#model=$librispeech_eg/exp/nnet3_cleaned/tdnn_sp
model=$librispeech_eg/exp/chain_cleaned/tdnn_1d_sp
ivector_extractor=$librispeech_eg/exp/nnet3_cleaned/extractor
#lang=$librispeech_eg/data/lang
lang=$librispeech_eg/data/lang_nosp
# Run
./run.sh
-
Deploy API
import base64
import os
import re
import shlex
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Dict, List, Tuple, Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from pydantic import BaseModel
# --- Deployment paths: adjust to the local Kaldi checkout. ---
RECIPE_DIR = Path("/your/path/kaldi/egs/gop_speechocean762/s5")
LIBRI_DIR = Path("/your/path/kaldi-gop/kaldi/egs/librispeech/s5")
# Pre-trained Librispeech chain acoustic model used for scoring.
CHAIN_MODEL_DIR = LIBRI_DIR / "exp/chain_cleaned/tdnn_1d_sp"
# Online i-vector extractor matching the chain model.
IVECTOR_EXTRACTOR_DIR = LIBRI_DIR / "exp/nnet3_cleaned/extractor"
# Base lang dir that gets copied per request (see _copy_lang_to_work).
BASE_LANG_DIR = LIBRI_DIR / "data/lang_nosp"
CONF_MFCC = RECIPE_DIR / "conf/mfcc_hires.conf"
# Word -> phones lexicon used to patch unknown words into the lang dir.
RECIPE_LEXICON = RECIPE_DIR / "data/local/dict_nosp/lexicon.txt"
# Per-request scratch directories are created under WORK_ROOT.
WORK_ROOT = Path("/tmp/gop_service_work")
# Set GOP_KEEP_WORKDIR=1 to keep scratch dirs for debugging.
KEEP_WORKDIR = os.environ.get("GOP_KEEP_WORKDIR", "0") == "1"
# Shell prefix: enter the recipe dir and source Kaldi's path/cmd setup.
KALDI_INIT = f'cd {shlex.quote(str(RECIPE_DIR))} && . ./path.sh && . ./cmd.sh'
app = FastAPI(title="Kaldi GOP Service", version="0.3.0")
class PhoneGop(BaseModel):
    """GOP score of one pure phone of the scored word."""
    pure_phone_id: int  # integer id from phones-pure.txt
    pure_phone: str  # phone symbol (falls back to the id as a string)
    gop: float  # goodness-of-pronunciation score from compute-gop
class ScoreResponse(BaseModel):
    """Full scoring result for one word/utterance."""
    utt_id: str  # server-generated utterance id ("utt-<hex>")
    lang: str  # always "en" for now
    target: str  # the scored word, upper-cased
    phones: List[PhoneGop]  # per-phone GOP scores
    mean_gop: float  # arithmetic mean over phones
    min_gop: float
    max_gop: float
    work_dir: Optional[str] = None  # scratch dir path, only when GOP_KEEP_WORKDIR=1
def _run_cmd(cmd: str) -> None:
p = subprocess.run(
["bash", "-lc", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if p.returncode != 0:
raise RuntimeError(
"CMD FAILED:\n"
f"cmd: {cmd}\n"
f"returncode: {p.returncode}\n"
f"stdout:\n{p.stdout}\n"
f"stderr:\n{p.stderr}\n"
)
def _ensure_exists(p: Path, what: str) -> None:
if not p.exists():
raise HTTPException(status_code=500, detail=f"Missing {what}: {p}")
def _load_kv_map(txt: Path) -> Dict[str, int]:
m = {}
with txt.open("r", encoding="utf-8") as f:
for line in f:
s = line.strip()
if not s:
continue
a, b = s.split()
m[a] = int(b)
return m
def _load_align_lexicon(align_lexicon_txt: Path) -> Dict[str, List[str]]:
out: Dict[str, List[str]] = {}
with align_lexicon_txt.open("r", encoding="utf-8") as f:
for line in f:
parts = line.strip().split()
if len(parts) < 3:
continue
word = parts[0]
phones = parts[2:]
out[word] = phones
return out
def _load_pure_phone_table(pure_txt: Path) -> Dict[int, str]:
m = {}
if not pure_txt.exists():
return m
with pure_txt.open("r", encoding="utf-8") as f:
for line in f:
parts = line.strip().split()
if len(parts) != 2:
continue
sym, idx = parts
m[int(idx)] = sym
return m
def _parse_gop_txt(gop_txt: Path) -> Tuple[str, List[Tuple[int, float]]]:
line = gop_txt.read_text(encoding="utf-8").strip()
if not line:
raise RuntimeError(f"Empty GOP file: {gop_txt}")
utt = line.split()[0]
pairs = re.findall(r"\[\s*(\d+)\s+([-\d\.eE]+)\s*\]", line)
out = [(int(pid), float(val)) for pid, val in pairs]
return utt, out
def _copy_lang_to_work(work_dir: Path) -> Path:
    """Copy the base lang dir into the workdir and overlay the model's phones.txt."""
    work_lang = work_dir / "data/lang_nosp"
    work_lang.parent.mkdir(parents=True, exist_ok=True)
    _ensure_exists(BASE_LANG_DIR, "BASE_LANG_DIR")
    src = shlex.quote(str(BASE_LANG_DIR))
    dst = shlex.quote(str(work_lang))
    _run_cmd(f"rm -rf {dst} && cp -a {src} {dst}")
    # the model's phone table must win over the base lang's copy
    model_phones = CHAIN_MODEL_DIR / "phones.txt"
    _ensure_exists(model_phones, "model phones.txt")
    _run_cmd(f"cp -f {shlex.quote(str(model_phones))} {shlex.quote(str(work_lang / 'phones.txt'))}")
    return work_lang
def _lookup_lexicon_pron(word: str) -> List[str]:
    """Return the phone sequence for *word* from RECIPE_LEXICON.

    Lexicon line format: WORD P EH1 N S L
    """
    _ensure_exists(RECIPE_LEXICON, "recipe lexicon.txt")
    with RECIPE_LEXICON.open("r", encoding="utf-8") as handle:
        for raw in handle:
            fields = raw.strip().split()
            if len(fields) >= 2 and fields[0] == word:
                return fields[1:]
    raise HTTPException(status_code=400, detail=f"word not in recipe lexicon: {word}")
def _phones_with_pos_markers(base_phones: List[str]) -> List[str]:
"""
[P, EH1, N, S, L] -> [P_B, EH1_I, N_I, S_I, L_E]
length=1 -> _S
"""
if not base_phones:
return []
if len(base_phones) == 1:
return [base_phones[0] + "_S"]
out = []
for i, ph in enumerate(base_phones):
if i == 0:
out.append(ph + "_B")
elif i == len(base_phones) - 1:
out.append(ph + "_E")
else:
out.append(ph + "_I")
return out
def _ensure_word_in_work_lang(work_lang: Path, target: str) -> None:
    """Patch the per-request lang dir so *target* can be aligned.

    Appends the word to words.txt under the next free integer id and adds
    its position-marked pronunciation (from the recipe lexicon) to
    phones/align_lexicon.txt. No-op when both entries already exist.
    """
    words_txt = work_lang / "words.txt"
    phones_txt = work_lang / "phones.txt"
    align_lexicon_txt = work_lang / "phones/align_lexicon.txt"
    _ensure_exists(words_txt, "work words.txt")
    _ensure_exists(phones_txt, "work phones.txt")
    _ensure_exists(align_lexicon_txt, "work align_lexicon.txt")
    words = _load_kv_map(words_txt)
    align = _load_align_lexicon(align_lexicon_txt)
    phones_map = _load_kv_map(phones_txt)
    need_words = target not in words
    need_align = target not in align
    if not (need_words or need_align):
        return
    if need_words:
        # next free id after the current maximum word id
        max_id = max(words.values()) if words else 0
        new_id = max_id + 1
        with words_txt.open("a", encoding="utf-8") as f:
            f.write(f"{target} {new_id}\n")
    if need_align:
        base_pron = _lookup_lexicon_pron(target)
        pos_pron = _phones_with_pos_markers(base_pron)
        # every position-marked phone must exist in the model's phone set
        missing = [p for p in pos_pron if p not in phones_map]
        if missing:
            raise HTTPException(
                status_code=500,
                detail=f"phones not in model phones.txt: {missing} (word={target}, base={base_pron})",
            )
        # align_lexicon: WORD WORD P_B EH1_I ...
        with align_lexicon_txt.open("a", encoding="utf-8") as f:
            f.write(f"{target} {target} " + " ".join(pos_pron) + "\n")
def _prepare_req_data(work_dir: Path, utt_id: str, wav_path: Path, target: str) -> Path:
data_dir = work_dir / "data/req"
data_dir.mkdir(parents=True, exist_ok=True)
(data_dir / "wav.scp").write_text(f"{utt_id} {wav_path}\n", encoding="utf-8")
(data_dir / "text").write_text(f"{utt_id} {target}\n", encoding="utf-8")
(data_dir / "utt2spk").write_text(f"{utt_id} spk1\n", encoding="utf-8")
(data_dir / "spk2utt").write_text(f"spk1 {utt_id}\n", encoding="utf-8")
return data_dir
def _make_mfcc_and_cmvn(work_dir: Path, data_dir: Path) -> None:
    """Extract hires MFCC features and compute CMVN stats for the request data dir."""
    exp_dir = work_dir / "exp/make_mfcc_req"
    mfcc_dir = work_dir / "mfcc"
    for d in (exp_dir, mfcc_dir):
        d.mkdir(parents=True, exist_ok=True)
    mfcc_cmd = (
        f"steps/make_mfcc.sh --nj 1 --mfcc-config {shlex.quote(str(CONF_MFCC))} --cmd run.pl "
        f"{shlex.quote(str(data_dir))} {shlex.quote(str(exp_dir))} {shlex.quote(str(mfcc_dir))}"
    )
    cmvn_cmd = f"steps/compute_cmvn_stats.sh {shlex.quote(str(data_dir))}"
    _run_cmd(" && ".join([KALDI_INIT, mfcc_cmd, cmvn_cmd]))
def _extract_ivectors(data_dir: Path) -> Path:
    """Extract online i-vectors for the request data dir; return the i-vector dir."""
    ivec_dir = data_dir / "ivectors"
    extract_cmd = (
        "steps/online/nnet2/extract_ivectors_online.sh --cmd run.pl --nj 1 "
        f"{shlex.quote(str(data_dir))} "
        f"{shlex.quote(str(IVECTOR_EXTRACTOR_DIR))} "
        f"{shlex.quote(str(ivec_dir))}"
    )
    _run_cmd(f"{KALDI_INIT} && {extract_cmd}")
    return ivec_dir
def _compute_output_probs(data_dir: Path, ivec_dir: Path, work_dir: Path) -> Path:
    """Run the chain model forward pass (compute_output.sh); return the probs dir."""
    probs_dir = work_dir / "exp/probs_req"
    probs_dir.mkdir(parents=True, exist_ok=True)
    forward_cmd = (
        "steps/nnet3/compute_output.sh --cmd run.pl --nj 1 "
        "--use-gpu true "
        f"--online-ivector-dir {shlex.quote(str(ivec_dir))} "
        f"{shlex.quote(str(data_dir))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR))} "
        f"{shlex.quote(str(probs_dir))}"
    )
    _run_cmd(f"{KALDI_INIT} && {forward_cmd}")
    # nj=1, so the single job's archive must exist and be non-empty
    out_ark = probs_dir / "output.1.ark"
    if not out_ark.exists() or out_ark.stat().st_size == 0:
        raise RuntimeError(f"Missing/empty probs ark: {out_ark}")
    return probs_dir
def _make_text_int(work_dir: Path, utt_id: str, target: str, work_lang: Path) -> Path:
    """Write split1/1/text.int mapping the utterance to the integer id of *target*."""
    words_map = _load_kv_map(work_lang / "words.txt")
    if target not in words_map:
        raise HTTPException(status_code=400, detail=f"word not in work words.txt after patch: {target}")
    word_id = words_map[target]
    text_int = work_dir / "data/req/split1/1/text.int"
    text_int.parent.mkdir(parents=True, exist_ok=True)
    text_int.write_text(f"{utt_id} {word_id}\n", encoding="utf-8")
    return text_int
def _make_text_phone_int(work_dir: Path, utt_id: str, target: str, work_lang: Path) -> Path:
    """Write data/local/text-phone.int: '<utt>.0' followed by the word's phone ids."""
    align = _load_align_lexicon(work_lang / "phones/align_lexicon.txt")
    phones_map = _load_kv_map(work_lang / "phones.txt")
    if target not in align:
        raise HTTPException(status_code=400, detail=f"word not in work align_lexicon.txt after patch: {target}")
    phones_sym = align[target]
    missing = [sym for sym in phones_sym if sym not in phones_map]
    if missing:
        raise HTTPException(status_code=500, detail=f"phones missing in work phones.txt: {missing}")
    ids = " ".join(str(phones_map[sym]) for sym in phones_sym)
    out = work_dir / "data/local/text-phone.int"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(f"{utt_id}.0 {ids}\n", encoding="utf-8")
    return out
def _make_align_graphs(work_dir: Path, text_int: Path, text_phone_int: Path, work_lang: Path) -> Path:
    """Compile per-utterance training graphs without a lexicon FST; return the ali dir."""
    ali_dir = work_dir / "exp/ali_req"
    ali_dir.mkdir(parents=True, exist_ok=True)
    disambig_int = work_lang / "phones/disambig.int"
    _ensure_exists(disambig_int, "work disambig.int")
    fsts_gz = ali_dir / "fsts.1.gz"
    compile_cmd = (
        "compile-train-graphs-without-lexicon "
        f"--read-disambig-syms={shlex.quote(str(disambig_int))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR / 'tree'))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR / 'final.mdl'))} "
        f"\"ark,t:{text_int}\" "
        f"\"ark,t:{text_phone_int}\" "
        f"\"ark:|gzip -c > {fsts_gz}\""
    )
    _run_cmd(f"{KALDI_INIT} && {compile_cmd}")
    # align scripts expect a num_jobs marker file
    (ali_dir / "num_jobs").write_text("1\n", encoding="utf-8")
    return ali_dir
def _align_mapped(data_dir: Path, probs_dir: Path, ali_dir: Path, work_lang: Path) -> None:
    """Align the utterance to the compiled graphs using the precomputed NN outputs."""
    align_cmd = (
        f"steps/align_mapped.sh --cmd run.pl --nj 1 --graphs {shlex.quote(str(ali_dir))} "
        f"{shlex.quote(str(data_dir))} "
        f"{shlex.quote(str(probs_dir))} "
        f"{shlex.quote(str(work_lang))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR))} "
        f"{shlex.quote(str(ali_dir))}"
    )
    _run_cmd(f"{KALDI_INIT} && {align_cmd}")
    ali_gz = ali_dir / "ali.1.gz"
    if ali_gz.exists() and ali_gz.stat().st_size > 0:
        return
    raise RuntimeError(f"Missing/empty alignment: {ali_gz}")
def _ali_to_phones(ali_dir: Path) -> None:
    """Convert transition-id alignments to per-frame phone alignments (ali-phone.1.gz)."""
    ali_phone_gz = ali_dir / "ali-phone.1.gz"
    convert_cmd = (
        f"ali-to-phones --per-frame=true {shlex.quote(str(CHAIN_MODEL_DIR / 'final.mdl'))} "
        f"\"ark:gunzip -c {ali_dir / 'ali.1.gz'}|\" "
        f"\"ark:|gzip -c > {ali_phone_gz}\""
    )
    _run_cmd(f"{KALDI_INIT} && {convert_cmd}")
    if ali_phone_gz.exists() and ali_phone_gz.stat().st_size > 0:
        return
    raise RuntimeError(f"Missing/empty phone alignment: {ali_phone_gz}")
def _compute_gop(work_dir: Path, probs_dir: Path, ali_dir: Path, work_lang: Path) -> Path:
    """Run Kaldi's compute-gop over the alignments and NN outputs; return gop.1.txt.

    NOTE(review): work_lang is currently unused here; the phone maps come
    from the recipe's lang_nosp dir instead — confirm this is intended.
    """
    gop_dir = work_dir / "exp/gop_req"
    gop_dir.mkdir(parents=True, exist_ok=True)
    gop_txt = gop_dir / "gop.1.txt"
    feat_ark = gop_dir / "feat.1.ark"
    # recipe-provided mapping from position-dependent phones to pure phones
    phone2pure = RECIPE_DIR / "data/lang_nosp/phone-to-pure-phone.int"
    phones_pure = RECIPE_DIR / "data/lang_nosp/phones-pure.txt"
    _ensure_exists(phone2pure, "phone-to-pure-phone.int")
    _ensure_exists(phones_pure, "phones-pure.txt")
    cmd = (
        f"{KALDI_INIT} && "
        f"compute-gop "
        f"--phone-map={shlex.quote(str(phone2pure))} "
        # skip phone ids 0,1,2 (presumably eps/silence — confirm against phones-pure.txt)
        f"--skip_phones_string=0:1:2 "
        f"{shlex.quote(str(CHAIN_MODEL_DIR / 'final.mdl'))} "
        f"\"ark:gunzip -c {ali_dir / 'ali.1.gz'}|\" "
        f"\"ark:gunzip -c {ali_dir / 'ali-phone.1.gz'}|\" "
        f"\"ark:{probs_dir / 'output.1.ark'}\" "
        f"\"ark,t:{gop_txt}\" "
        f"\"ark:{feat_ark}\""
    )
    _run_cmd(cmd)
    if not gop_txt.exists() or gop_txt.stat().st_size == 0:
        raise RuntimeError(f"Missing/empty GOP txt: {gop_txt}")
    return gop_txt
def _score_dispatch(lang: str, target: str, wav_path: Path) -> ScoreResponse:
    """Run the end-to-end GOP pipeline for one single-word utterance.

    Pipeline: copy lang dir -> patch target word -> Kaldi data dir ->
    MFCC/CMVN -> online i-vectors -> NN output posteriors -> alignment
    graphs -> align -> per-frame phones -> compute-gop -> parse scores.

    Args:
        lang: request language; only English variants are accepted.
        target: single word to score (normalized to upper case).
        wav_path: path to the uploaded WAV file on local disk.

    Raises:
        HTTPException: 400 on bad input, 500 on any pipeline failure.
    """
    lang = (lang or "en").lower()
    if lang not in ("en", "eng", "english"):
        raise HTTPException(status_code=400, detail=f"Unsupported lang: {lang}, only 'en' for now")
    target = target.strip().upper()
    if not target or " " in target:
        raise HTTPException(status_code=400, detail="target must be a single word (no spaces)")
    # Fail fast when the deployment is missing model artifacts.
    _ensure_exists(CHAIN_MODEL_DIR / "final.mdl", "chain final.mdl")
    _ensure_exists(CHAIN_MODEL_DIR / "tree", "chain tree")
    _ensure_exists(CHAIN_MODEL_DIR / "phones.txt", "chain phones.txt")
    _ensure_exists(BASE_LANG_DIR / "words.txt", "BASE lang words.txt")
    utt_id = f"utt-{uuid.uuid4().hex[:12]}"
    work_dir = WORK_ROOT / utt_id
    work_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Copy the upload inside the try so the workdir is cleaned up
        # even when the copy itself fails (it used to run before try/finally).
        wav_local = work_dir / "data/req" / f"{utt_id}.wav"
        wav_local.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(wav_path, wav_local)
        # 1) prepare work_lang
        work_lang = _copy_lang_to_work(work_dir)
        # 2) ensure word in work_lang
        _ensure_word_in_work_lang(work_lang, target)
        # 3) data dir
        data_dir = _prepare_req_data(work_dir, utt_id, wav_local, target)
        # 4) mfcc + cmvn
        _make_mfcc_and_cmvn(work_dir, data_dir)
        # 5) ivectors
        ivec_dir = _extract_ivectors(data_dir)
        # 6) probs
        probs_dir = _compute_output_probs(data_dir, ivec_dir, work_dir)
        # 7) text.int / text-phone.int
        text_int = _make_text_int(work_dir, utt_id, target, work_lang)
        text_phone_int = _make_text_phone_int(work_dir, utt_id, target, work_lang)
        # 8) graph + align
        ali_dir = _make_align_graphs(work_dir, text_int, text_phone_int, work_lang)
        _align_mapped(data_dir, probs_dir, ali_dir, work_lang)
        # 9) ali -> phones
        _ali_to_phones(ali_dir)
        # 10) gop
        gop_txt = _compute_gop(work_dir, probs_dir, ali_dir, work_lang)
        # 11) build the response
        pure_map = _load_pure_phone_table(RECIPE_DIR / "data/lang_nosp/phones-pure.txt")
        _utt, pairs = _parse_gop_txt(gop_txt)
        phones = [
            PhoneGop(pure_phone_id=pid, pure_phone=pure_map.get(pid, str(pid)), gop=g)
            for pid, g in pairs
        ]
        if not phones:
            raise RuntimeError("No GOP pairs parsed (empty phones list)")
        gops = [p.gop for p in phones]
        return ScoreResponse(
            utt_id=utt_id,
            lang="en",
            target=target,
            phones=phones,
            mean_gop=sum(gops) / len(gops),
            min_gop=min(gops),
            max_gop=max(gops),
            work_dir=str(work_dir) if KEEP_WORKDIR else None,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e) + f"\nwork_dir={work_dir}")
    finally:
        if not KEEP_WORKDIR:
            # shutil.rmtree replaces the previous `bash -lc "rm -rf ..."` call:
            # no shell spawn for a local FS operation; ignore_errors keeps
            # cleanup from masking the real exception.
            shutil.rmtree(work_dir, ignore_errors=True)
@app.post("/score/audio/file", response_model=ScoreResponse)
async def score_audio_file(
    lang: str = Form("en"),
    target: str = Form(...),
    file: UploadFile = File(...),
):
    """Score a single word from an uploaded WAV file."""
    if not file.filename:
        raise HTTPException(status_code=400, detail="missing file")
    payload = await file.read()
    if not payload:
        raise HTTPException(status_code=400, detail="empty file")
    tmp_wav = Path("/tmp") / f"upload-{uuid.uuid4().hex}.wav"
    tmp_wav.write_bytes(payload)
    try:
        return _score_dispatch(lang, target, tmp_wav)
    finally:
        # best-effort removal of the staging file
        try:
            tmp_wav.unlink(missing_ok=True)
        except Exception:
            pass
class Base64Req(BaseModel):
    """Request body for /score/audio/base64."""
    lang: str = "en"  # only English is supported for now
    target: str  # single word to score
    audio_base64: str  # base64-encoded WAV payload
@app.post("/score/audio/base64", response_model=ScoreResponse)
async def score_audio_base64(req: Base64Req):
    """Score a single word from base64-encoded WAV audio."""
    try:
        raw = base64.b64decode(req.audio_base64.strip(), validate=True)
    except Exception:
        raise HTTPException(status_code=400, detail="invalid base64")
    tmp_wav = Path("/tmp") / f"b64-{uuid.uuid4().hex}.wav"
    tmp_wav.write_bytes(raw)
    try:
        return _score_dispatch(req.lang, req.target, tmp_wav)
    finally:
        # best-effort removal of the staging file
        try:
            tmp_wav.unlink(missing_ok=True)
        except Exception:
            pass
uvicorn app:app --host 0.0.0.0 --port 8000
评论
暂无评论
* 登录后即可评论