Home >> Kaldi gop-compute

Kaldi gop-compute

2025-12-26 09:53 AtmosphereMao

发音质量计算

  1. Download

  2. kaldi
  3. Librispeech ASR model - Librispeech ASR Chain 1d & Librispeech i-vector extractor
  4. Run

# Download the Librispeech ASR Chain 1d acoustic model and the
# Librispeech i-vector extractor, then extract them under
# egs/librispeech/s5

# configuration
cd egs/gop_speechocean762/s5
vim run.sh

    librispeech_eg=../../librispeech/s5
    #model=$librispeech_eg/exp/nnet3_cleaned/tdnn_sp
    model=$librispeech_eg/exp/chain_cleaned/tdnn_1d_sp
    ivector_extractor=$librispeech_eg/exp/nnet3_cleaned/extractor

    #lang=$librispeech_eg/data/lang
    lang=$librispeech_eg/data/lang_nosp

# Run
./run.sh
  1. Deploy API

import base64
import os
import re
import shlex
import subprocess
import uuid
from pathlib import Path
from typing import Dict, List, Tuple, Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from pydantic import BaseModel

# --- Deployment paths: edit these to match your Kaldi checkout. ---

# gop_speechocean762 recipe dir; KALDI_INIT cds here and sources path.sh/cmd.sh,
# and steps/* scripts are invoked relative to it.
RECIPE_DIR = Path("/your/path/kaldi/egs/gop_speechocean762/s5")
# Librispeech recipe dir holding the pretrained model artifacts below.
LIBRI_DIR = Path("/your/path/kaldi-gop/kaldi/egs/librispeech/s5")

# Pretrained chain acoustic model (tdnn_1d_sp) used for probs, alignment and GOP.
CHAIN_MODEL_DIR = LIBRI_DIR / "exp/chain_cleaned/tdnn_1d_sp"
# Online i-vector extractor matching the chain model.
IVECTOR_EXTRACTOR_DIR = LIBRI_DIR / "exp/nnet3_cleaned/extractor"

# Base lang dir; copied per request into the work dir and patched there.
BASE_LANG_DIR = LIBRI_DIR / "data/lang_nosp"

# High-resolution MFCC configuration used for feature extraction.
CONF_MFCC = RECIPE_DIR / "conf/mfcc_hires.conf"

# Plain lexicon ("WORD PH1 PH2 ...") used to add unseen words on the fly.
RECIPE_LEXICON = RECIPE_DIR / "data/local/dict_nosp/lexicon.txt"

# Per-request scratch space; one subdirectory per generated utterance id.
WORK_ROOT = Path("/tmp/gop_service_work")
# Set GOP_KEEP_WORKDIR=1 to keep scratch dirs (and report them) for debugging.
KEEP_WORKDIR = os.environ.get("GOP_KEEP_WORKDIR", "0") == "1"

# Shell prefix that enters the recipe dir and sources the Kaldi environment;
# prepended to every Kaldi command run by this service.
KALDI_INIT = f'cd {shlex.quote(str(RECIPE_DIR))} && . ./path.sh && . ./cmd.sh'

app = FastAPI(title="Kaldi GOP Service", version="0.3.0")

class PhoneGop(BaseModel):
    """GOP score for a single 'pure' (position-independent) phone."""

    pure_phone_id: int  # integer id as emitted by compute-gop
    pure_phone: str     # symbol from phones-pure.txt (falls back to str(id))
    gop: float          # goodness-of-pronunciation value parsed from gop.1.txt

class ScoreResponse(BaseModel):
    """Per-phone GOP scores plus summary statistics for one utterance."""

    utt_id: str                     # generated "utt-<hex>" request id
    lang: str                       # always "en" for now
    target: str                     # upper-cased word that was scored
    phones: List[PhoneGop]          # one entry per scored pure phone
    mean_gop: float                 # arithmetic mean over phones
    min_gop: float                  # minimum phone GOP
    max_gop: float                  # maximum phone GOP
    work_dir: Optional[str] = None  # scratch dir path, only when GOP_KEEP_WORKDIR=1

def _run_cmd(cmd: str) -> None:
    p = subprocess.run(
        ["bash", "-lc", cmd],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if p.returncode != 0:
        raise RuntimeError(
            "CMD FAILED:\n"
            f"cmd: {cmd}\n"
            f"returncode: {p.returncode}\n"
            f"stdout:\n{p.stdout}\n"
            f"stderr:\n{p.stderr}\n"
        )

def _ensure_exists(p: Path, what: str) -> None:
    """Abort the request with HTTP 500 when a required path is absent."""
    if p.exists():
        return
    raise HTTPException(status_code=500, detail=f"Missing {what}: {p}")

def _load_kv_map(txt: Path) -> Dict[str, int]:
    m = {}
    with txt.open("r", encoding="utf-8") as f:
        for line in f:
            s = line.strip()
            if not s:
                continue
            a, b = s.split()
            m[a] = int(b)
    return m

def _load_align_lexicon(align_lexicon_txt: Path) -> Dict[str, List[str]]:
    out: Dict[str, List[str]] = {}
    with align_lexicon_txt.open("r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 3:
                continue
            word = parts[0]
            phones = parts[2:]
            out[word] = phones
    return out

def _load_pure_phone_table(pure_txt: Path) -> Dict[int, str]:
    m = {}
    if not pure_txt.exists():
        return m
    with pure_txt.open("r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 2:
                continue
            sym, idx = parts
            m[int(idx)] = sym
    return m

def _parse_gop_txt(gop_txt: Path) -> Tuple[str, List[Tuple[int, float]]]:
    line = gop_txt.read_text(encoding="utf-8").strip()
    if not line:
        raise RuntimeError(f"Empty GOP file: {gop_txt}")
    utt = line.split()[0]
    pairs = re.findall(r"\[\s*(\d+)\s+([-\d\.eE]+)\s*\]", line)
    out = [(int(pid), float(val)) for pid, val in pairs]
    return utt, out

def _copy_lang_to_work(work_dir: Path) -> Path:
    """Create a private copy of BASE_LANG_DIR under *work_dir* and return it.

    The copy is writable, so the request can append new words/pronunciations
    without touching the shared lang dir. The model's own phones.txt then
    replaces the copy's so phone ids match the acoustic model.
    """
    work_lang = work_dir / "data/lang_nosp"
    work_lang.parent.mkdir(parents=True, exist_ok=True)
    _ensure_exists(BASE_LANG_DIR, "BASE_LANG_DIR")

    # cp -a copies the whole lang tree (phones/, fst files, ...) recursively.
    cmd = f"rm -rf {shlex.quote(str(work_lang))} && cp -a {shlex.quote(str(BASE_LANG_DIR))} {shlex.quote(str(work_lang))}"
    _run_cmd(cmd)

    # NOTE(review): overwriting phones.txt assumes the model's phone table is
    # compatible with the lang dir's — confirm for your model/lang pairing.
    model_phones = CHAIN_MODEL_DIR / "phones.txt"
    _ensure_exists(model_phones, "model phones.txt")
    cmd2 = f"cp -f {shlex.quote(str(model_phones))} {shlex.quote(str(work_lang / 'phones.txt'))}"
    _run_cmd(cmd2)

    return work_lang

def _lookup_lexicon_pron(word: str) -> List[str]:
    """Return *word*'s phone sequence from the recipe lexicon.

    Lexicon line format: ``WORD PH1 PH2 ...``. The lookup is an exact,
    case-sensitive match on the first column; the first matching entry wins.
    Raises HTTP 400 when the word has no usable entry.
    """
    _ensure_exists(RECIPE_LEXICON, "recipe lexicon.txt")
    with RECIPE_LEXICON.open("r", encoding="utf-8") as fh:
        for raw in fh:
            cols = raw.strip().split()
            if len(cols) >= 2 and cols[0] == word:
                return cols[1:]
    raise HTTPException(status_code=400, detail=f"word not in recipe lexicon: {word}")

def _phones_with_pos_markers(base_phones: List[str]) -> List[str]:
    """
    [P, EH1, N, S, L] -> [P_B, EH1_I, N_I, S_I, L_E]
    length=1 -> _S
    """
    if not base_phones:
        return []
    if len(base_phones) == 1:
        return [base_phones[0] + "_S"]
    out = []
    for i, ph in enumerate(base_phones):
        if i == 0:
            out.append(ph + "_B")
        elif i == len(base_phones) - 1:
            out.append(ph + "_E")
        else:
            out.append(ph + "_I")
    return out

def _ensure_word_in_work_lang(work_lang: Path, target: str) -> None:
    """Ensure *target* is present in the work lang's words.txt and align_lexicon.

    Appends a new word id and/or a position-marked pronunciation (looked up
    in the recipe lexicon) when missing, so an alignment graph can be compiled
    for an out-of-vocabulary word without rebuilding the whole lang dir.
    Raises HTTP 500 when a required phone is absent from the model phones.txt.
    """
    words_txt = work_lang / "words.txt"
    phones_txt = work_lang / "phones.txt"
    align_lexicon_txt = work_lang / "phones/align_lexicon.txt"

    _ensure_exists(words_txt, "work words.txt")
    _ensure_exists(phones_txt, "work phones.txt")
    _ensure_exists(align_lexicon_txt, "work align_lexicon.txt")

    words = _load_kv_map(words_txt)
    align = _load_align_lexicon(align_lexicon_txt)
    phones_map = _load_kv_map(phones_txt)

    need_words = target not in words
    need_align = target not in align

    if not (need_words or need_align):
        return  # fully known word; nothing to patch

    if need_words:
        # Append with the next id after the current maximum.
        max_id = max(words.values()) if words else 0
        new_id = max_id + 1
        with words_txt.open("a", encoding="utf-8") as f:
            f.write(f"{target} {new_id}\n")

    if need_align:
        base_pron = _lookup_lexicon_pron(target)
        pos_pron = _phones_with_pos_markers(base_pron)

        # Every position-dependent phone must exist in the model's phone set.
        missing = [p for p in pos_pron if p not in phones_map]
        if missing:
            raise HTTPException(
                status_code=500,
                detail=f"phones not in model phones.txt: {missing} (word={target}, base={base_pron})",
            )

        # align_lexicon: WORD WORD P_B EH1_I ...
        with align_lexicon_txt.open("a", encoding="utf-8") as f:
            f.write(f"{target} {target} " + " ".join(pos_pron) + "\n")

def _prepare_req_data(work_dir: Path, utt_id: str, wav_path: Path, target: str) -> Path:
    data_dir = work_dir / "data/req"
    data_dir.mkdir(parents=True, exist_ok=True)

    (data_dir / "wav.scp").write_text(f"{utt_id} {wav_path}\n", encoding="utf-8")
    (data_dir / "text").write_text(f"{utt_id} {target}\n", encoding="utf-8")
    (data_dir / "utt2spk").write_text(f"{utt_id} spk1\n", encoding="utf-8")
    (data_dir / "spk2utt").write_text(f"spk1 {utt_id}\n", encoding="utf-8")
    return data_dir

def _make_mfcc_and_cmvn(work_dir: Path, data_dir: Path) -> None:
    """Extract MFCC features and CMVN stats for *data_dir*.

    Runs the recipe's steps/make_mfcc.sh (single job, run.pl, hires config)
    followed by steps/compute_cmvn_stats.sh inside the Kaldi environment.
    """
    exp_dir = work_dir / "exp/make_mfcc_req"
    mfcc_dir = work_dir / "mfcc"
    exp_dir.mkdir(parents=True, exist_ok=True)
    mfcc_dir.mkdir(parents=True, exist_ok=True)

    cmd = (
        f"{KALDI_INIT} && "
        f"steps/make_mfcc.sh --nj 1 --mfcc-config {shlex.quote(str(CONF_MFCC))} --cmd run.pl "
        f"{shlex.quote(str(data_dir))} {shlex.quote(str(exp_dir))} {shlex.quote(str(mfcc_dir))} && "
        f"steps/compute_cmvn_stats.sh {shlex.quote(str(data_dir))}"
    )
    _run_cmd(cmd)

def _extract_ivectors(data_dir: Path) -> Path:
    """Extract online i-vectors for *data_dir* and return the i-vector dir.

    Uses the Librispeech online i-vector extractor (single job, run.pl);
    output goes to ``<data_dir>/ivectors``.
    """
    ivec_dir = data_dir / "ivectors"
    cmd = (
        f"{KALDI_INIT} && "
        f"steps/online/nnet2/extract_ivectors_online.sh --cmd run.pl --nj 1 "
        f"{shlex.quote(str(data_dir))} "
        f"{shlex.quote(str(IVECTOR_EXTRACTOR_DIR))} "
        f"{shlex.quote(str(ivec_dir))}"
    )
    _run_cmd(cmd)
    return ivec_dir

def _compute_output_probs(data_dir: Path, ivec_dir: Path, work_dir: Path) -> Path:
    """Dump the chain model's per-frame outputs via steps/nnet3/compute_output.sh.

    Returns the probs dir; raises RuntimeError when output.1.ark is missing or
    empty. NOTE(review): ``--use-gpu true`` requires a visible GPU — confirm
    this is intended for CPU-only deployments.
    """
    probs_dir = work_dir / "exp/probs_req"
    probs_dir.mkdir(parents=True, exist_ok=True)

    compute_cmd = (
        "steps/nnet3/compute_output.sh --cmd run.pl --nj 1 "
        "--use-gpu true "
        f"--online-ivector-dir {shlex.quote(str(ivec_dir))} "
        f"{shlex.quote(str(data_dir))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR))} "
        f"{shlex.quote(str(probs_dir))}"
    )
    _run_cmd(" && ".join([KALDI_INIT, compute_cmd]))

    out_ark = probs_dir / "output.1.ark"
    if not out_ark.exists() or out_ark.stat().st_size == 0:
        raise RuntimeError(f"Missing/empty probs ark: {out_ark}")
    return probs_dir

def _make_text_int(work_dir: Path, utt_id: str, target: str, work_lang: Path) -> Path:
    """Write split1/1/text.int ("UTT word_id") for the single target word.

    Raises HTTP 400 if the word is still absent from the (patched) words.txt.
    """
    word_ids = _load_kv_map(work_lang / "words.txt")
    if target not in word_ids:
        raise HTTPException(status_code=400, detail=f"word not in work words.txt after patch: {target}")

    out_path = work_dir / "data/req/split1/1/text.int"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(f"{utt_id} {word_ids[target]}\n", encoding="utf-8")
    return out_path

def _make_text_phone_int(work_dir: Path, utt_id: str, target: str, work_lang: Path) -> Path:
    """Write text-phone.int ("UTT.0 <phone ids>") for the target's pronunciation.

    Looks the pronunciation up in the (patched) align_lexicon and converts
    each phone symbol to its integer id from the work phones.txt.
    Raises HTTP 400/500 when the word or any phone cannot be resolved.
    """
    align = _load_align_lexicon(work_lang / "phones/align_lexicon.txt")
    phone_ids = _load_kv_map(work_lang / "phones.txt")

    if target not in align:
        raise HTTPException(status_code=400, detail=f"word not in work align_lexicon.txt after patch: {target}")

    pron = align[target]
    missing = [ph for ph in pron if ph not in phone_ids]
    if missing:
        raise HTTPException(status_code=500, detail=f"phones missing in work phones.txt: {missing}")

    ids_txt = " ".join(str(phone_ids[ph]) for ph in pron)
    out_path = work_dir / "data/local/text-phone.int"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(f"{utt_id}.0 {ids_txt}\n", encoding="utf-8")
    return out_path

def _make_align_graphs(work_dir: Path, text_int: Path, text_phone_int: Path, work_lang: Path) -> Path:
    """Compile the per-utterance alignment graph and return the ali dir.

    Uses compile-train-graphs-without-lexicon with the integer word and phone
    transcripts, so no L.fst is needed for the (possibly patched-in) word.
    Writes fsts.1.gz plus num_jobs so the dir can be passed to align steps.
    """
    ali_dir = work_dir / "exp/ali_req"
    ali_dir.mkdir(parents=True, exist_ok=True)

    disambig_int = work_lang / "phones/disambig.int"
    _ensure_exists(disambig_int, "work disambig.int")

    fsts_gz = ali_dir / "fsts.1.gz"
    cmd = (
        f"{KALDI_INIT} && "
        f"compile-train-graphs-without-lexicon "
        f"--read-disambig-syms={shlex.quote(str(disambig_int))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR / 'tree'))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR / 'final.mdl'))} "
        f"\"ark,t:{text_int}\" "
        f"\"ark,t:{text_phone_int}\" "
        f"\"ark:|gzip -c > {fsts_gz}\""
    )
    _run_cmd(cmd)
    # Mark this as a one-job dir for the following alignment step.
    (ali_dir / "num_jobs").write_text("1\n", encoding="utf-8")
    return ali_dir

def _align_mapped(data_dir: Path, probs_dir: Path, ali_dir: Path, work_lang: Path) -> None:
    """Force-align the utterance using precomputed outputs (steps/align_mapped.sh).

    Reads the compiled graphs from *ali_dir* (--graphs) and writes ali.1.gz
    back into the same dir. Raises RuntimeError if no alignment was produced.
    """
    cmd = (
        f"{KALDI_INIT} && "
        f"steps/align_mapped.sh --cmd run.pl --nj 1 --graphs {shlex.quote(str(ali_dir))} "
        f"{shlex.quote(str(data_dir))} "
        f"{shlex.quote(str(probs_dir))} "
        f"{shlex.quote(str(work_lang))} "
        f"{shlex.quote(str(CHAIN_MODEL_DIR))} "
        f"{shlex.quote(str(ali_dir))}"
    )
    _run_cmd(cmd)

    ali_gz = ali_dir / "ali.1.gz"
    if not ali_gz.exists() or ali_gz.stat().st_size == 0:
        raise RuntimeError(f"Missing/empty alignment: {ali_gz}")

def _ali_to_phones(ali_dir: Path) -> None:
    """Convert the transition-id alignment into a per-frame phone alignment.

    Runs ali-to-phones --per-frame=true over ali.1.gz, producing
    ali-phone.1.gz, which compute-gop consumes later in the pipeline.
    """
    ali_phone_gz = ali_dir / "ali-phone.1.gz"
    cmd = (
        f"{KALDI_INIT} && "
        f"ali-to-phones --per-frame=true {shlex.quote(str(CHAIN_MODEL_DIR / 'final.mdl'))} "
        f"\"ark:gunzip -c {ali_dir / 'ali.1.gz'}|\" "
        f"\"ark:|gzip -c > {ali_phone_gz}\""
    )
    _run_cmd(cmd)

    if not ali_phone_gz.exists() or ali_phone_gz.stat().st_size == 0:
        raise RuntimeError(f"Missing/empty phone alignment: {ali_phone_gz}")

def _compute_gop(work_dir: Path, probs_dir: Path, ali_dir: Path, work_lang: Path) -> Path:
    """Run Kaldi's compute-gop and return the path to the text GOP output.

    Consumes the transition-id alignment, the per-frame phone alignment and
    the nnet3 output, mapping phones to "pure" phones via the recipe's
    phone-to-pure-phone.int table. Raises RuntimeError on missing output.
    """
    gop_dir = work_dir / "exp/gop_req"
    gop_dir.mkdir(parents=True, exist_ok=True)

    gop_txt = gop_dir / "gop.1.txt"
    feat_ark = gop_dir / "feat.1.ark"  # phone-level features; not read by this service

    # Pure-phone mapping tables come from the recipe's lang dir, not the work copy.
    phone2pure = RECIPE_DIR / "data/lang_nosp/phone-to-pure-phone.int"
    phones_pure = RECIPE_DIR / "data/lang_nosp/phones-pure.txt"
    _ensure_exists(phone2pure, "phone-to-pure-phone.int")
    _ensure_exists(phones_pure, "phones-pure.txt")

    cmd = (
        f"{KALDI_INIT} && "
        f"compute-gop "
        f"--phone-map={shlex.quote(str(phone2pure))} "
        # NOTE(review): ids 0:1:2 are presumably eps/silence-like pure phones —
        # confirm against this model's phones-pure.txt.
        f"--skip_phones_string=0:1:2 "
        f"{shlex.quote(str(CHAIN_MODEL_DIR / 'final.mdl'))} "
        f"\"ark:gunzip -c {ali_dir / 'ali.1.gz'}|\" "
        f"\"ark:gunzip -c {ali_dir / 'ali-phone.1.gz'}|\" "
        f"\"ark:{probs_dir / 'output.1.ark'}\" "
        f"\"ark,t:{gop_txt}\" "
        f"\"ark:{feat_ark}\""
    )
    _run_cmd(cmd)

    if not gop_txt.exists() or gop_txt.stat().st_size == 0:
        raise RuntimeError(f"Missing/empty GOP txt: {gop_txt}")
    return gop_txt

def _score_dispatch(lang: str, target: str, wav_path: Path) -> ScoreResponse:
    """Run the full GOP pipeline for one word/one wav and build the response.

    Pipeline: copy lang dir -> patch target word -> Kaldi data dir ->
    MFCC/CMVN -> i-vectors -> nnet3 output -> integer transcripts ->
    graphs + alignment -> per-frame phones -> compute-gop -> parse.
    The per-request scratch dir is removed unless GOP_KEEP_WORKDIR=1.

    Raises HTTPException: 400 for bad input, 500 for pipeline failures.
    """
    lang = (lang or "en").lower()
    if lang not in ("en", "eng", "english"):
        raise HTTPException(status_code=400, detail=f"Unsupported lang: {lang}, only 'en' for now")

    # Lexicon lookup is case-sensitive; entries are assumed upper-case.
    target = target.strip().upper()
    if not target or " " in target:
        raise HTTPException(status_code=400, detail="target must be a single word (no spaces)")

    _ensure_exists(CHAIN_MODEL_DIR / "final.mdl", "chain final.mdl")
    _ensure_exists(CHAIN_MODEL_DIR / "tree", "chain tree")
    _ensure_exists(CHAIN_MODEL_DIR / "phones.txt", "chain phones.txt")
    _ensure_exists(BASE_LANG_DIR / "words.txt", "BASE lang words.txt")

    utt_id = f"utt-{uuid.uuid4().hex[:12]}"
    work_dir = WORK_ROOT / utt_id
    work_dir.mkdir(parents=True, exist_ok=True)

    # Copy the audio into the work dir so cleanup removes it too.
    wav_local = work_dir / "data/req" / f"{utt_id}.wav"
    wav_local.parent.mkdir(parents=True, exist_ok=True)
    wav_local.write_bytes(wav_path.read_bytes())

    try:
        # 1) prepare work_lang
        work_lang = _copy_lang_to_work(work_dir)

        # 2) ensure word in work_lang
        _ensure_word_in_work_lang(work_lang, target)

        # 3) data dir
        data_dir = _prepare_req_data(work_dir, utt_id, wav_local, target)

        # 4) mfcc + cmvn
        _make_mfcc_and_cmvn(work_dir, data_dir)

        # 5) ivectors
        ivec_dir = _extract_ivectors(data_dir)

        # 6) probs
        probs_dir = _compute_output_probs(data_dir, ivec_dir, work_dir)

        # 7) text.int / text-phone.int
        text_int = _make_text_int(work_dir, utt_id, target, work_lang)
        text_phone_int = _make_text_phone_int(work_dir, utt_id, target, work_lang)

        # 8) graph + align
        ali_dir = _make_align_graphs(work_dir, text_int, text_phone_int, work_lang)
        _align_mapped(data_dir, probs_dir, ali_dir, work_lang)

        # 9) ali -> phones
        _ali_to_phones(ali_dir)

        # 10) gop
        gop_txt = _compute_gop(work_dir, probs_dir, ali_dir, work_lang)

        # 11) output
        pure_map = _load_pure_phone_table(RECIPE_DIR / "data/lang_nosp/phones-pure.txt")
        utt, pairs = _parse_gop_txt(gop_txt)

        phones = []
        for pid, g in pairs:
            # Fall back to the numeric id when the symbol table is missing.
            sym = pure_map.get(pid, str(pid))
            phones.append(PhoneGop(pure_phone_id=pid, pure_phone=sym, gop=g))

        if not phones:
            raise RuntimeError("No GOP pairs parsed (empty phones list)")

        gops = [p.gop for p in phones]
        return ScoreResponse(
            utt_id=utt_id,
            lang="en",
            target=target,
            phones=phones,
            mean_gop=sum(gops) / len(gops),
            min_gop=min(gops),
            max_gop=max(gops),
            work_dir=str(work_dir) if KEEP_WORKDIR else None,
        )

    except HTTPException:
        raise
    except Exception as e:
        # NOTE(review): when KEEP_WORKDIR is off the dir named in this detail
        # is deleted by the finally block below — confirm that is intended.
        raise HTTPException(status_code=500, detail=str(e) + f"\nwork_dir={work_dir}")
    finally:
        # Best-effort scratch cleanup (also runs on success).
        if (not KEEP_WORKDIR) and work_dir.exists():
            try:
                subprocess.run(["bash", "-lc", f"rm -rf {shlex.quote(str(work_dir))}"], check=False)
            except Exception:
                pass

@app.post("/score/audio/file", response_model=ScoreResponse)
async def score_audio_file(
    lang: str = Form("en"),
    target: str = Form(...),
    file: UploadFile = File(...),
):
    """Score an uploaded audio file (multipart form) and return per-phone GOP.

    The upload is assumed to be a WAV payload — no format validation happens
    here before Kaldi reads it.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="missing file")
    data = await file.read()
    if not data:
        raise HTTPException(status_code=400, detail="empty file")

    # Persist the upload to disk; the pipeline operates on file paths.
    tmp = Path("/tmp") / f"upload-{uuid.uuid4().hex}.wav"
    tmp.write_bytes(data)
    try:
        return _score_dispatch(lang, target, tmp)
    finally:
        # Best-effort cleanup; the work dir keeps its own copy of the audio.
        try:
            tmp.unlink(missing_ok=True)
        except Exception:
            pass

class Base64Req(BaseModel):
    """JSON request body for /score/audio/base64."""

    lang: str = "en"   # only English is accepted downstream
    target: str        # single word to score
    audio_base64: str  # base64-encoded audio payload (assumed WAV)

@app.post("/score/audio/base64", response_model=ScoreResponse)
async def score_audio_base64(req: Base64Req):
    """Score a base64-encoded audio payload and return per-phone GOP."""
    b64 = req.audio_base64.strip()
    try:
        raw = base64.b64decode(b64, validate=True)
    except Exception:
        raise HTTPException(status_code=400, detail="invalid base64")

    # Persist the decoded bytes to disk; the pipeline operates on file paths.
    # NOTE(review): the payload is assumed to already be WAV — no audio-format
    # validation happens before Kaldi reads it.
    tmp = Path("/tmp") / f"b64-{uuid.uuid4().hex}.wav"
    tmp.write_bytes(raw)
    try:
        return _score_dispatch(req.lang, req.target, tmp)
    finally:
        # Best-effort cleanup of the temp upload.
        try:
            tmp.unlink(missing_ok=True)
        except Exception:
            pass
# Start the API server:
uvicorn app:app --host 0.0.0.0 --port 8000

评论


暂无评论


* 登录后即可评论

©2022 联系我们

粤ICP备2022023863号
500x500