GraphGen / graphgen /models /generator /atomic_generator.py
github-actions[bot]
Auto-sync from demo at Thu Oct 23 12:37:24 UTC 2025
8c66169
raw
history blame
1.87 kB
from typing import Any
from graphgen.bases import BaseGenerator
from graphgen.templates import ATOMIC_GENERATION_PROMPT
from graphgen.utils import compute_content_hash, detect_main_language, logger
class AtomicGenerator(BaseGenerator):
    """Generator that builds a prompt from a graph batch and parses one QA pair back."""

    # (question label, answer label) pairs, tried in order. English is checked
    # first; the Chinese labels are accepted with either an ASCII colon or the
    # full-width colon.
    _QA_LABELS: tuple[tuple[str, str], ...] = (
        ("Question:", "Answer:"),
        ("问题:", "答案:"),
        ("问题：", "答案："),
    )

    @staticmethod
    def build_prompt(
        batch: tuple[list[tuple[str, dict]], list[tuple[Any, Any, dict]]]
    ) -> str:
        """
        Build the generation prompt for one batch of nodes and edges.

        Every node contributes a ``- <node[0]>: <description>`` line and every
        edge a ``- <edge[0]> - <edge[1]>: <description>`` line. The language
        returned by ``detect_main_language`` for that context selects the
        template from ``ATOMIC_GENERATION_PROMPT``.

        :param batch: ``(nodes, edges)``; the dict in each node/edge tuple must
            contain a ``"description"`` key.
        :return: the selected template formatted with the rendered context.
        """
        nodes, edges = batch
        lines = [f"- {node[0]}: {node[1]['description']}\n" for node in nodes]
        lines.extend(
            f"- {edge[0]} - {edge[1]}: {edge[2]['description']}\n" for edge in edges
        )
        # Join once instead of growing a string with += inside the loops.
        context = "".join(lines)
        language = detect_main_language(context)
        return ATOMIC_GENERATION_PROMPT[language].format(context=context)

    @staticmethod
    def parse_response(response: str) -> dict:
        """
        AtomicGenerator normally generates one QA pair per response.
        So we just need to parse one QA pair from the response.

        The question is the text between the first question label and the first
        answer label that follows it; the answer is everything after that answer
        label. Surrounding whitespace and double quotes are removed from both.

        :param response: raw model output.
        :return: ``{compute_content_hash(question): {"question": ..., "answer": ...}}``,
            or an empty dict (after logging a warning) when no label pair is
            found in question-then-answer order or either part is empty.
        """
        for question_label, answer_label in AtomicGenerator._QA_LABELS:
            _, has_question, after_question = response.partition(question_label)
            # Partition only the text after the question label: this requires the
            # answer to follow the question and keeps the whole answer even when
            # the answer text repeats the label.
            question, has_answer, answer = after_question.partition(answer_label)
            if has_question and has_answer:
                break
        else:
            logger.warning("Failed to parse response: %s", response)
            return {}

        question = question.strip().strip('"')
        answer = answer.strip().strip('"')
        if not question or not answer:
            # An empty question or answer is not a usable QA pair.
            logger.warning("Failed to parse response: %s", response)
            return {}

        logger.debug("Question: %s", question)
        logger.debug("Answer: %s", answer)
        return {
            compute_content_hash(question): {
                "question": question,
                "answer": answer,
            }
        }