Spaces:
Sleeping
Sleeping
| #!pip install -q specklepy | |
| # lone github repro & add to sys paths | |
| import gradio as gr | |
| import os | |
| import sys | |
| import time | |
| import copy | |
| import requests | |
| from notion_client import Client | |
| from huggingface_hub import webhook_endpoint, WebhookPayload | |
| from fastapi import Request | |
| import requests | |
| import datetime | |
| import json | |
| def get_database_properties_overview(database_pages): | |
| # Iterate through the results (each page corresponds to a row in the database) | |
| for page in database_pages: | |
| print(f"Page ID: {page['id']}") | |
| # Print the name and type of each property in this page | |
| for prop_name, prop_data in page['properties'].items(): | |
| prop_type = prop_data['type'] | |
| print(f"Property Name: {prop_name}, Property Type: {prop_type}") | |
| break | |
| # query full database | |
| def fetch_all_database_pages(client, database_id): | |
| """ | |
| Fetches all pages from a specified Notion database. | |
| :param client: Initialized Notion client. | |
| :param database_id: The ID of the Notion database to query. | |
| :return: A list containing all pages from the database. | |
| """ | |
| start_cursor = None | |
| all_pages = [] | |
| while True: | |
| response = client.databases.query( | |
| **{ | |
| "database_id": database_id, | |
| "start_cursor": start_cursor | |
| } | |
| ) | |
| all_pages.extend(response['results']) | |
| # Check if there's more data to fetch | |
| if response['has_more']: | |
| start_cursor = response['next_cursor'] | |
| else: | |
| break | |
| return all_pages | |
| def get_property_value(page, property_name): | |
| """ | |
| Extracts the value from a specific property in a Notion page based on its type. | |
| :param page: The Notion page data as retrieved from the API. | |
| :param property_name: The name of the property whose value is to be fetched. | |
| :return: The value or values contained in the specified property, depending on type. | |
| """ | |
| # Check if the property exists in the page | |
| if property_name not in page['properties']: | |
| return None # or raise an error if you prefer | |
| property_data = page['properties'][property_name] | |
| prop_type = property_data['type'] | |
| # Handle 'title' and 'rich_text' types | |
| if prop_type in ['title', 'rich_text']: | |
| return ''.join(text_block['text']['content'] for text_block in property_data[prop_type]) | |
| # Handle 'number' type | |
| elif prop_type == 'number': | |
| return property_data[prop_type] | |
| # Handle 'select' type | |
| elif prop_type == 'select': | |
| return property_data[prop_type]['name'] if property_data[prop_type] else None | |
| # Handle 'multi_select' type | |
| elif prop_type == 'multi_select': | |
| return [option['name'] for option in property_data[prop_type]] | |
| # Handle 'date' type | |
| elif prop_type == 'date': | |
| if property_data[prop_type]['end']: | |
| return (property_data[prop_type]['start'], property_data[prop_type]['end']) | |
| else: | |
| return property_data[prop_type]['start'] | |
| # Handle 'relation' type | |
| elif prop_type == 'relation': | |
| return [relation['id'] for relation in property_data[prop_type]] | |
| # Handle 'people' type | |
| elif prop_type == 'people': | |
| return [person['name'] for person in property_data[prop_type] if 'name' in person] | |
| # Add more handlers as needed for other property types | |
| else: | |
| # Return None or raise an error for unsupported property types | |
| return None | |
| def replace_property_value(page, property_name, new_value): | |
| """ | |
| Replaces the value of a specified property in a Notion page object. | |
| :param page: The Notion page object. | |
| :param property_name: The name of the property to replace. | |
| :param new_value: The new value to set for the property. | |
| """ | |
| if property_name not in page['properties']: | |
| raise ValueError(f"Property '{property_name}' not found in the page.") | |
| prop_type = page['properties'][property_name]['type'] | |
| # Handle 'rich_text' and 'title' types | |
| if prop_type in ['rich_text', 'title']: | |
| page['properties'][property_name][prop_type] = [{'text': {'content': new_value}}] | |
| # Handle 'number' type | |
| elif prop_type == 'number': | |
| page['properties'][property_name][prop_type] = new_value | |
| # Handle 'select' type | |
| elif prop_type == 'select': | |
| page['properties'][property_name][prop_type] = {'name': new_value} | |
| # Handle 'multi_select' type (assuming new_value is a list of strings) | |
| elif prop_type == 'multi_select': | |
| page['properties'][property_name][prop_type] = [{'name': val} for val in new_value] | |
| # Handle 'date' type | |
| elif prop_type == 'date': | |
| page['properties'][property_name][prop_type] = {'start': new_value} # Assuming new_value is a date string | |
| # Handle 'checkbox' type | |
| elif prop_type == 'checkbox': | |
| page['properties'][property_name][prop_type] = new_value # Assuming new_value is a boolean | |
| # Handle 'url' type | |
| elif prop_type == 'url': | |
| page['properties'][property_name][prop_type] = new_value # Assuming new_value is a valid URL string | |
| # Handle 'email' type | |
| elif prop_type == 'email': | |
| page['properties'][property_name][prop_type] = new_value # Assuming new_value is a valid email string | |
| # Add more handlers for other types as needed | |
| else: | |
| raise NotImplementedError(f"Property type '{prop_type}' is not supported in this function.") | |
| def remove_property(page, property_name): | |
| """ | |
| Removes a specified property from a Notion page object. | |
| :param page: The Notion page object. | |
| :param property_name: The name of the property to remove. | |
| :return: The updated Notion page object. | |
| """ | |
| if property_name in page['properties']: | |
| del page['properties'][property_name] | |
| else: | |
| raise ValueError(f"Property '{property_name}' not found in the page.") | |
| return page | |
| def check_and_update_or_create_page(notion, page_data, target_database_id, title_property_name): | |
| """ | |
| Checks if a page with the given title exists in the target database. | |
| If it does, updates the page; if not, creates a new page. | |
| :param notion: Notion client instance. | |
| :param page_data: Data for the page to create or update. | |
| :param target_database_id: ID of the target database. | |
| :param title_property_name: The name of the title property to check. | |
| """ | |
| # Query the target database for a page with the same >> title << | |
| query_filter = { | |
| "property": title_property_name, | |
| "title": { | |
| "equals": page_data['properties'][title_property_name]['title'][0]['text']['content'] | |
| } | |
| } | |
| response = notion.databases.query(database_id=target_database_id, filter=query_filter) | |
| existing_pages = response.get('results', []) | |
| if existing_pages: | |
| # Page already exists, update it | |
| page_id = existing_pages[0]['id'] | |
| #print(page_data['properties']) | |
| notion.pages.update(page_id=page_id, properties=page_data['properties']) | |
| else: | |
| # Page does not exist, create it | |
| notion.pages.create(parent={"database_id": target_database_id}, properties=page_data['properties']) | |
| def get_page_by_id(notion_db_pages, page_id): | |
| for pg in notion_db_pages: | |
| if pg["id"] == page_id: | |
| return pg | |
| def reset_database(notion, database_id): | |
| """ | |
| Resets a Notion database by archiving all its pages. | |
| Parameters: | |
| notion (Client): The Notion client instance. | |
| database_id (str): The ID of the database to reset. | |
| """ | |
| def retrieve_all_pages(): | |
| has_more = True | |
| start_cursor = None | |
| pages = [] | |
| while has_more: | |
| response = notion.databases.query( | |
| **{"database_id": database_id, "start_cursor": start_cursor} | |
| ) | |
| pages.extend(response["results"]) | |
| has_more = response["has_more"] | |
| start_cursor = response.get("next_cursor") | |
| return pages | |
| def delete_page(page_id): | |
| notion.pages.update(page_id, archived=True) | |
| pages = retrieve_all_pages() | |
| for page in pages: | |
| delete_page(page["id"]) | |
| print(f"Reset of database {database_id} completed.") | |
| guiding_db_an = "1dafd962f861406d928bbdf109b9bfe4" | |
| land_use_abbr = "3b0e681e922a41409b6a13c11105c56a" | |
| target_db = "ec2a636f079d4d7686f94901b6238242" | |
| # Function to run on button click | |
| # Define a unique endpoint URL | |
| async def update_table(request: Request): | |
| # Read the request body as JSON | |
| payload = await request.json() | |
| print(str(payload)) | |
| continueFlag = True | |
| # Accessing nested data in the 'event' object | |
| if continueFlag: | |
| tkn = os.environ.get("NOTION_TOKEN") | |
| notion = Client(auth=tkn) | |
| database_pages = fetch_all_database_pages(notion, guiding_db_an) | |
| lu_name_db = notion.databases.query(database_id=land_use_abbr) | |
| name_mapper = {} | |
| name_mapperM = {} | |
| for page in lu_name_db["results"]: | |
| oriName = get_property_value(page, "name") | |
| shortName = get_property_value(page, "nameShort") | |
| medName = get_property_value(page, "nameMedium") | |
| name_mapper[oriName] = shortName | |
| name_mapperM[oriName] = medName | |
| # Iterate through the results (each page corresponds to a row in the database) | |
| for page in database_pages: | |
| #check speckleAnalysisName | |
| speckName = get_property_value(page, "speckleName") | |
| shortName = get_property_value(page, "nameShort") | |
| longName = get_property_value(page, "nameLong") | |
| #print(shortName) | |
| #print(longName) | |
| subAttributes = get_property_value(page, "level_4") | |
| subAttributeFlag = False | |
| if len(subAttributes)>1: | |
| if subAttributes[0] != 'NA': | |
| subAttributeFlag = True | |
| for subAttr in subAttributes: | |
| # construct new page (speckleName, Name, longName, shortName, remove level_4) | |
| new_page = copy.deepcopy(page) | |
| speckName_full = speckName+subAttr | |
| if "#+" in shortName: | |
| shortestName = name_mapper.get(subAttr,subAttr) | |
| mediumName = name_mapperM.get(subAttr,subAttr) | |
| #print("shortestName", shortestName, subAttr) | |
| shortName_full = shortName.replace("#+", shortestName) | |
| shortName_full.replace("DRT", "ART") | |
| #update short name | |
| replace_property_value(new_page, "nameShort", shortName_full ) | |
| if "#+" in longName: | |
| longName_full = longName.replace("#+", "::" +mediumName) | |
| longName_full = longName_full.replace(" ::", "::") | |
| longName_full = longName_full.replace(":: ", "::") | |
| longName_full.replace("DRT", "ART") | |
| #update long name | |
| replace_property_value(new_page, "nameLong", longName_full ) | |
| #update speckle name (-> as title column) | |
| replace_property_value(new_page, "name", speckName_full) | |
| # remove level_4 and speckleAnalysisName | |
| new_page = remove_property(new_page, "level_4") | |
| new_page = remove_property(new_page, "speckleName") | |
| new_page = remove_property(new_page, "Items") | |
| # update target database in notion | |
| check_and_update_or_create_page(notion, new_page, target_db, "name") | |
| if subAttributeFlag == False: | |
| # forward page | |
| new_page = copy.deepcopy(page) | |
| replace_property_value(new_page, "name", speckName) | |
| new_page = remove_property(new_page, "level_4") | |
| new_page = remove_property(new_page, "speckleName") | |
| new_page = remove_property(new_page, "Items") | |
| # update target database in notion | |
| #print(get_property_value(new_page, "nameLong")) | |
| check_and_update_or_create_page(notion, new_page, target_db, "name") | |
| return "table updated successfully" | |