You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

238 lines
8.4KB

  1. import json
  2. import os
  3. import time
  4. from typing import List, Dict, Any
  5. from selenium import webdriver
  6. from selenium.common.exceptions import TimeoutException, NoSuchElementException
  7. from selenium.webdriver.support.ui import WebDriverWait
  8. from selenium.webdriver.support import expected_conditions as EC
  9. from selenium.webdriver.common.by import By
  10. from selenium.webdriver.common.keys import Keys
  11. from selenium.webdriver.firefox.webelement import FirefoxWebElement as Element
  # CSS selectors for the chat-widget elements the scraper looks up.
  selectors = dict(
      button="a.hu-input-menu_button",
      text_box=".hu-textarea",
      bot_message=".hu-message-text",
      all_messages=".hu-message-bubble",
  )
  def check_if_duplicates(list_of_elems: List[Any]) -> bool:
      """Return True when at least one element occurs more than once.

      Uniqueness is determined by building a set from the list, so the
      elements must be hashable.
      """
      unique_count = len(set(list_of_elems))
      return unique_count != len(list_of_elems)
  class AnyEc:
      """Combine several expected conditions with a logical OR.

      Use with ``WebDriverWait(...).until(AnyEc(cond_a, cond_b))``: the
      instance is called with the driver and reports True as soon as any of
      the wrapped conditions returns a truthy value.
      """

      def __init__(self, *args):
          """Store the condition callables; each one is called with the driver."""
          self.ecs = args

      def __call__(self, driver):
          """Return True if any condition is truthy for ``driver``, else False."""
          for condition in self.ecs:
              try:
                  if condition(driver):
                      return True
              except Exception:
                  # A condition that errors counts as "not met yet" so the
                  # remaining conditions still get a chance. Only Exception is
                  # caught: KeyboardInterrupt and SystemExit must propagate so
                  # a wait can still be interrupted.
                  continue
          return False
  # After every choice, and on the second choice selection, check all
  # previous blocks. If blocks have repeated, stop choosing different
  # choices and default to the first one for the next choice.
  # Blocks: block number -> the list of message strings in that block.
  Blocks = Dict[int, List[str]]
  # Output: section name -> numbered lists of strings.
  # NOTE(review): appears to describe the shape of Scraper.output
  # ("blocks" and "choices" sections) - confirm.
  Output = Dict[str, Blocks]
  class Scraper:
      """Drive a Firefox session through a chat-bot page and record the dialog.

      Each pass over the page answers every prompt (typing a placeholder into
      a text box, or clicking one of the offered buttons) until the page stops
      changing, then writes the collected messages to
      ``./<id>/<loop_number>.json``. The page is replayed until the number of
      passes reaches the largest number of choices offered by any prompt.
      """

      def __init__(self, page_url: str):
          """Prepare a scraper for ``page_url``.

          The output folder name (``self.id``) is the second-to-last
          ``/``-separated segment of the URL.
          """
          self.id = page_url.split("/")[-2]
          self.url = page_url
          self.timeout = 15  # Secs
          self.output = {"blocks": {}, "choices": {}}
          self.no_of_input_sections = 0
          self.loop_number = 0
          self.required_loops = 1

      def get(self):
          """Begin web scraping the landbot.io page.

          Runs one browser session per pass and keeps going until the number
          of passes reaches ``self.required_loops`` (which grows while
          scraping, see ``decide_recursive_loop_amount``). The browser is
          always shut down, even when a pass raises.
          """
          # NOTE(review): self.output["choices"] and self.no_of_input_sections
          # are not reset between passes, so later files also contain the
          # choices recorded by earlier passes - confirm this is intended.
          while True:
              self.browser = webdriver.Firefox()
              self.loop_number += 1
              # Used to store all previous bot messages, without duplicates.
              self.prev_messages = set()
              try:
                  # Load the URL this instance was created with rather than
                  # relying on a module-level ``url`` variable existing.
                  self.browser.get(self.url)
                  self.loop_through_page()
                  all_messages = self.browser.find_elements_by_css_selector(
                      selectors["all_messages"]
                  )
                  messages = self.process_messages(all_messages)
                  self.output["blocks"] = self.define_blocks(messages)
                  self.write_to_disk()
              finally:
                  # quit() ends the whole driver session, so no browser or
                  # driver process is left behind after a failed pass.
                  self.browser.quit()
              # Stop once enough passes have run. ">=" (not "==") so calling
              # get() again on a finished scraper cannot loop forever.
              if self.loop_number >= self.required_loops:
                  break

      def write_to_disk(self):
          """Write ``self.output`` as JSON to ``./<id>/<loop_number>.json``.

          The folder is created when missing and reused when it exists.
          """
          folder = f"./{self.id}"
          os.makedirs(folder, exist_ok=True)
          # ensure_ascii=False emits non-ASCII characters verbatim, so the
          # file is opened as UTF-8 instead of the platform default encoding.
          with open(
              f"{folder}/{self.loop_number}.json", "w", encoding="utf-8"
          ) as fp:
              fp.write(json.dumps(self.output, ensure_ascii=False))

      def decide_recursive_loop_amount(self, choices: List[str]):
          """Raise ``self.required_loops`` to ``len(choices)`` if that is larger."""
          no_choices = len(choices)
          if no_choices > self.required_loops:
              self.required_loops = no_choices

      def loop_through_page(self) -> None:
          """Answer prompts one after another until the page stops changing.

          Each iteration snapshots the page text, waits for a prompt and
          answers it. When the wait times out and the page text is unchanged
          since the snapshot, the loop exits.
          """
          while True:
              page = self.browser.find_element_by_tag_name("body").text
              try:
                  choices = self.wait_for_input()  # Resolve any input needed
              except TimeoutException:
                  if page == self.browser.find_element_by_tag_name("body").text:
                      # Page has not changed and should be timed out
                      break
                  continue
              # Record what was offered at this prompt.
              self.output["choices"][self.no_of_input_sections] = choices
              self.no_of_input_sections += 1
              # More choices than planned passes means more passes are needed.
              self.decide_recursive_loop_amount(choices)
              time.sleep(2)

      def wait_for_input(self) -> List[str]:
          """Wait for a text box or choice buttons, answer, and report the options.

          Returns ``["TEXT_BOX"]`` after typing into a text box, otherwise the
          labels of all offered buttons after clicking one of them.

          Raises:
              TimeoutException: from ``WebDriverWait.until`` when neither a
                  button nor a text box shows up within ``self.timeout`` seconds.
          """
          wait = WebDriverWait(self.browser, self.timeout)
          wait.until(
              AnyEc(
                  EC.element_to_be_clickable(
                      (By.CSS_SELECTOR, selectors["button"])
                  ),
                  EC.visibility_of_element_located(
                      (By.CSS_SELECTOR, selectors["text_box"])
                  ),
              )
          )
          try:
              text_box = self.browser.find_element_by_css_selector(
                  selectors["text_box"]
              )
              text_box.send_keys("{INPUT_TEXT}")
              text_box.send_keys(Keys.ENTER)
              return ["TEXT_BOX"]
          except NoSuchElementException:
              # No text box on the page, so the prompt is a set of buttons.
              buttons = self.browser.find_elements_by_css_selector(
                  selectors["button"]
              )
              chosen_button = self.decide_button_to_click(buttons)
              choices = [
                  x.find_element_by_tag_name("span").text for x in buttons
              ]
              # Without this pause there are sometimes errors when the
              # choices are deleted.
              time.sleep(0.2)
              chosen_button.click()
              return choices

      def decide_button_to_click(
          self, buttons: List["Element"]
      ) -> "Element":
          """Pick which of the offered buttons to click.

          If the bot messages currently on the page contain a duplicate, the
          conversation is assumed to have looped: those messages are stripped
          of their bot-message classes and the first button is returned.
          Otherwise the button at position ``self.loop_number - 1`` is chosen,
          falling back to the first button when there are not that many.
          """
          button_amount = len(buttons)
          bot_messages = self.browser.find_elements_by_css_selector(
              ".hu-background-color_bot-message-background"
          )
          prev_mesg = self.process_messages(bot_messages)
          if check_if_duplicates(prev_mesg):
              # Stop the duplicates found so far from triggering this again:
              # remove most classes from the previous bot messages but keep
              # "hu-message-bubble", which is needed at the end of the script.
              # getElementsByClassName returns a *live* collection that
              # shrinks as classes are removed, so it is copied with
              # Array.from first; indexing the live collection would skip
              # every other element.
              self.browser.execute_script(
                  'var bubbles = Array.from(document.getElementsByClassName('
                  '"hu-message-bubble hu-position-relative '
                  'hu-background-color_bot-message-background"));'
                  'for (var i = 0; i < bubbles.length; i++)'
                  '{bubbles[i].className="hu-message-bubble"}'
              )
              # Return first button to stop text loop
              return buttons[0]
          if button_amount == 1 or button_amount < self.loop_number:
              return buttons[0]
          return buttons[self.loop_number - 1]

      def define_blocks(self, messages: List[str]) -> "Blocks":
          """Split ``messages`` into numbered blocks separated by user messages.

          Every message whose text matches a not-yet-consumed user message
          closes the current block (the user message itself is not stored).
          Once all user messages have been consumed, the remaining messages
          form the final block.
          """
          user_mesg = self.browser.find_elements_by_css_selector(
              ".hu-message-text.hu-color_user-message-text"
          )
          user_dialog = [x.text for x in user_mesg]
          blocks = {}
          start_of_block = 0
          block_number = 0
          for ind, line in enumerate(messages):
              if line in user_dialog:
                  blocks[block_number] = messages[start_of_block:ind]
                  # Consume one occurrence so repeated user texts are each
                  # matched once.
                  user_dialog.remove(line)
                  start_of_block = ind + 1
                  block_number += 1
          if not user_dialog:
              blocks[block_number] = messages[start_of_block:]
          return blocks

      def process_messages(self, messages: List["Element"]) -> List[str]:
          """Return the parsed string for each message bubble, in order."""
          return [self.parse_message_bubble(message) for message in messages]

      def parse_message_bubble(self, message: "Element") -> str:
          """Extract a string from one message bubble.

          Tries, in order: the text of a ``.hu-message-text`` child; the
          ``src`` of an ``img`` child (prefixed with the first ``p`` caption
          when one exists); the ``src`` of an ``iframe`` child. Returns an
          empty string, after printing a notice, when none is present.
          """
          try:
              text = message.find_element_by_css_selector(".hu-message-text")
              return text.text
          except NoSuchElementException:
              pass
          try:
              image_element = message.find_element_by_tag_name("img")
              image_src = image_element.get_attribute('src')
              caption = message.find_elements_by_tag_name("p")
              if caption:
                  return "{}: {}".format(caption[0].text, image_src)
              return image_src
          except NoSuchElementException:
              pass
          try:
              youtube = message.find_element_by_tag_name("iframe")
              return youtube.get_attribute("src")
          except NoSuchElementException:
              print("Could not find text, image, or youtube video.")
              return ""
  # Load the list of page URLs from urls.json, then scrape each page in turn.
  with open("urls.json", "r") as fp:
      urls = json.load(fp)
  for url in urls:
      scraper = Scraper(url)
      scraper.get()