Examples¶
dataguy_by_element.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Clicks hard-coded menu items by element and selector.
"""
from typing import List
import webtraversallibrary as wtl
from .util import parse_cli_args
@wtl.single_tab
def policy(_, view: wtl.View) -> List[wtl.Action]:
    """
    Click two hard-coded menu items, each located in a different way.

    The first action is looked up through a page element, the second through a CSS selector.
    """
    # Hard-coded geometry filter: links close to the top-left corner are taken to be the menu items
    sidebar_links = [
        candidate
        for candidate in view.snapshot.elements
        if candidate.location.x < 10 and candidate.location.y < 200 and candidate.metadata["tag"] == "a"
    ]

    # Way 1: the actions attached to the first of those elements
    via_element = view.actions.by_element(sidebar_links[0])

    # Way 2: by selector - this one matches all the menu items (equivalent to the above)
    via_selector = view.actions.by_selector(wtl.Selector(css=".sidenav div a"))

    # Click the first menu item and then, before snapshotting, the second
    return [via_element[0], via_selector[1]]
if __name__ == "__main__":
    # Everything configurable comes from the command line
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)
    workflow = wtl.Workflow(config=config, policy=policy, url=cli_args.url, output=cli_args.output)

    # Register an active-element filter tied to the Click action
    click_filter = wtl.ActiveElementFilter(action=wtl.actions.Click)
    workflow.classifiers.add(click_filter)

    workflow.run()
    workflow.quit()
|
dementor.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Remove an element, one at a time, until the page is empty.
"""
import random
from typing import List, Union
import webtraversallibrary as wtl
from .util import parse_cli_args
@wtl.single_tab
def policy(workflow: wtl.Workflow, view: wtl.View) -> Union[wtl.Action, List[wtl.Action]]:
    """
    Remove one randomly chosen element per iteration.

    When ``workflow.loop_idx`` equals 7, a ``Revert(3)`` action is returned instead.
    When the view offers no ``Remove`` action at all, an ``Abort`` action is returned
    rather than letting ``random.choice`` fail on an empty sequence.

    Returns:
        A single action (revert/abort) or the list of actions for one deletion step.
    """
    # After seven deletions, start over from step 3
    if workflow.loop_idx == 7:
        return wtl.actions.Revert(3)

    removals = view.actions.by_type(wtl.actions.Remove)
    if not removals:
        # Nothing left that can be removed: stop instead of raising IndexError
        return wtl.actions.Abort()

    # Randomly pick one of the deleting actions
    return [
        random.choice(removals),
        wtl.actions.Wait(0.25),
        wtl.actions.Clear(viewport=False),
        wtl.actions.WaitForUser(),
    ]
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)
    wf = wtl.Workflow(config=config, policy=policy, url=cli_args.url, output=cli_args.output)

    # The callback hands back every element it receives, so each one is paired with Remove
    remove_everything = wtl.ElementClassifier(
        name="dementor",
        enabled=True,
        highlight=False,
        action=wtl.actions.Remove,
        callback=lambda e, _: e,  # Will label _all_ elements removable
    )
    wf.classifiers.add(remove_everything)

    wf.run()
    wf.quit()
|
dynamic_windows.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Dynamically handles multiple tabs and windows.
Creates new windows and/or tabs every iteration.
"""
from random import choice
from typing import Dict
import webtraversallibrary as wtl
from .util import parse_cli_args
def policy(workflow: wtl.Workflow, view: wtl.View) -> Dict[wtl.View, wtl.Action]:
    """
    Open one more tab on every call and click something random in one of the given views.

    NOTE(review): ``view`` is used as a mapping here (``view.values()``) although it is
    annotated as a single ``wtl.View`` - confirm the intended annotation.
    """
    step = workflow.loop_idx + 1
    wanted_window = step // 3

    # Every third tab, create a new window, otherwise use the latest window
    if wanted_window < len(workflow.windows):
        target_window = workflow.windows[-1]
    else:
        target_window = workflow.create_window(str(wanted_window))

    # Open a new tab on the Wikipedia article for the current number
    target_window.create_tab(str(step), url=f"https://en.wikipedia.org/wiki/{step}")

    # Click a random clickable element in a random view/tab
    picked_view = choice(list(view.values()))
    clickables = picked_view.actions.by_type(wtl.actions.Click)
    return {picked_view: choice(clickables)}
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)

    # The start URL is fixed; only config and output come from the command line
    wf = wtl.Workflow(config=config, policy=policy, url="https://en.wikipedia.org/wiki/0", output=cli_args.output)

    click_filter = wtl.ActiveElementFilter(action=wtl.actions.Click)
    wf.classifiers.add(click_filter)

    wf.run()
    wf.quit()
|
fill_form.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Randomly fill all random text input fields on a page with a preset list of names.
"""
from random import choice
from typing import List
import webtraversallibrary as wtl
from webtraversallibrary.actions import Clear, FillText
from .util import parse_cli_args
@wtl.single_tab
def goal(_, view: wtl.View) -> bool:
    """
    Report whether every element targeted by a FillText action has non-empty text.

    Returns True as well when the view has no FillText action at all.
    """
    # Stop running only if none of the fillable elements is still empty
    for fill_action in view.actions.by_type(FillText):
        if fill_action.target.metadata["text"] == "":
            return False
    return True
# Preset names the policy picks from when filling in a text field
content = [
    "Robin Carpenter",
    "Kenny Turner",
    "Ernestine Ferguson",
    "Marcelo Allen",
    "Marilyn Rich",
    "Rupert Strong",
]
@wtl.single_tab
def policy(_, view: wtl.View) -> List[wtl.Action]:
    """
    Fill one randomly chosen, still empty text field with a randomly chosen preset name.
    """
    # Keep only the FillText actions whose target element has no text yet
    pending = []
    for candidate in view.actions.by_type(FillText):
        if not candidate.target.metadata["text"]:
            pending.append(candidate)

    # Random field first, then random name
    chosen_field = choice(pending)
    chosen_name = choice(content)

    # Clear, then fill in the chosen field
    return [Clear(), chosen_field(chosen_name)]
def text_field_classifier_func(elements: wtl.Elements, _) -> List[wtl.PageElement]:
    """
    Select the input elements whose type attribute is "text", "email" or "password".

    The second argument is ignored.
    """
    # For now, only these type values count as text fields
    accepted_types = ("text", "email", "password")

    def is_text_input(element) -> bool:
        return element.metadata["tag"] == "input" and element.metadata["type"] in accepted_types

    return [element for element in elements if is_text_input(element)]
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)

    # The sign-up page is fixed; only config and output come from the command line
    workflow = wtl.Workflow(
        config=config,
        policy=policy,
        goal=goal,
        url="https://www.getharvest.com/signup",
        output=cli_args.output,
    )

    # We just need a text field classifier, no need to consider what's active (all of them should be).
    textfield_classifier = wtl.ElementClassifier(
        name="textfield", action=FillText, callback=text_field_classifier_func, highlight=True
    )
    workflow.classifiers.add(textfield_classifier)

    workflow.run()
    workflow.quit()

    print("Workflow successful?", workflow.success)

    # This is the last view, i.e. the one where goal() returned True
    final_view: wtl.View = workflow.history[-1]

    # Collect the text of every fillable element in that view
    final_texts: List[str] = [fill.target.metadata["text"] for fill in final_view.actions.by_type(FillText)]
    print("Names entered: ", ", ".join(final_texts))
|
gui.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Basic example of using WTL together with a graphical frontend.
"""
# pylint: disable=global-statement
import tkinter as tk
from pathlib import Path
from threading import Lock, Thread
from time import sleep
from tkinter import font
import webtraversallibrary as wtl
from .util import parse_cli_args
# === THREAD SYNC ===
# Module-level variables shared between the WTL thread and the GUI thread.
# One lock guards all of them.
# state: 0 = waiting for new view, 1 = update GUI image, 2 = waiting for new action
data_lock = Lock()
current_view = current_action = None
state = 0
# === WTL THREAD ===
# Running WTL in a separate thread (so GUI can run on the main thread).
# Very simple state machine that saves the current view and busy waits
# until an action has been set from the GUI.
@wtl.single_tab
def policy(_, view: wtl.View) -> wtl.Action:
    """
    Publish the given view through the shared globals, then poll until an action is available.

    Returns the module-level ``current_action`` once ``state`` has gone back to 0.
    """
    global state, current_view

    # Store the view and flag it for display (state 1)
    with data_lock:
        current_view, state = view, 1

    # Poll twice a second; the GUI callbacks set state to 0 after storing an action
    while True:
        with data_lock:
            if state == 0:
                return current_action
        sleep(0.5)
def wtl_thread(cli_args):
    """
    Build a workflow from the parsed command-line arguments, run it and quit.

    Used as the target of the background thread started at the entry point.
    """
    config = wtl.Config(cli_args.config)
    workflow = wtl.Workflow(config=config, policy=policy, url=cli_args.url, output=cli_args.output)

    click_filter = wtl.ActiveElementFilter(action=wtl.actions.Click, highlight=True)
    workflow.classifiers.add(click_filter)

    workflow.run()
    workflow.quit()
# === GUI THREAD (MAIN) ===
#
# Replaced by a tk.IntVar in gui_thread(), where it backs the "Show active elements" checkbox
show_actives = None
# Name of the screenshot last drawn on the canvas; compared in upd_view() to avoid redrawing
current_filename = ""
def gui_thread():
    """
    Sets up the window with design and all callbacks, then runs the Tk main loop.

    Communicates with the WTL thread through the module-level ``state``,
    ``current_view`` and ``current_action`` variables, which are accessed while
    holding ``data_lock``. Returns when ``window.mainloop()`` returns.
    """
    global show_actives

    window = tk.Tk()
    window.title("WTL browser")
    window.geometry("1920x1080")

    top_frame = tk.Frame()
    canvas = tk.Canvas(top_frame, width=1280, height=1080)
    canvas.pack(padx=10, pady=10, side=tk.LEFT)
    # Holds the PhotoImage currently shown; rebound by upd_view()
    img = None

    show_actives = tk.IntVar()
    ch = tk.Checkbutton(top_frame, text="Show active elements", variable=show_actives)
    ch.pack(side=tk.LEFT)

    small_font = font.Font(size=16)
    listbox = tk.Listbox(top_frame, width=375, height=200, font=small_font)
    listbox.pack(padx=5, pady=5, side=tk.LEFT)

    label_frame = tk.Frame(window, width=1400, height=100, bg="white")
    label_frame.pack_propagate(0)
    desc_label = tk.Label(label_frame, text="Hello", justify=tk.LEFT, wraplength=1300, bg="white")
    desc_label.pack()
    label_frame.pack()
    top_frame.pack()

    def get_element(mouse):
        """
        Look for the element at current coords with smallest bounds.

        Returns None when there is no view yet or no element contains the point.
        """
        # The screenshot is drawn at canvas offset (5, 5), see upd_view()
        point = wtl.Point(mouse.x - 5, mouse.y - 5)
        with data_lock:
            if not current_view:
                return None
            # min() has no upper cap on the area, unlike a fixed sentinel such as 999999,
            # so an element of any size can be found; ties keep the first element.
            return min(
                (e for e in current_view.snapshot.elements if point in e.bounds),
                key=lambda e: e.bounds.area,
                default=None,
            )

    def hover(mouse):
        """
        Update the top label when hovering over an element.
        """
        smallest_element = get_element(mouse)
        with data_lock:
            if smallest_element:
                # Show all metadata except the (potentially long) text
                output = [f"{k}={str(v)}" for k, v in smallest_element.metadata.items() if k != "text"]
                desc_label.config(text=", ".join(output))
            else:
                desc_label.config(text=str("{}"))

    def double_clicked(mouse):
        """
        Set action of the clicked element.
        Does not check if it's active or not.
        """
        global state, current_action
        smallest_element = get_element(mouse)
        with data_lock:
            if smallest_element:
                state = 0
                current_action = wtl.actions.Click(wtl.Selector(f'[wtl-uid="{smallest_element.wtl_uid}"]'))

    def selected(_):
        """
        Sets action by clicking an element in the listbox.
        """
        global state, current_action
        with data_lock:
            selection = listbox.curselection()
            if not selection:
                # The select event can arrive with nothing selected; listbox.get() would fail
                return
            # Entries look like "<wtl_uid> (<text>)", see upd_view()
            data = str(listbox.get(selection[0])).split(" (")[0]
            state = 0
            current_action = wtl.actions.Click(wtl.Selector(f'[wtl-uid="{data}"]'))

    # Bind functions to the GUI objects
    canvas.bind("<Motion>", hover)
    canvas.bind("<Double-Button-1>", double_clicked)
    listbox.bind("<<ListboxSelect>>", selected)

    def upd_view():
        """
        Checks state and updates the GUI with screenshot and list of actions.
        Reschedules itself every 250 ms.
        """
        global state, current_filename
        nonlocal img

        with data_lock:
            if state == 1:
                # New view: force a redraw and rebuild the list of click actions
                current_filename = None
                state = 2
                listbox.delete(0, tk.END)
                for item in current_view.actions.by_type(wtl.actions.Click):
                    wtl_uid = str(item.target.wtl_uid)
                    text = item.target.metadata["text"]
                    listbox.insert(tk.END, wtl_uid + f" ({text})")

            if state == 2:
                filename = "first" if show_actives.get() == 0 else "is_active"
                if filename != current_filename:
                    current_filename = filename
                    # Saved into the working directory and loaded back as <filename>.png
                    current_view.snapshot.screenshots[filename].save(Path("."))
                    img = tk.PhotoImage(file=f"{filename}.png")
                    canvas.create_image(5, 5, anchor=tk.NW, image=img)

        window.after(250, upd_view)

    window.after(1000, upd_view)
    window.mainloop()
# === === === === ===
# Entry point: Setup WTL thread and run GUI on this (main) thread.
if __name__ == "__main__":
    _cli_args = parse_cli_args()

    # WTL runs on a background thread; the GUI runs on this (main) thread
    _wtl_thread = Thread(target=wtl_thread, args=(_cli_args,))
    _wtl_thread.start()

    gui_thread()

    # gui_thread() has returned; wait for the WTL thread to end
    _wtl_thread.join()
|
hard_coded.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Simple example showing a hard-coded generator of actions.
"""
import webtraversallibrary as wtl
from webtraversallibrary.actions import Clear, Click, Highlight
from .util import parse_cli_args, start_server
@wtl.single_tab_coroutine
def policy():
    """
    Highlight five titles one after another, then click the first one.

    Once the generator is exhausted, workflow will interpret StopIteration as cancelling the tabs.
    """

    def highlight_title(position):
        # One step: clear, then highlight the title link at this position
        return [Clear(), Highlight(target=wtl.Selector(f"h2:nth-of-type({position}) > a"))]

    # The first yield produces no action
    yield

    for position in (1, 2, 3, 4, 5):
        yield highlight_title(position)

    yield Click(wtl.Selector("h2:nth-of-type(1) > a"))
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)

    # The page comes from the server started by start_server(), under /blog
    blog_url = start_server() + "/blog"
    workflow = wtl.Workflow(config=config, policy=policy, url=blog_url, output=cli_args.output)

    workflow.run()
    workflow.quit()
|
href_scorer.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Ranks all links on a page by the length of the HREF attribute.
Does nothing with them.
"""
import webtraversallibrary as wtl
from webtraversallibrary.actions import Wait
from .util import parse_cli_args
@wtl.single_tab
def policy(_, __):
    """
    Return a Wait(10) action; both arguments are ignored.
    """
    # Wait so that the classifier isn't run over and over again
    idle = Wait(10)
    return idle
def url_length_classifier_func(elements, _):
    """
    Score every element that has a non-empty href with the length of that href.

    Elements whose metadata has no "href" entry, or an empty/None one, are skipped
    instead of raising KeyError. The second argument is ignored.

    Returns:
        A list of (element, score) tuples, in the order of ``elements``.
    """
    return [
        (element, len(element.metadata["href"]))
        for element in elements
        if element.metadata.get("href")
    ]
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)
    workflow = wtl.Workflow(config=config, policy=policy, url=cli_args.url, output=cli_args.output)

    # Classifier fed by url_length_classifier_func, highlighted in blue with linear scaling
    url_length_classifier = wtl.ElementClassifier(
        name="url_length",
        highlight=True,
        mode=wtl.ScalingMode.LINEAR,
        highlight_color=wtl.Color(0, 0, 255),
        callback=url_length_classifier_func,
    )
    workflow.classifiers.add(url_length_classifier)

    workflow.run()
    workflow.quit()
|
interactive.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Simple example of an interactive "game", as an old-style text RPG, for web traversal.
"""
import logging
import random
from typing import List
import webtraversallibrary as wtl
from .util import parse_cli_args
# Opening lines; the page title is appended in quotes
initials = [
    "You see a house. Above the door hangs a sign: ",
    "You are in an open field. You find a note that says: ",
    "You are inside an old cottage. On the wall you see something written: ",
    "You are standing in the wtl office. It is loud. One of the screens say: ",
    "Rain is pouring through the broken roof. The rain patters a sound. You make out: ",
]

# Printed by "look" when there is an empty text field
textfield_desc = [
    "A person appears and asks you for a name. He hands you a paper and pen to write with.",
    "An empty notepad really wants something to be written.",
    "You see a dusty whiteboard with a pen that still works.",
    "A piece of paper is asking you what is on your mind. You have a pen in your hand.",
    "A parchment and quill lie before you.",
]

# Printed by "look" when there are click actions but no empty text field
vague_desc = [
    "You can see people in the distance.",
    "Marketing people are ringing a bell.",
    "Everything around you looks really clean.",
    "There are multiple paths forward.",
    "You see shadows lurking about far away. They look friendly.",
]

# Printed by "look" in front of a randomly chosen heading or paragraph text
content_desc = [
    "A bleached old parchment says: ",
    "A pretty little note by your feet says: ",
    "You find an old metal bracelet with an inscription: ",
    "You are standing next to an old radio. It repeats over and over again: ",
    "A whisper is carried by the wind. It says: ",
]
@wtl.single_tab_coroutine
def policy():
    """
    Text-adventure loop: describe the page, read a command, yield the resulting action.

    Commands: help, navigate/jump <url>, look, click <text or selector>, move, write <text>.
    Only the command word is lower-cased; its arguments keep the case the user typed, so
    URLs, selectors and written text are passed on unchanged. Matching against element
    text for "click" is case-insensitive.

    A command that cannot be turned into an action (unknown word, missing argument,
    nothing to click or fill in) prints "I do not understand." and prompts again
    instead of raising IndexError.
    """
    print("\n === === === \n")
    _, view = yield
    initial = random.choice(initials)
    spoken = False

    while True:
        # Introduce the page once per new view
        if not spoken:
            title = view.snapshot.page_metadata["title"]
            print(f'{initial}"{title}"')
            spoken = True

        words = input("\n> ").strip().split(" ")
        verb = words[0].lower()
        args = words[1:]
        argument = " ".join(args)
        action = None

        if verb == "help":
            print("\nAvailable commands:\nhelp: shows this message")
            print("navigate")
            print("jump")
            print("look")
            print("click")
            print("move")
            print("write")
            continue

        if verb in ("navigate", "jump"):
            # Without a target there is nothing to navigate to
            if args and args[0]:
                action = wtl.actions.Navigate(args[0])

        if verb == "look":
            buttons = view.actions.by_type(wtl.actions.Click)
            textfields = [v for v in view.actions.by_type(wtl.actions.FillText) if v.target.metadata["text"] == ""]
            texts = (
                view.snapshot.elements.by_selector(wtl.Selector("h1"))
                + view.snapshot.elements.by_selector(wtl.Selector("h2"))
                + view.snapshot.elements.by_selector(wtl.Selector("h3"))
                + view.snapshot.elements.by_selector(wtl.Selector("p"))
            )

            if textfields:
                print(random.choice(textfield_desc))
            elif buttons:
                print(random.choice(vague_desc))

            if texts:
                print(random.choice(content_desc) + '"' + random.choice(texts).metadata["text"] + '"')
            continue

        if verb == "click" and argument:
            # Try, in order: value attribute, element text, raw selector
            elements = view.snapshot.elements.by_selector(wtl.Selector(f'[value~="{argument}"]'))
            if not elements:
                needle = argument.lower()
                elements = [e for e in view.snapshot.elements if needle in e.metadata["text"].lower()]
            if not elements:
                elements = view.snapshot.elements.by_selector(wtl.Selector(argument))
            if elements:
                action = wtl.actions.Click(random.choice(elements))

        if verb == "move":
            clickables = view.actions.by_type(wtl.actions.Click)
            if clickables:
                action = random.choice(clickables)
                initial = random.choice(initials)

        if verb == "write":
            textfields = [v for v in view.actions.by_type(wtl.actions.FillText) if v.target.metadata["text"] == ""]
            if textfields:
                action = random.choice(textfields)(argument)

        if not action:
            print("I do not understand.")
            continue

        # Hand the action over and wait for the next view
        spoken = False
        _, view = yield action
def text_field_classifier_func(elements: wtl.Elements, _) -> List[wtl.PageElement]:
    """
    Select the input elements whose type attribute is "text", "email" or "password".

    The second argument is ignored.
    """
    text_inputs = []
    for element in elements:
        if element.metadata["tag"] == "input" and element.metadata["type"] in ("text", "email", "password"):
            text_inputs.append(element)
    return text_inputs
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)
    workflow = wtl.Workflow(config=config, policy=policy, url=cli_args.url, output=cli_args.output)

    # Click actions come from the active-element filter, FillText actions from the text field classifier
    workflow.classifiers.add(wtl.ActiveElementFilter(action=wtl.actions.Click))
    textfield_classifier = wtl.ElementClassifier(
        name="textfield", action=wtl.actions.FillText, callback=text_field_classifier_func, highlight=True
    )
    workflow.classifiers.add(textfield_classifier)

    # Raise the "wtl" logger threshold to CRITICAL
    wtl_logger = logging.getLogger("wtl")
    wtl_logger.setLevel(logging.CRITICAL)

    workflow.run()
    workflow.quit()
|
largest_image.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Ranks all images on a page by their geometric size.
Clicks on the largest, then checks if the URL changed, then stops.
"""
from typing import Optional
import webtraversallibrary as wtl
from webtraversallibrary.actions import Click
from webtraversallibrary.goals import N_STEPS
from .util import parse_cli_args
# Goal passed to the workflow below, built from the library's N_STEPS helper.
# NOTE(review): presumably this ends the workflow after two steps - confirm against the N_STEPS docs
goal = N_STEPS(2)
@wtl.single_tab
def policy(workflow: wtl.Workflow, view: wtl.View) -> Optional[wtl.Action]:
    """
    Click the largest image, then report whether the URL changed.

    While the history holds exactly one view, the element scored as "image" with the
    largest bounding-box area is clicked. On any other call, a boolean telling whether
    the current URL differs from the first view's URL is printed.

    Returns:
        A Click on the largest image, or None when there is no image to click
        (instead of failing with IndexError) and on every later call.
    """
    if len(workflow.history) == 1:
        # max() instead of a full sort; ties resolve to the first element, as before
        largest = max(
            view.snapshot.elements.by_score("image"), key=lambda element: element.bounds.area, default=None
        )
        return Click(largest) if largest is not None else None

    print("\n", view.snapshot.page_metadata["url"] != workflow.history[0].snapshot.page_metadata["url"], "\n")
    return None
def image_classifier_func(elements, _):
    """
    Select the elements whose metadata tag is "img".

    Elements whose metadata has no "tag" entry are skipped instead of raising KeyError.
    The second argument is ignored.

    Returns:
        A list of the matching elements, in the order of ``elements``.
    """
    return [elem for elem in elements if elem.metadata.get("tag") == "img"]
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)
    wf = wtl.Workflow(config=config, policy=policy, goal=goal, url=cli_args.url, output=cli_args.output)

    # The "image" score read by the policy comes from this classifier
    image_classifier = wtl.ElementClassifier(name="image", highlight=True, callback=image_classifier_func)
    wf.classifiers.add(image_classifier)

    wf.run()
    wf.quit()
|
monkeypatch.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Click any active element, but all links lead to Rome, literally, using workflow.monkeypatches.
"""
from random import choice
import webtraversallibrary as wtl
from webtraversallibrary.actions import Click
from .util import parse_cli_args
@wtl.single_tab
def policy(_, view: wtl.View) -> wtl.Action:
    """
    Pick a random Click action among those selected by the "is_active" score.
    """
    click_actions = view.actions.by_type(Click)
    active_clicks = click_actions.by_score("is_active")
    return choice(active_clicks)
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)
    workflow = wtl.Workflow(config=config, policy=policy, url=cli_args.url, output=cli_args.output)

    # Default monkeypatch target: all links lead to Rome
    workflow.monkeypatches.set_default("https://en.wikipedia.org/wiki/Rome")

    click_filter = wtl.ActiveElementFilter(action=Click)
    workflow.classifiers.add(click_filter)

    workflow.run()
    workflow.quit()
|
multiples.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Creates a total of three tabs in two windows, and clicks randomly on all of them.
"""
from random import choice
from typing import Dict
import webtraversallibrary as wtl
from .util import parse_cli_args
def policy(_, view: wtl.View) -> Dict[wtl.View, wtl.Action]:
    """
    Pick one random Click action for every view.

    NOTE(review): ``view`` is used as a mapping here (``view.values()``) although it is
    annotated as a single ``wtl.View`` - confirm the intended annotation.
    """
    picks = {}
    for tab_view in view.values():
        picks[tab_view] = choice(tab_view.actions.by_type(wtl.actions.Click))
    return picks
if __name__ == "__main__":
    cli_args = parse_cli_args()
    config = wtl.Config(cli_args.config)

    # Two outer keys with three named URLs in total (the module docstring: three tabs in two windows)
    start_urls = {
        "first": {"A": "www.uppsalahandkraft.se", "B": "https://www.uppsalamodemassa.se"},
        "second": {"C": "shop.biskopsgarden.com"},
    }
    workflow = wtl.Workflow(config=config, policy=policy, url=start_urls, output=cli_args.output)

    click_filter = wtl.ActiveElementFilter(action=wtl.actions.Click)
    workflow.classifiers.add(click_filter)

    workflow.run()
    workflow.quit()
|
random_dataguy_memory.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Defines a subset of all active elements (menu items) and clicks randomly on those exactly once.
When they have all been clicked, abort the workflow.
"""
from random import choice
from typing import List
import webtraversallibrary as wtl
from webtraversallibrary.actions import Abort, Click
from .util import parse_cli_args
@wtl.single_tab
def policy(workflow: wtl.Workflow, view: wtl.View) -> wtl.Action:
    """
    Click each "menu" element at most once, in random order, then abort.

    The elements clicked so far are kept in ``view.metadata["previous"]``; an element
    counts as already clicked when its text equals the text of one of them. When the
    "previous" entry already exists, an annotation is drawn and, if debug screenshots
    are enabled in the config, a "history" and a "live" screenshot are saved to the
    workflow output path.

    Returns:
        A Click on a not yet clicked menu element, or Abort when none is left.
    """
    if "previous" not in view.metadata:
        view.metadata["previous"] = []
    else:
        workflow.js.annotate(
            wtl.Point(100, 100), wtl.Color(0, 0, 0), 30, "This is an annotation", wtl.Color(128, 128, 128, 128)
        )

        if workflow.config.debug.screenshots:
            # Create screenshot of previous actions with an emphasis on the latest
            scr = view.snapshot.new_screenshot("history", of="full")
            for prev in view.metadata["previous"]:
                scr.highlight(prev.bounds, color=wtl.Color(255, 0, 0, 100))
            scr.highlight(
                view.metadata["previous_action"][0].target.bounds, text="Latest action", color=wtl.Color(0, 0, 255, 100)
            )
            scr.save(workflow.output_path)

            # Save screenshot of the current live view
            workflow.scraper.capture_screenshot("live").save(workflow.output_path)

    # Texts of everything clicked so far, collected once instead of once per candidate
    clicked_texts = {clicked.metadata["text"] for clicked in view.metadata["previous"]}

    # Get all elements tagged as "menu" and filter out those we have already clicked on
    menu_elements = [
        candidate
        for candidate in view.snapshot.elements.by_score("menu")
        if candidate.metadata["text"] not in clicked_texts
    ]

    if menu_elements:
        # If there are any left, click one of them and remember it
        element = choice(menu_elements)
        action = Click(element)
        view.metadata["previous"].append(element)
    else:
        # Otherwise, stop everything
        action = Abort()

    print("Here are the buttons I've clicked: ", [e.metadata["text"] for e in view.metadata["previous"]])
    print("Last time I did", view.metadata["previous_action"][0])
    return action
def menu_classifier_func(elements: wtl.Elements, _) -> List[wtl.PageElement]:
    """Return the ``a`` elements positioned at x < 10 and y < 200.

    The condition here is completely hard-coded for the given page.
    """
    matches = []
    for candidate in elements:
        position = candidate.location
        if position.x < 10 and position.y < 200:
            if candidate.metadata["tag"] == "a":
                matches.append(candidate)
    return matches
if __name__ == "__main__":
    cli_args = parse_cli_args()

    wf = wtl.Workflow(config=wtl.Config(cli_args.config), policy=policy, url=cli_args.url, output=cli_args.output)

    try:
        wf.classifiers.add(wtl.ActiveElementFilter(action=Click))

        # Scores elements under the name "menu", which the policy queries with by_score("menu").
        wf.classifiers.add(
            wtl.ElementClassifier(
                name="menu",
                action=Click,
                subset="is_active",  # Consider only active elements
                highlight=True,
                callback=menu_classifier_func,
            )
        )

        wf.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        wf.quit()
|
random_dataguy.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Defines a subset of all active elements (menu items) and clicks randomly on those.
"""
from random import choice
from typing import List
import webtraversallibrary as wtl
from webtraversallibrary.actions import Click
from .util import parse_cli_args
@wtl.single_tab
def policy(_, view: wtl.View) -> wtl.Action:
    """Return a randomly chosen ``Click`` action among those scored as "menu"."""
    click_actions = view.actions.by_type(Click)
    menu_clicks = click_actions.by_score("menu")
    return choice(menu_clicks)
def menu_classifier_func(elements: wtl.Elements, _) -> List[wtl.PageElement]:
    """Select the anchor elements whose location is x < 10 and y < 200.

    The condition here is completely hard-coded for the given page.
    """

    def is_menu_link(candidate) -> bool:
        in_menu_area = candidate.location.x < 10 and candidate.location.y < 200
        return bool(in_menu_area and candidate.metadata["tag"] == "a")

    return list(filter(is_menu_link, elements))
if __name__ == "__main__":
    cli_args = parse_cli_args()

    workflow = wtl.Workflow(config=wtl.Config(cli_args.config), policy=policy, url=cli_args.url, output=cli_args.output)

    try:
        workflow.classifiers.add(wtl.ActiveElementFilter(action=Click))

        # Scores elements under the name "menu", which the policy queries with by_score("menu").
        workflow.classifiers.add(
            wtl.ElementClassifier(
                name="menu",
                action=Click,
                subset="is_active",  # Consider only active elements
                highlight=True,
                callback=menu_classifier_func,
            )
        )

        workflow.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        workflow.quit()
|
random_traversal.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Click on any random clickable element on a page.
Also demonstrates the use of postload_callbacks.
"""
from functools import partial
from random import choice, random
import webtraversallibrary as wtl
from webtraversallibrary.actions import Click, Refresh
from .util import parse_cli_args
@wtl.single_tab
def policy(workflow: wtl.Workflow, view: wtl.View) -> wtl.Action:
    """Usually click a random clickable element; occasionally refresh instead.

    Also checks that ``workflow.duplicate_loop_idx`` matches ``workflow.loop_idx``
    before choosing an action.
    """
    assert workflow.duplicate_loop_idx == workflow.loop_idx

    # With some small probability, refresh instead of clicking.
    if random() < 0.95:
        return choice(view.actions.by_type(Click))
    return view.actions.by_type(Refresh)[0]
def set_duplicate_loop_idx(workflow: wtl.Workflow):
    """Copy the workflow's current ``loop_idx`` into ``duplicate_loop_idx``."""
    current_idx = workflow.loop_idx
    workflow.duplicate_loop_idx = current_idx
if __name__ == "__main__":
    cli_args = parse_cli_args()

    wf = wtl.Workflow(config=wtl.Config(cli_args.config), policy=policy, url=cli_args.url, output=cli_args.output)

    try:
        wf.classifiers.add(wtl.ActiveElementFilter(action=Click))

        # Register the callback that keeps duplicate_loop_idx in sync; the policy asserts on it.
        wf.postload_callbacks.append(partial(set_duplicate_loop_idx, wf))

        wf.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        wf.quit()
|
size_scorer.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Classifies active elements with a "size score" and then clicks some element with average size.
"""
from random import choice
from typing import Dict
import webtraversallibrary as wtl
from webtraversallibrary.actions import Click
from .util import parse_cli_args
@wtl.single_tab
def policy(_, view: wtl.View) -> wtl.Action:
    """Return a random ``Click`` action among those scored as "size__average"."""
    average_sized = view.actions.by_type(Click).by_score("size__average")
    return choice(average_sized)
def size_classifier_func(elements: wtl.Elements, _) -> Dict[str, list]:
    """Score elements by their area relative to the largest element.

    Each element's ``bounds.area`` is divided by the largest area found.
    Note that this is not the simplest way of clicking the largest clickable element.

    Returns a dict with two lists of ``(element, score)`` tuples:

    * ``"big"``: elements with a normalized area above 0.75, scored by that area.
    * ``"average"``: elements with a normalized area in (0.25, 0.75], scored by
      their distance from 0.5.

    Both lists are empty when there are no elements or no element has a
    positive area (instead of raising ``ValueError`` / ``ZeroDivisionError``).
    """
    # Read every area once; the original recomputed the score up to three times per element.
    measured = [(element, element.bounds.area) for element in elements]
    largest_area = max((area for _element, area in measured), default=0)

    result = {"big": [], "average": []}
    if largest_area <= 0:
        return result

    for element, area in measured:
        normalized = area / largest_area
        if normalized > 0.75:
            result["big"].append((element, normalized))
        elif normalized > 0.25:
            result["average"].append((element, abs(0.5 - normalized)))
    return result
if __name__ == "__main__":
    cli_args = parse_cli_args()

    workflow = wtl.Workflow(config=wtl.Config(cli_args.config), policy=policy, url=cli_args.url, output=cli_args.output)

    workflow.classifiers.add(wtl.ActiveElementFilter())

    # Classifier named "size"; the policy reads its "average" scores via by_score("size__average").
    size_classifier = wtl.ElementClassifier(
        name="size", subset="is_active", highlight=0.5, action=Click, callback=size_classifier_func
    )
    workflow.classifiers.add(size_classifier)

    # The workflow is used as a context manager here instead of calling quit() explicitly.
    with workflow:
        workflow.run()
|
tictactoe.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Plays TicTacToe online by using a simple AI implementation.
"""
from typing import List
import webtraversallibrary as wtl
from webtraversallibrary.actions import Click
from .util import parse_cli_args
def checkWin(board, player) -> bool:
    """Return True when ``player`` fills a complete row, column or diagonal.

    ``board`` is a flat sequence of nine cells in row-major order.
    """
    winning_lines = (
        (0, 1, 2),
        (3, 4, 5),
        (6, 7, 8),
        (0, 3, 6),
        (1, 4, 7),
        (2, 5, 8),
        (0, 4, 8),
        (2, 4, 6),
    )
    return any(all(board[cell] == player for cell in line) for line in winning_lines)
def getAIMove(board, nextMove, aiPlayer):
    """Search all continuations of ``board`` and return the best ``(index, score)``.

    ``nextMove`` is the mark to place next and ``aiPlayer`` the mark the score is
    computed for. The score is 10 when ``aiPlayer`` has won, -10 when the other
    mark has won and 0 for a full board; in those terminal cases the index is -1.
    An entirely empty board yields the centre cell ``(4, 0)`` without searching.
    """
    opponent = "O" if aiPlayer == "X" else "X"
    if checkWin(board, aiPlayer):
        return (-1, 10)
    if checkWin(board, opponent):
        return (-1, -10)

    free = [index for index, cell in enumerate(board) if cell == " "]
    if not free:
        return (-1, 0)
    if len(free) == len(board):
        return (4, 0)

    following = "X" if nextMove == "O" else "O"
    candidates = []
    for index in free:
        trial = board[:]
        trial[index] = nextMove
        outcome = getAIMove(trial, following, aiPlayer)
        candidates.append((index, outcome[1]))

    # The AI's own turn takes the highest score, the other turn the lowest.
    # max()/min() return the first best entry, like taking [0] after a stable sort.
    pick = max if nextMove == aiPlayer else min
    return pick(candidates, key=lambda move: move[1])
def printBoard(board):
    """Print the nine cells of ``board`` as three rows, showing blanks as ``_``."""
    rows = []
    for row_start in (0, 3, 6):
        cells = ("_" if board[pos] == " " else board[pos] for pos in range(row_start, row_start + 3))
        # Every cell is followed by a single space, including the last one in a row.
        rows.append("".join(f"{cell} " for cell in cells))

    print("\n")
    print("\n".join(rows), end="")
    print("\n")
@wtl.single_tab
def policy(_, view: wtl.View) -> List[wtl.Action]:
    """Press the start button when one is scored, otherwise play the next move as "X".

    The board is rebuilt from the targets of the "tile" actions: a tile with a
    non-empty ``class`` contributes the upper-cased sixth character of that
    class, any other tile counts as blank.
    """
    start_actions = view.actions.by_score("start")
    if start_actions:
        return [start_actions[0]]

    tiles = []
    board = []
    for tile_action in view.actions.by_score("tile"):
        tile = tile_action.target
        tiles.append(tile)
        css_class = tile.metadata["class"]
        board.append(css_class[5].upper() if css_class else " ")

    best_index = getAIMove(board, "X", "X")[0]
    printBoard(board)

    clear_action = wtl.actions.Clear(viewport=False)
    click_action = wtl.actions.Click(tiles[best_index])
    return [clear_action, click_action]
def _start_btn(elements, _):
    """Return the elements with id "sync-task-cover" whose display contains "block"."""
    found = []
    for element in elements:
        meta = element.metadata
        if meta["id"] != "sync-task-cover":
            continue
        if "block" in meta["display"]:
            found.append(element)
    return found
def _tile_div(elements, _):
    """Return the ``span`` elements whose id starts with "ttt" and whose parent tag is ``div``."""
    tiles = []
    for element in elements:
        meta = element.metadata
        if meta["tag"] != "span":
            continue
        if not meta["id"].startswith("ttt"):
            continue
        if element.tag.parent.name == "div":
            tiles.append(element)
    return tiles
if __name__ == "__main__":
    cli_args = parse_cli_args()

    # The URL is fixed to the tic-tac-toe page; cli_args.url is not used by this example.
    workflow = wtl.Workflow(
        config=wtl.Config(cli_args.config),
        policy=policy,
        url="https://stanfordnlp.github.io/miniwob-plusplus/html/miniwob/tic-tac-toe.html",
        output=cli_args.output,
    )

    try:
        workflow.classifiers.add(wtl.ActiveElementFilter())
        # "start" and "tile" are the score names the policy queries with by_score().
        workflow.classifiers.add(wtl.ActiveElementFilter(name="start", callback=_start_btn, action=Click))
        workflow.classifiers.add(wtl.ActiveElementFilter(name="tile", callback=_tile_div, action=Click))

        workflow.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        workflow.quit()
|
util.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Not an example, just helper functions for the other examples.
"""
import atexit
import os
import subprocess
import sys
from argparse import ArgumentParser, Namespace
from pathlib import Path

import webtraversallibrary as wtl
def start_server() -> str:
    """Start the Flask test site as a background process and return its URL.

    The server is launched with the interpreter that runs this script, so it
    uses the same environment, and it is terminated when this interpreter
    exits instead of being left running.

    NOTE: this returns right after spawning the process; it does not wait for
    the server to accept connections. ``FLASK_APP`` is a relative path, so it
    is resolved against the current working directory.
    """
    my_env = os.environ.copy()
    my_env["FLASK_APP"] = "tests/site/flask_app.py"

    server = subprocess.Popen(
        [sys.executable, "-m", "flask", "run"],
        env=my_env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Keep the handle and stop the child at exit so it does not outlive the example.
    atexit.register(server.terminate)

    return "http://localhost:5000"
def parse_cli_args() -> Namespace:
    """
    Parses CLI flags relevant for examples.

    Returns the parsed ``argparse.Namespace`` with the attributes ``url``,
    ``output``, ``windows`` and ``config``. ``"default"`` is always put first in
    ``config``. When ``--url`` is not given, the local test server is started
    and its URL is used.
    """
    parser = ArgumentParser()

    group = parser.add_argument_group("Run parameters")
    group.add_argument("--url", type=str, default="DEFAULT", help="URL to run the workflow on.")
    group.add_argument(
        "--output",
        type=Path,
        help="Where to save the result locally. If save, remember to also add save flag for config.",
        default=None,
    )
    group.add_argument(
        "--windows",
        type=str,
        nargs="*",
        default=[wtl.Workflow.SINGLE_TAB],
        help="Tab names (comma-separated). Use space separation for multiple windows.",
    )
    group.add_argument(
        "--config",
        type=str,
        nargs="*",
        default=[],
        required=False,
        help="Names of config files in config/, such as " '"iphone_x_mobile", or key=value pairs.',
    )

    cli_args = parser.parse_args()

    # Build a new list rather than inserting into the parser's default list in place.
    cli_args.config = ["default", *cli_args.config]

    # "DEFAULT" is the sentinel for "no URL given": fall back to the local test site.
    if cli_args.url == "DEFAULT":
        cli_args.url = start_server()

    return cli_args
|
view_classifier.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Clicks randomly on a page until _dataguy_ is not part of the URL anymore, using a ViewClassifier.
"""
from random import choice
from typing import List, Set
import webtraversallibrary as wtl
from webtraversallibrary.actions import Annotate, Clear, Click
from .util import parse_cli_args
@wtl.single_tab
def goal(_, view):
    """Return True once the view is no longer tagged "dataguy"."""
    # Stop when "dataguy" is not part of the URL anymore
    still_dataguy = "dataguy" in view.tags
    return not still_dataguy
@wtl.single_tab
def policy(_, view: wtl.View) -> List[wtl.Action]:
    """Return three actions: clear, annotate "Still dataguy", then a random click."""
    banner = Annotate(
        location=wtl.Point(30, 30),
        color=wtl.Color(0, 0, 0),
        size=20,
        text="Still dataguy",
        background=wtl.Color(128, 50, 128),
    )

    # Do any random click
    random_click = choice(view.actions.by_type(Click))

    return [Clear(), banner, random_click]
def dataguy_classifier_func(view: wtl.View) -> Set[str]:
    """Tag a view by its URL.

    Returns ``{"dataguy"}`` when the snapshot's page URL contains "dataguy",
    otherwise ``{"other"}``. The tags can be retrieved with ``view.tags``.
    """
    url = view.snapshot.page_metadata["url"]
    if "dataguy" in url:
        return {"dataguy"}
    return {"other"}
if __name__ == "__main__":
    cli_args = parse_cli_args()

    # NOTE(review): goal() is defined in this file but is not passed to the Workflow
    # here, so nothing visible stops the run when "dataguy" leaves the URL - confirm
    # whether the constructor should also receive the goal.
    workflow = wtl.Workflow(config=wtl.Config(cli_args.config), policy=policy, url=cli_args.url, output=cli_args.output)

    try:
        workflow.classifiers.add(wtl.ActiveElementFilter(action=Click))

        # The syntax for a ViewClassifier is similar, but simpler
        workflow.classifiers.add(wtl.ViewClassifier(name="dataguy", callback=dataguy_classifier_func))

        workflow.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        workflow.quit()
|
wiki_game.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
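Plays a small "wiki game": from a random Wikipedia article (page A), follows a random link to page B,
then goes back to page A, searches for the opening text of page B and clicks through the search
results until it lands on page B again.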
"""
from random import choice
import webtraversallibrary as wtl
from webtraversallibrary.actions import Abort, Click
from .util import parse_cli_args
@wtl.single_tab_coroutine
def policy():
    """Coroutine policy that tries to find page B again through the search of page A.

    Steps, each separated by a ``yield`` of the next action(s):

    1. On the start page (page A), click a random link inside ``div#bodyContent``
       to reach page B.
    2. Take the text of page B's first paragraph, or of the second one when the
       first is missing or blank, and keep its first 50 characters.
    3. Navigate back to page A, fill the search field with that text and submit.
    4. Click through the search results one by one until the current URL equals
       page B's URL or the results are exhausted.
    """
    workflow, view = yield

    # Store Page A's URL
    page_a_url = workflow.current_window.driver.current_url
    workflow, view = yield Click(
        choice(view.snapshot.elements.by_subtree(wtl.Selector("div[id='bodyContent']")).by_selector(wtl.Selector("a")))
    )

    # Store Page B's URL
    page_b_url = workflow.current_window.driver.current_url

    description = ""

    # Stores first paragraph from page B's body
    try:
        description = view.snapshot.elements.by_selector(wtl.Selector("div p:nth-of-type(1)"))[0].metadata["text"]
        # str has no ``empty`` attribute; test for blank text explicitly so that
        # the fallback below is reached instead of an uncaught AttributeError.
        if not (description or "").strip():
            raise IndexError()
    except IndexError:
        description = view.snapshot.elements.by_selector(wtl.Selector("div p:nth-of-type(2)"))[0].metadata["text"]

    # Limit the description to 50 characters to improve search
    description_subset = str(description[:50])

    # Navigate back to page A
    workflow, view = yield wtl.actions.Navigate(page_a_url)

    link_to_click = view.snapshot.elements.by_selector(wtl.Selector("input[type='submit']"))

    # In the search bar in page A, fill text with description_subset and
    # click search to get search results for the descriptions
    workflow, view = yield [
        wtl.actions.FillText(wtl.Selector("input[type='search']"), description_subset),
        Click(link_to_click[0]),
    ]

    # Store search result's URL
    search_url = workflow.current_window.driver.current_url

    search_results = view.snapshot.elements.by_selector(wtl.Selector("div[class=mw-search-result-heading] a"))

    i = 0

    # Go to first link in the search result
    try:
        workflow, view = yield Click(search_results[i])
    except IndexError:
        print("Empty search results!!")
        yield Abort()

    # Check if landing URL equals PAGE B URL, if yes, break, else iterate and go to next link in the search result
    # until the URLs match
    while True:
        if workflow.current_window.driver.current_url == page_b_url:
            print("Woohoo!!!")
            break
        try:
            workflow, view = yield [wtl.actions.Navigate(search_url), Click(search_results[i + 1])]
            i += 1
        except IndexError:
            print("Search result exhausted!!")
            break

    yield None
if __name__ == "__main__":
    cli_args = parse_cli_args()

    # The start URL is fixed to a random Wikipedia article; cli_args.url is not used by this example.
    wf = wtl.Workflow(
        config=wtl.Config(cli_args.config),
        policy=policy,
        url="https://en.wikipedia.org/wiki/Special:Random",
        output=cli_args.output,
    )

    try:
        wf.classifiers.add(wtl.ActiveElementFilter(action=Click))
        wf.classifiers.add(wtl.ElementClassifier(name="textfield", action=wtl.actions.FillText, highlight=True))

        wf.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        wf.quit()
|
without_workflow.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Goes to a random Wikipedia page and prints the 'n' most common words in the article,
where 'n' is a number entered by the user.
"""
import collections
from collections import defaultdict
import webtraversallibrary as wtl
from .util import parse_cli_args
cli_args = parse_cli_args()

config = wtl.Config(cli_args.config)

window = wtl.Window(config)

try:
    # Navigates to a Wiki article chosen at random
    window.scraper.navigate("https://en.wikipedia.org/wiki/Special:Random")

    # Takes a snapshot of the current page
    snapshot = window.scraper.scrape_current_page()

    # Fetches all the elements with links in the current page
    links = snapshot.elements.by_selector(wtl.Selector("a"))  # pylint: disable=no-member
    print("Number of links in the article ", len(links))

    # Gets the current URL of the page
    search_url = window.driver.current_url
    print("Link to the wiki article : ", search_url)

    paragraphs = snapshot.elements.by_selector(wtl.Selector("div p"))  # pylint: disable=no-member
    article = " ".join(p.metadata["text"] for p in paragraphs)

    # Fetch stopwords from a local file containing an array of stopwords.
    # The raw text is split into a set of individual words, so that a word is
    # only skipped when it IS a stopword, not when it merely occurs somewhere
    # inside the file's text (which a substring test on the raw string does).
    with open("examples/stopwords.txt", encoding="utf-8") as f:
        raw_stopwords = f.read()
    separators = str.maketrans({char: " " for char in ',[]"'})
    stopwords = {token.strip("'").lower() for token in raw_stopwords.translate(separators).split()}
    stopwords.discard("")

    # Find n most common words in the article, where n being the number of common words required by the user.
    # Surrounding punctuation is stripped so that e.g. "word," and "word" are counted together.
    words = (token.strip(".,;:!?()[]{}\"'") for token in article.lower().split())
    word_counter = collections.Counter(word for word in words if word and word not in stopwords)

    n_print = int(input("How many most common words to print: "))

    print("\nOK. The {} most common words are as follows\n".format(n_print))

    for word, count in word_counter.most_common(n_print):
        print(word, ": ", count)
finally:
    # Close the browser, also when scraping, file access or the user input fails
    window.quit()
|
zindex.py¶
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Compute the zIndex of all active elements and highlights the top 1% elements.
"""
import logging
from typing import List, Tuple
import webtraversallibrary as wtl
from webtraversallibrary.logging import setup_logging
from .util import parse_cli_args
@wtl.single_tab
def policy(_, __) -> wtl.Action:
    """Ignore both arguments and always return a ``WaitForUser`` action."""
    wait_action = wtl.actions.WaitForUser()
    return wait_action
# https://stackoverflow.com/questions/1388007/getting-the-z-index-of-a-div-in-javascript
# JavaScript source kept as a Python string. It expects a CSS selector as its
# first argument (arguments[0]) and looks the element up with querySelector.
# window.getZIndex reads the computed 'z-index' of an element; while that value
# is not numeric it moves on to the parent element, and it returns 0 once no
# parent is left. Nothing is returned when the selector matches no element.
Z_INDEX_JS = """
window.getZIndex = function (e) {
if (e === null) {
return 0;
}
let z = window.document.defaultView.getComputedStyle(e).getPropertyValue('z-index');
if (isNaN(z)) {
return window.getZIndex(e.parentElement);
}
return z;
};
console.log("Hello!");
let element = document.querySelector(arguments[0]);
if (element !== null) {
return window.getZIndex(element);
}
"""
def _compute_z_index(elements: wtl.Elements, workflow: wtl.Workflow) -> List[Tuple[wtl.PageElement, float]]:
    """Pair every element with its z-index, obtained by running ``Z_INDEX_JS`` for it.

    The script is executed once per element with the element's CSS selector; a
    falsy script result counts as 0 and the value is converted with ``int``.
    """
    # This may be slow for many elements. If you need more performance, consider a JS script
    # which computes all elements' z-values combined and returns the map directly.

    def z_index_of(element) -> int:
        raw_value = workflow.js.execute_script(Z_INDEX_JS, element.selector.css)
        return int(raw_value or 0)

    return [(element, z_index_of(element)) for element in elements]
if __name__ == "__main__":
    cli_args = parse_cli_args()

    wf = wtl.Workflow(config=wtl.Config(cli_args.config), policy=policy, url=cli_args.url, output=cli_args.output)

    try:
        wf.classifiers.add(wtl.ActiveElementFilter(action=wtl.actions.Click))

        # Scores the active elements by the z-index computed in _compute_z_index.
        wf.classifiers.add(
            wtl.ElementClassifier(
                name="zIndex",
                subset="is_active",
                enabled=True,
                highlight=0.99,
                mode=wtl.ScalingMode.LINEAR,
                callback=_compute_z_index,
            )
        )

        setup_logging(logging_level=logging.DEBUG)

        wf.run()
    finally:
        # Always call quit(), even when setup or run() raises.
        wf.quit()
|