Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # Copyright (c) 2024 OSU Natural Language Processing Group | |
| # | |
| # Licensed under the OpenRAIL-S License; | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # https://www.licenses.ai/ai-pubs-open-rails-vz1 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import re | |
| import asyncio | |
| from difflib import SequenceMatcher | |
| from playwright.sync_api import Playwright, expect, sync_playwright | |
| # from playwright.async_api import async_playwright | |
| from pathlib import Path | |
| import toml | |
| import os | |
| import traceback | |
| async def normal_launch_async(playwright: Playwright,headless=False,args=None): | |
| browser = await playwright.chromium.launch( | |
| traces_dir=None, | |
| headless=False, | |
| args=args, | |
| # ignore_default_args=ignore_args, | |
| # chromium_sandbox=False, | |
| ) | |
| return browser | |
| async def normal_new_context_async( | |
| browser, | |
| storage_state=None, | |
| har_path=None, | |
| video_path=None, | |
| tracing=False, | |
| trace_screenshots=False, | |
| trace_snapshots=False, | |
| trace_sources=False, | |
| locale=None, | |
| geolocation=None, | |
| user_agent: str = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", | |
| viewport: dict = {"width": 1280, "height": 720}, | |
| ): | |
| context = await browser.new_context( | |
| storage_state=storage_state, | |
| user_agent=user_agent, | |
| viewport=viewport, | |
| locale=locale, | |
| record_har_path=har_path, | |
| record_video_dir=video_path, | |
| geolocation=geolocation, | |
| ) | |
| if tracing: | |
| await context.tracing.start(screenshots=trace_screenshots, snapshots=trace_snapshots, sources=trace_sources) | |
| return context | |
| # | |
| # def persistent_launch(playwright: Playwright, user_data_dir: str = ""): | |
| # context = playwright.chromium.launch_persistent_context( | |
| # user_data_dir=user_data_dir, | |
| # headless=False, | |
| # args=["--no-default-browser-check", | |
| # "--no_sandbox", | |
| # "--disable-blink-features=AutomationControlled", | |
| # ], | |
| # ignore_default_args=ignore_args, | |
| # user_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", | |
| # viewport={"width": 1280, "height": 720}, | |
| # bypass_csp=True, | |
| # slow_mo=1000, | |
| # chromium_sandbox=True, | |
| # channel="chrome-dev" | |
| # ) | |
| # return context | |
| # | |
| # async def persistent_launch_async(playwright: Playwright, user_data_dir: str = "", record_video_dir="video"): | |
| # context = await playwright.chromium.launch_persistent_context( | |
| # user_data_dir=user_data_dir, | |
| # headless=False, | |
| # args=[ | |
| # "--disable-blink-features=AutomationControlled", | |
| # ], | |
| # ignore_default_args=ignore_args, | |
| # user_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", | |
| # # viewport={"width": 1280, "height": 720}, | |
| # record_video_dir=record_video_dir, | |
| # channel="chrome-dev" | |
| # # slow_mo=1000, | |
| # ) | |
| # return context | |
| def remove_extra_eol(text): | |
| # Replace EOL symbols | |
| text = text.replace('\n', ' ') | |
| return re.sub(r'\s{2,}', ' ', text) | |
| def get_first_line(s): | |
| first_line = s.split('\n')[0] | |
| tokens = first_line.split() | |
| if len(tokens) > 8: | |
| return ' '.join(tokens[:8]) + '...' | |
| else: | |
| return first_line | |
| async def get_element_description(element, tag_name, role_value, type_value): | |
| ''' | |
| Asynchronously generates a descriptive text for a web element based on its tag type. | |
| Handles various HTML elements like 'select', 'input', and 'textarea', extracting attributes and content relevant to accessibility and interaction. | |
| ''' | |
| # text_content = await element.inner_text(timeout=0) | |
| # text = (text_content or '').strip() | |
| # | |
| # print(text) | |
| salient_attributes = [ | |
| "alt", | |
| "aria-describedby", | |
| "aria-label", | |
| "aria-role", | |
| "input-checked", | |
| # "input-value", | |
| "label", | |
| "name", | |
| "option_selected", | |
| "placeholder", | |
| "readonly", | |
| "text-value", | |
| "title", | |
| "value", | |
| ] | |
| parent_value = "parent_node: " | |
| parent_locator = element.locator('xpath=..') | |
| num_parents = await parent_locator.count() | |
| if num_parents > 0: | |
| # only will be zero or one parent node | |
| parent_text = (await parent_locator.inner_text(timeout=0) or "").strip() | |
| if parent_text: | |
| parent_value += parent_text | |
| parent_value = remove_extra_eol(get_first_line(parent_value)).strip() | |
| if parent_value == "parent_node:": | |
| parent_value = "" | |
| else: | |
| parent_value += " " | |
| if tag_name == "select": | |
| text1 = "Selected Options: " | |
| text3 = " - Options: " | |
| text2 = await element.evaluate( | |
| "select => select.options[select.selectedIndex].textContent", timeout=0 | |
| ) | |
| if text2: | |
| options = await element.evaluate("select => Array.from(select.options).map(option => option.text)", | |
| timeout=0) | |
| text4 = " | ".join(options) | |
| if not text4: | |
| text4 = await element.text_content(timeout=0) | |
| if not text4: | |
| text4 = await element.inner_text(timeout=0) | |
| return parent_value+text1 + remove_extra_eol(text2.strip()) + text3 + text4 | |
| input_value = "" | |
| none_input_type = ["submit", "reset", "checkbox", "radio", "button", "file"] | |
| if tag_name == "input" or tag_name == "textarea": | |
| if role_value not in none_input_type and type_value not in none_input_type: | |
| text1 = "input value=" | |
| text2 = await element.input_value(timeout=0) | |
| if text2: | |
| input_value = text1 + "\"" + text2 + "\"" + " " | |
| text_content = await element.text_content(timeout=0) | |
| text = (text_content or '').strip() | |
| # print(text) | |
| if text: | |
| text = remove_extra_eol(text) | |
| if len(text) > 80: | |
| text_content_in = await element.inner_text(timeout=0) | |
| text_in = (text_content_in or '').strip() | |
| if text_in: | |
| return input_value + remove_extra_eol(text_in) | |
| else: | |
| return input_value + text | |
| # get salient_attributes | |
| text1 = "" | |
| for attr in salient_attributes: | |
| attribute_value = await element.get_attribute(attr, timeout=0) | |
| if attribute_value: | |
| text1 += f"{attr}=" + "\"" + attribute_value.strip() + "\"" + " " | |
| text = (parent_value + text1).strip() | |
| if text: | |
| return input_value + remove_extra_eol(text.strip()) | |
| # try to get from the first child node | |
| first_child_locator = element.locator('xpath=./child::*[1]') | |
| num_childs = await first_child_locator.count() | |
| if num_childs>0: | |
| for attr in salient_attributes: | |
| attribute_value = await first_child_locator.get_attribute(attr, timeout=0) | |
| if attribute_value: | |
| text1 += f"{attr}=" + "\"" + attribute_value.strip() + "\"" + " " | |
| text = (parent_value + text1).strip() | |
| if text: | |
| return input_value + remove_extra_eol(text.strip()) | |
| return None | |
| async def get_element_data(element, tag_name,viewport_size,seen_elements=[],coordinates=None): | |
| try: | |
| tag_name_list = ['a', 'button', | |
| 'input', | |
| 'select', 'textarea', 'adc-tab'] | |
| rect = await element.bounding_box() or {'x': -1, 'y': -1, 'width': 0, 'height': 0} | |
| if rect['x']<0 or rect['y']<0 or rect['width']<=4 or rect['height']<=4 or rect['y']+rect['height']>viewport_size["height"] or rect['x']+ rect['width']>viewport_size["width"]: | |
| return None | |
| if coordinates is not None: | |
| if coordinates[0]>=rect['x'] and coordinates[0]<=rect['x']+rect['width'] and coordinates[1]>=rect['y'] and coordinates[1]<=rect['y']+rect['height']: | |
| print(coordinates) | |
| print(rect) | |
| else: | |
| return None | |
| box_model = [rect['x'], rect['y'], rect['x'] + rect['width'], rect['y'] + rect['height']] | |
| center_point = (round((box_model[0] + box_model[2]) / 2 , 3), | |
| round((box_model[1] + box_model[3]) / 2 , 3)) | |
| if await element.is_hidden(timeout=0) or await element.is_disabled(timeout=0): | |
| return None | |
| if center_point in seen_elements: | |
| return None | |
| # await aprint(element,tag_name) | |
| if tag_name in tag_name_list: | |
| tag_head = tag_name | |
| real_tag_name = tag_name | |
| else: | |
| real_tag_name = await element.evaluate("element => element.tagName.toLowerCase()", timeout=0) | |
| if real_tag_name in tag_name_list: | |
| # already detected | |
| return None | |
| else: | |
| tag_head = real_tag_name | |
| text_element = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'td', "div","em","center","strong","b","i","small","mark","abbr","cite","q","blockquote","span","nobr"] | |
| if real_tag_name in text_element: | |
| return None | |
| role_value = await element.get_attribute('role', timeout=0) | |
| type_value = await element.get_attribute('type', timeout=0) | |
| # await aprint("start to get element description",element,tag_name ) | |
| description = await get_element_description(element, real_tag_name, role_value, type_value) | |
| # print(description) | |
| if not description: | |
| return None | |
| if role_value: | |
| tag_head += " role=" + "\"" + role_value + "\"" | |
| if type_value: | |
| tag_head += " type=" + "\"" + type_value + "\"" | |
| ''' | |
| 0: center_point =(x,y) | |
| 1: description | |
| 2: tag_with_role: tag_head with role and type # TODO: Consider adding more | |
| 3. box | |
| 4. selector | |
| 5. tag | |
| ''' | |
| selector = element | |
| if coordinates is not None: | |
| if coordinates[0]>=rect['x'] and coordinates[0]<=rect['x']+rect['width'] and coordinates[1]>=rect['y'] and coordinates[1]<=rect['y']+rect['height']: | |
| print(tag_head) | |
| print(description) | |
| print(box_model) | |
| else: | |
| return None | |
| return {"center_point":center_point,"description":description,"tag_with_role":tag_head,"box":box_model,"selector":selector,"tag":real_tag_name} | |
| # return [center_point, description, tag_head, box_model, selector, real_tag_name] | |
| except Exception as e: | |
| print(traceback.format_exc()) | |
| print(e) | |
| return None | |
| async def get_interactive_elements_with_playwright(page,viewport_size,coordinates=None): | |
| print("Get Interactive elements around: ", coordinates) | |
| interactive_elements_selectors = [ | |
| 'a', 'button', | |
| 'input', | |
| 'select', 'textarea', | |
| ] | |
| seen_elements = set() | |
| tasks = [] | |
| for selector in interactive_elements_selectors: | |
| locator = page.locator(selector) | |
| element_count = await locator.count() | |
| for index in range(element_count): | |
| element = locator.nth(index) | |
| tag_name = selector | |
| task = get_element_data(element, tag_name,viewport_size,seen_elements=[],coordinates=coordinates) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| interactive_elements = [] | |
| for i in results: | |
| if i: | |
| if i["center_point"] in seen_elements: | |
| continue | |
| else: | |
| seen_elements.add(i["center_point"]) | |
| interactive_elements.append(i) | |
| # interactive_elements_selectors = [ | |
| # '*' | |
| # ] | |
| # tasks = [] | |
| # | |
| # for selector in interactive_elements_selectors: | |
| # locator = page.locator(selector) | |
| # element_count = await locator.count() | |
| # for index in range(element_count): | |
| # element = locator.nth(index) | |
| # tag_name = selector | |
| # task = get_element_data(element, tag_name, viewport_size,seen_elements,coordinates) | |
| # | |
| # tasks.append(task) | |
| # | |
| # results = await asyncio.gather(*tasks) | |
| # | |
| # | |
| # for i in results: | |
| # if i: | |
| # if i["center_point"] in seen_elements: | |
| # continue | |
| # else: | |
| # seen_elements.add(i["center_point"]) | |
| # interactive_elements.append(i) | |
| return interactive_elements | |
| async def get_select_elements_with_playwright(page,viewport_size): | |
| interactive_elements_selectors = [ | |
| 'select' | |
| ] | |
| seen_elements = set() | |
| tasks = [] | |
| for selector in interactive_elements_selectors: | |
| locator = page.locator(selector) | |
| element_count = await locator.count() | |
| for index in range(element_count): | |
| element = locator.nth(index) | |
| tag_name = selector | |
| task = get_element_data(element, tag_name,viewport_size,seen_elements=[],coordinates=None) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| interactive_elements = [] | |
| for i in results: | |
| if i: | |
| if i["center_point"] in seen_elements: | |
| continue | |
| else: | |
| seen_elements.add(i["center_point"]) | |
| interactive_elements.append(i) | |
| return interactive_elements | |
| async def select_option(selector, value): | |
| best_option = [-1, "", -1] | |
| for i in range(await selector.locator("option").count()): | |
| option = await selector.locator("option").nth(i).inner_text() | |
| similarity = SequenceMatcher(None, option, value).ratio() | |
| if similarity > best_option[2]: | |
| best_option = [i, option, similarity] | |
| await selector.select_option(index=best_option[0], timeout=10000) | |
| return remove_extra_eol(best_option[1]).strip() | |
| def saveconfig(config, save_file): | |
| """ | |
| config is a dictionary. | |
| save_path: saving path include file name. | |
| """ | |
| if isinstance(save_file, str): | |
| save_file = Path(save_file) | |
| if isinstance(config, dict): | |
| with open(save_file, 'w') as f: | |
| config_without_key = config | |
| config_without_key["openai"]["api_key"] = "Your API key here" | |
| toml.dump(config_without_key, f) | |
| else: | |
| os.system(" ".join(["cp", str(config), str(save_file)])) | |