| | import hashlib |
| | import json |
| | from aiohttp import web |
| | from server import PromptServer |
| | import folder_paths |
| | import os |
| |
|
| |
|
| | def get_metadata(filepath): |
| | with open(filepath, "rb") as file: |
| | |
| | |
| | header_size = int.from_bytes(file.read(8), "little", signed=False) |
| |
|
| | if header_size <= 0: |
| | raise BufferError("Invalid header size") |
| |
|
| | header = file.read(header_size) |
| | if header_size <= 0: |
| | raise BufferError("Invalid header") |
| |
|
| | header_json = json.loads(header) |
| | return header_json["__metadata__"] if "__metadata__" in header_json else None |
| |
|
| |
|
| | @PromptServer.instance.routes.post("/pysssss/metadata/notes/{name}") |
| | async def save_notes(request): |
| | name = request.match_info["name"] |
| | pos = name.index("/") |
| | type = name[0:pos] |
| | name = name[pos+1:] |
| |
|
| | file_path = None |
| | if type == "embeddings": |
| | name = name.lower() |
| | files = folder_paths.get_filename_list(type) |
| | for f in files: |
| | lower_f = f.lower() |
| | if lower_f == name: |
| | file_path = folder_paths.get_full_path(type, f) |
| | else: |
| | n = os.path.splitext(f)[0].lower() |
| | if n == name: |
| | file_path = folder_paths.get_full_path(type, f) |
| |
|
| | if file_path is not None: |
| | break |
| | else: |
| | file_path = folder_paths.get_full_path( |
| | type, name) |
| | if not file_path: |
| | return web.Response(status=404) |
| |
|
| | file_no_ext = os.path.splitext(file_path)[0] |
| | info_file = file_no_ext + ".txt" |
| | with open(info_file, "w") as f: |
| | f.write(await request.text()) |
| |
|
| | return web.Response(status=200) |
| |
|
| |
|
| | @PromptServer.instance.routes.get("/pysssss/metadata/{name}") |
| | async def load_metadata(request): |
| | name = request.match_info["name"] |
| | pos = name.index("/") |
| | type = name[0:pos] |
| | name = name[pos+1:] |
| |
|
| | file_path = None |
| | if type == "embeddings": |
| | name = name.lower() |
| | files = folder_paths.get_filename_list(type) |
| | for f in files: |
| | lower_f = f.lower() |
| | if lower_f == name: |
| | file_path = folder_paths.get_full_path(type, f) |
| | else: |
| | n = os.path.splitext(f)[0].lower() |
| | if n == name: |
| | file_path = folder_paths.get_full_path(type, f) |
| |
|
| | if file_path is not None: |
| | break |
| | else: |
| | file_path = folder_paths.get_full_path( |
| | type, name) |
| | if not file_path: |
| | return web.Response(status=404) |
| |
|
| | try: |
| | meta = get_metadata(file_path) |
| | except: |
| | meta = None |
| |
|
| | if meta is None: |
| | meta = {} |
| |
|
| | file_no_ext = os.path.splitext(file_path)[0] |
| |
|
| | info_file = file_no_ext + ".txt" |
| | if os.path.isfile(info_file): |
| | with open(info_file, "r") as f: |
| | meta["pysssss.notes"] = f.read() |
| |
|
| | hash_file = file_no_ext + ".sha256" |
| | if os.path.isfile(hash_file): |
| | with open(hash_file, "rt") as f: |
| | meta["pysssss.sha256"] = f.read() |
| | else: |
| | with open(file_path, "rb") as f: |
| | meta["pysssss.sha256"] = hashlib.sha256(f.read()).hexdigest() |
| | with open(hash_file, "wt") as f: |
| | f.write(meta["pysssss.sha256"]) |
| |
|
| | return web.json_response(meta) |
| |
|