Spaces:
Running
Running
| from typing import Any | |
| from graphgen.bases import BaseGenerator | |
| from graphgen.templates import ATOMIC_GENERATION_PROMPT | |
| from graphgen.utils import compute_content_hash, detect_main_language, logger | |
| class AtomicGenerator(BaseGenerator): | |
| def build_prompt( | |
| batch: tuple[list[tuple[str, dict]], list[tuple[Any, Any, dict]]] | |
| ) -> str: | |
| nodes, edges = batch | |
| context = "" | |
| for node in nodes: | |
| context += f"- {node[0]}: {node[1]['description']}\n" | |
| for edge in edges: | |
| context += f"- {edge[0]} - {edge[1]}: {edge[2]['description']}\n" | |
| language = detect_main_language(context) | |
| prompt = ATOMIC_GENERATION_PROMPT[language].format(context=context) | |
| return prompt | |
| def parse_response(response: str) -> dict: | |
| """ | |
| AtomicGenerator normally generates one QA pair per response. | |
| So we just need to parse one QA pair from the response. | |
| :param response: | |
| :return: | |
| """ | |
| if "Question:" in response and "Answer:" in response: | |
| question = response.split("Question:")[1].split("Answer:")[0].strip() | |
| answer = response.split("Answer:")[1].strip() | |
| elif "问题:" in response and "答案:" in response: | |
| question = response.split("问题:")[1].split("答案:")[0].strip() | |
| answer = response.split("答案:")[1].strip() | |
| else: | |
| logger.warning("Failed to parse response: %s", response) | |
| return {} | |
| question = question.strip('"') | |
| answer = answer.strip('"') | |
| logger.debug("Question: %s", question) | |
| logger.debug("Answer: %s", answer) | |
| return { | |
| compute_content_hash(question): { | |
| "question": question, | |
| "answer": answer, | |
| } | |
| } | |