Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import numpy as np | |
| import sys | |
| def compute_wer(ref_file, hyp_file, cer_detail_file): | |
| rst = { | |
| "Wrd": 0, | |
| "Corr": 0, | |
| "Ins": 0, | |
| "Del": 0, | |
| "Sub": 0, | |
| "Snt": 0, | |
| "Err": 0.0, | |
| "S.Err": 0.0, | |
| "wrong_words": 0, | |
| "wrong_sentences": 0, | |
| } | |
| hyp_dict = {} | |
| ref_dict = {} | |
| with open(hyp_file, "r") as hyp_reader: | |
| for line in hyp_reader: | |
| key = line.strip().split()[0] | |
| value = line.strip().split()[1:] | |
| hyp_dict[key] = value | |
| with open(ref_file, "r") as ref_reader: | |
| for line in ref_reader: | |
| key = line.strip().split()[0] | |
| value = line.strip().split()[1:] | |
| ref_dict[key] = value | |
| cer_detail_writer = open(cer_detail_file, "w") | |
| for hyp_key in hyp_dict: | |
| if hyp_key in ref_dict: | |
| out_item = compute_wer_by_line(hyp_dict[hyp_key], ref_dict[hyp_key]) | |
| rst["Wrd"] += out_item["nwords"] | |
| rst["Corr"] += out_item["cor"] | |
| rst["wrong_words"] += out_item["wrong"] | |
| rst["Ins"] += out_item["ins"] | |
| rst["Del"] += out_item["del"] | |
| rst["Sub"] += out_item["sub"] | |
| rst["Snt"] += 1 | |
| if out_item["wrong"] > 0: | |
| rst["wrong_sentences"] += 1 | |
| cer_detail_writer.write(hyp_key + print_cer_detail(out_item) + "\n") | |
| cer_detail_writer.write( | |
| "ref:" | |
| + "\t" | |
| + " ".join(list(map(lambda x: x.lower(), ref_dict[hyp_key]))) | |
| + "\n" | |
| ) | |
| cer_detail_writer.write( | |
| "hyp:" | |
| + "\t" | |
| + " ".join(list(map(lambda x: x.lower(), hyp_dict[hyp_key]))) | |
| + "\n" | |
| ) | |
| if rst["Wrd"] > 0: | |
| rst["Err"] = round(rst["wrong_words"] * 100 / rst["Wrd"], 2) | |
| if rst["Snt"] > 0: | |
| rst["S.Err"] = round(rst["wrong_sentences"] * 100 / rst["Snt"], 2) | |
| cer_detail_writer.write("\n") | |
| cer_detail_writer.write( | |
| "%WER " | |
| + str(rst["Err"]) | |
| + " [ " | |
| + str(rst["wrong_words"]) | |
| + " / " | |
| + str(rst["Wrd"]) | |
| + ", " | |
| + str(rst["Ins"]) | |
| + " ins, " | |
| + str(rst["Del"]) | |
| + " del, " | |
| + str(rst["Sub"]) | |
| + " sub ]" | |
| + "\n" | |
| ) | |
| cer_detail_writer.write( | |
| "%SER " | |
| + str(rst["S.Err"]) | |
| + " [ " | |
| + str(rst["wrong_sentences"]) | |
| + " / " | |
| + str(rst["Snt"]) | |
| + " ]" | |
| + "\n" | |
| ) | |
| cer_detail_writer.write( | |
| "Scored " | |
| + str(len(hyp_dict)) | |
| + " sentences, " | |
| + str(len(hyp_dict) - rst["Snt"]) | |
| + " not present in hyp." | |
| + "\n" | |
| ) | |
| def compute_wer_by_line(hyp, ref): | |
| hyp = list(map(lambda x: x.lower(), hyp)) | |
| ref = list(map(lambda x: x.lower(), ref)) | |
| len_hyp = len(hyp) | |
| len_ref = len(ref) | |
| cost_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int16) | |
| ops_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int8) | |
| for i in range(len_hyp + 1): | |
| cost_matrix[i][0] = i | |
| for j in range(len_ref + 1): | |
| cost_matrix[0][j] = j | |
| for i in range(1, len_hyp + 1): | |
| for j in range(1, len_ref + 1): | |
| if hyp[i - 1] == ref[j - 1]: | |
| cost_matrix[i][j] = cost_matrix[i - 1][j - 1] | |
| else: | |
| substitution = cost_matrix[i - 1][j - 1] + 1 | |
| insertion = cost_matrix[i - 1][j] + 1 | |
| deletion = cost_matrix[i][j - 1] + 1 | |
| compare_val = [substitution, insertion, deletion] | |
| min_val = min(compare_val) | |
| operation_idx = compare_val.index(min_val) + 1 | |
| cost_matrix[i][j] = min_val | |
| ops_matrix[i][j] = operation_idx | |
| match_idx = [] | |
| i = len_hyp | |
| j = len_ref | |
| rst = {"nwords": len_ref, "cor": 0, "wrong": 0, "ins": 0, "del": 0, "sub": 0} | |
| while i >= 0 or j >= 0: | |
| i_idx = max(0, i) | |
| j_idx = max(0, j) | |
| if ops_matrix[i_idx][j_idx] == 0: # correct | |
| if i - 1 >= 0 and j - 1 >= 0: | |
| match_idx.append((j - 1, i - 1)) | |
| rst["cor"] += 1 | |
| i -= 1 | |
| j -= 1 | |
| elif ops_matrix[i_idx][j_idx] == 2: # insert | |
| i -= 1 | |
| rst["ins"] += 1 | |
| elif ops_matrix[i_idx][j_idx] == 3: # delete | |
| j -= 1 | |
| rst["del"] += 1 | |
| elif ops_matrix[i_idx][j_idx] == 1: # substitute | |
| i -= 1 | |
| j -= 1 | |
| rst["sub"] += 1 | |
| if i < 0 and j >= 0: | |
| rst["del"] += 1 | |
| elif j < 0 and i >= 0: | |
| rst["ins"] += 1 | |
| match_idx.reverse() | |
| wrong_cnt = cost_matrix[len_hyp][len_ref] | |
| rst["wrong"] = wrong_cnt | |
| return rst | |
| def print_cer_detail(rst): | |
| return ( | |
| "(" | |
| + "nwords=" | |
| + str(rst["nwords"]) | |
| + ",cor=" | |
| + str(rst["cor"]) | |
| + ",ins=" | |
| + str(rst["ins"]) | |
| + ",del=" | |
| + str(rst["del"]) | |
| + ",sub=" | |
| + str(rst["sub"]) | |
| + ") corr:" | |
| + "{:.2%}".format(rst["cor"] / rst["nwords"]) | |
| + ",cer:" | |
| + "{:.2%}".format(rst["wrong"] / rst["nwords"]) | |
| ) | |
| if __name__ == "__main__": | |
| if len(sys.argv) != 4: | |
| print("usage : python compute-wer.py test.ref test.hyp test.wer") | |
| sys.exit(0) | |
| ref_file = sys.argv[1] | |
| hyp_file = sys.argv[2] | |
| cer_detail_file = sys.argv[3] | |
| compute_wer(ref_file, hyp_file, cer_detail_file) | |