Mirror of https://github.com/danswer-ai/danswer.git
Synced 2025-06-03 19:49:17 +02:00
refactor a mega function for readability and make sure to increment retry_count on exception so that we don't infinitely loop (#4542)
* refactor a mega function for readability and make sure to increment retry_count on exception so that we don't infinitely loop
* improve session and page level context handling
* don't use pydantic for the session context
* we don't need retry success
* move playwright handling into the session context
* need to break on ok
* return doc from scrape
* fix comment

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
parent 6df1c6c72f
commit 04ebde7838
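The heart of the change is the retry loop: in the old code, `retry_count` was only advanced on the 403 path, so an exception raised mid-scrape could leave the counter untouched and loop forever. Below is a minimal, self-contained sketch of the loop shape the diff moves to; `flaky_fetch` is a hypothetical stand-in for the real per-URL scrape, and only the backoff formula and the `finally`/`break` structure mirror the actual connector.

```python
import random
import time

MAX_RETRIES = 3


def flaky_fetch(url: str, attempt: int) -> str:
    """Hypothetical stand-in for the real per-URL scrape; fails on the first try."""
    if attempt == 0:
        raise RuntimeError("transient failure")
    return f"content of {url}"


def fetch_with_retries(url: str) -> str | None:
    retry_count = 0
    result: str | None = None
    while retry_count < MAX_RETRIES:
        if retry_count > 0:
            # Exponential backoff with jitter, capped at 10s (same formula as the diff).
            delay = min(2**retry_count + random.uniform(0, 1), 10)
            time.sleep(delay)
        try:
            result = flaky_fetch(url, retry_count)
        except Exception:
            continue  # the finally below still runs, so retry_count always advances
        finally:
            retry_count += 1
        break  # success: stop retrying ("need to break on ok")
    return result


if __name__ == "__main__":
    print(fetch_with_retries("https://example.com"))
```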
@@ -43,6 +43,48 @@ from shared_configs.configs import MULTI_TENANT
 logger = setup_logger()
 
 
+class ScrapeSessionContext:
+    """Session level context for scraping"""
+
+    def __init__(self, base_url: str, to_visit: list[str]):
+        self.base_url = base_url
+        self.to_visit = to_visit
+
+    def initialize(self) -> None:
+        self.stop()
+        self.playwright, self.playwright_context = start_playwright()
+
+    def stop(self) -> None:
+        if self.playwright_context:
+            self.playwright_context.close()
+            self.playwright_context = None
+
+        if self.playwright:
+            self.playwright.stop()
+            self.playwright = None
+
+    base_url: str
+    to_visit: list[str]
+    playwright: Playwright | None = None
+    playwright_context: BrowserContext | None = None
+
+    visited_links: set[str] = set()
+    content_hashes: set[int] = set()
+
+    doc_batch: list[Document] = []
+
+    at_least_one_doc: bool = False
+    last_error: str | None = None
+
+    needs_retry: bool = False
+
+
+class ScrapeResult:
+    doc: Document | None = None
+    retry: bool = False
+
+
 WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS = 20
 # Threshold for determining when to replace vs append iframe content
 IFRAME_TEXT_LENGTH_THRESHOLD = 700
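To make the new session object's lifecycle concrete, here is a small self-contained analogue of the pattern (a plain class rather than a pydantic model, per the commit message): `initialize()` always tears down before starting, `stop()` releases the handles, and all crawl state rides on the one object. The `session` attribute below is a dummy stand-in for the Playwright/BrowserContext pair; this is an illustration of the shape, not the connector's actual class.

```python
class MiniSessionContext:
    """Simplified analogue of ScrapeSessionContext: plain attributes, explicit lifecycle."""

    def __init__(self, base_url: str, to_visit: list[str]) -> None:
        self.base_url = base_url
        self.to_visit = to_visit
        self.visited_links: set[str] = set()
        self.doc_batch: list[str] = []
        self.session: object | None = None  # stands in for Playwright/BrowserContext

    def initialize(self) -> None:
        self.stop()              # restart semantics: always tear down first
        self.session = object()  # stands in for start_playwright()

    def stop(self) -> None:
        self.session = None      # stands in for context.close() / playwright.stop()


ctx = MiniSessionContext("https://docs.example.com", ["https://docs.example.com/a"])
ctx.initialize()
try:
    while ctx.to_visit:
        url = ctx.to_visit.pop()
        if url in ctx.visited_links:
            continue
        ctx.visited_links.add(url)
        ctx.doc_batch.append(f"doc for {url}")  # the real code appends Document objects
finally:
    ctx.stop()
print(ctx.doc_batch)
```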
@@ -394,6 +436,8 @@ def _handle_cookies(context: BrowserContext, url: str) -> None:
 
 
 class WebConnector(LoadConnector):
+    MAX_RETRIES = 3
+
     def __init__(
         self,
         base_url: str,  # Can't change this without disrupting existing users
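`MAX_RETRIES = 3` becomes a class-level cap, replacing the local `max_retries = 3` removed further down. Combined with the backoff formula the connector uses, `min(2**retry_count + random.uniform(0, 1), 10)`, the delay schedule can be checked with a few lines; this is a standalone illustration, not connector code:

```python
import random

MAX_RETRIES = 3  # class-level cap added in this hunk

# retry_count 0 is the first attempt and sleeps nothing; only retries 1..MAX_RETRIES-1 wait.
for retry_count in range(1, MAX_RETRIES):
    delay = min(2**retry_count + random.uniform(0, 1), 10)
    print(f"retry {retry_count}/{MAX_RETRIES}: sleep ~{delay:.2f}s")
```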
@@ -444,261 +488,244 @@ class WebConnector(LoadConnector):
             logger.warning("Unexpected credentials provided for Web Connector")
         return None
 
+    def _do_scrape(
+        self,
+        index: int,
+        initial_url: str,
+        session_ctx: ScrapeSessionContext,
+    ) -> ScrapeResult:
+        """Returns a ScrapeResult object with a doc and retry flag."""
+
+        if session_ctx.playwright is None:
+            raise RuntimeError("scrape_context.playwright is None")
+
+        if session_ctx.playwright_context is None:
+            raise RuntimeError("scrape_context.playwright_context is None")
+
+        result = ScrapeResult()
+
+        # Handle cookies for the URL
+        _handle_cookies(session_ctx.playwright_context, initial_url)
+
+        # First do a HEAD request to check content type without downloading the entire content
+        head_response = requests.head(
+            initial_url, headers=DEFAULT_HEADERS, allow_redirects=True
+        )
+        is_pdf = is_pdf_content(head_response)
+
+        if is_pdf or initial_url.lower().endswith(".pdf"):
+            # PDF files are not checked for links
+            response = requests.get(initial_url, headers=DEFAULT_HEADERS)
+            page_text, metadata, images = read_pdf_file(
+                file=io.BytesIO(response.content)
+            )
+            last_modified = response.headers.get("Last-Modified")
+
+            result.doc = Document(
+                id=initial_url,
+                sections=[TextSection(link=initial_url, text=page_text)],
+                source=DocumentSource.WEB,
+                semantic_identifier=initial_url.split("/")[-1],
+                metadata=metadata,
+                doc_updated_at=(
+                    _get_datetime_from_last_modified_header(last_modified)
+                    if last_modified
+                    else None
+                ),
+            )
+
+            return result
+
+        page = session_ctx.playwright_context.new_page()
+        try:
+            if self.add_randomness:
+                # Add random mouse movements and scrolling to mimic human behavior
+                page.mouse.move(random.randint(100, 700), random.randint(100, 500))
+
+            # Can't use wait_until="networkidle" because it interferes with the scrolling behavior
+            page_response = page.goto(
+                initial_url,
+                timeout=30000,  # 30 seconds
+                wait_until="domcontentloaded",  # Wait for DOM to be ready
+            )
+
+            # Add a small random delay to mimic human behavior
+            time.sleep(random.uniform(0.5, 2.0))
+
+            last_modified = (
+                page_response.header_value("Last-Modified") if page_response else None
+            )
+            final_url = page.url
+            if final_url != initial_url:
+                protected_url_check(final_url)
+                initial_url = final_url
+                if initial_url in session_ctx.visited_links:
+                    logger.info(
+                        f"{index}: {initial_url} redirected to {final_url} - already indexed"
+                    )
+                    page.close()
+                    return result
+
+                logger.info(f"{index}: {initial_url} redirected to {final_url}")
+                session_ctx.visited_links.add(initial_url)
+
+            # If we got here, the request was successful
+            if self.scroll_before_scraping:
+                scroll_attempts = 0
+                previous_height = page.evaluate("document.body.scrollHeight")
+                while scroll_attempts < WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS:
+                    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
+                    page.wait_for_load_state("networkidle", timeout=30000)
+                    new_height = page.evaluate("document.body.scrollHeight")
+                    if new_height == previous_height:
+                        break  # Stop scrolling when no more content is loaded
+                    previous_height = new_height
+                    scroll_attempts += 1
+
+            content = page.content()
+            soup = BeautifulSoup(content, "html.parser")
+
+            if self.recursive:
+                internal_links = get_internal_links(
+                    session_ctx.base_url, initial_url, soup
+                )
+                for link in internal_links:
+                    if link not in session_ctx.visited_links:
+                        session_ctx.to_visit.append(link)
+
+            if page_response and str(page_response.status)[0] in ("4", "5"):
+                session_ctx.last_error = f"Skipped indexing {initial_url} due to HTTP {page_response.status} response"
+                logger.info(session_ctx.last_error)
+                result.retry = True
+                return result
+
+            # after this point, we don't need the caller to retry
+            parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
+
+            """For websites containing iframes that need to be scraped,
+            the code below can extract text from within these iframes.
+            """
+            logger.debug(
+                f"{index}: Length of cleaned text {len(parsed_html.cleaned_text)}"
+            )
+            if JAVASCRIPT_DISABLED_MESSAGE in parsed_html.cleaned_text:
+                iframe_count = page.frame_locator("iframe").locator("html").count()
+                if iframe_count > 0:
+                    iframe_texts = (
+                        page.frame_locator("iframe").locator("html").all_inner_texts()
+                    )
+                    document_text = "\n".join(iframe_texts)
+                    """ 700 is the threshold value for the length of the text extracted
+                    from the iframe based on the issue faced """
+                    if len(parsed_html.cleaned_text) < IFRAME_TEXT_LENGTH_THRESHOLD:
+                        parsed_html.cleaned_text = document_text
+                    else:
+                        parsed_html.cleaned_text += "\n" + document_text
+
+            # Sometimes pages with #! will serve duplicate content
+            # There are also just other ways this can happen
+            hashed_text = hash((parsed_html.title, parsed_html.cleaned_text))
+            if hashed_text in session_ctx.content_hashes:
+                logger.info(
+                    f"{index}: Skipping duplicate title + content for {initial_url}"
+                )
+                return result
+
+            session_ctx.content_hashes.add(hashed_text)
+
+            result.doc = Document(
+                id=initial_url,
+                sections=[TextSection(link=initial_url, text=parsed_html.cleaned_text)],
+                source=DocumentSource.WEB,
+                semantic_identifier=parsed_html.title or initial_url,
+                metadata={},
+                doc_updated_at=(
+                    _get_datetime_from_last_modified_header(last_modified)
+                    if last_modified
+                    else None
+                ),
+            )
+        finally:
+            page.close()
+
+        return result
+
     def load_from_state(self) -> GenerateDocumentsOutput:
         """Traverses through all pages found on the website
         and converts them into documents"""
-        visited_links: set[str] = set()
-        to_visit: list[str] = self.to_visit_list
-        content_hashes = set()
-
-        if not to_visit:
+        if not self.to_visit_list:
             raise ValueError("No URLs to visit")
 
-        base_url = to_visit[0]  # For the recursive case
-        doc_batch: list[Document] = []
+        base_url = self.to_visit_list[0]  # For the recursive case
+        check_internet_connection(base_url)  # make sure we can connect to the base url
 
-        # make sure we can connect to the base url
-        check_internet_connection(base_url)
+        session_ctx = ScrapeSessionContext(base_url, self.to_visit_list)
+        session_ctx.initialize()
 
-        # Needed to report error
-        at_least_one_doc = False
-        last_error = None
-
-        playwright, context = start_playwright()
-        restart_playwright = False
-        while to_visit:
-            initial_url = to_visit.pop()
-            if initial_url in visited_links:
+        while session_ctx.to_visit:
+            initial_url = session_ctx.to_visit.pop()
+            if initial_url in session_ctx.visited_links:
                 continue
-            visited_links.add(initial_url)
+            session_ctx.visited_links.add(initial_url)
 
             try:
                 protected_url_check(initial_url)
             except Exception as e:
-                last_error = f"Invalid URL {initial_url} due to {e}"
-                logger.warning(last_error)
+                session_ctx.last_error = f"Invalid URL {initial_url} due to {e}"
+                logger.warning(session_ctx.last_error)
                 continue
 
-            index = len(visited_links)
+            index = len(session_ctx.visited_links)
             logger.info(f"{index}: Visiting {initial_url}")
 
-            # Add retry mechanism with exponential backoff
-            max_retries = 3
             retry_count = 0
-            retry_success = False
 
-            while retry_count < max_retries and not retry_success:
+            while retry_count < self.MAX_RETRIES:
+                if retry_count > 0:
+                    # Add a random delay between retries (exponential backoff)
+                    delay = min(2**retry_count + random.uniform(0, 1), 10)
+                    logger.info(
+                        f"Retry {retry_count}/{self.MAX_RETRIES} for {initial_url} after {delay:.2f}s delay"
+                    )
+                    time.sleep(delay)
+
                 try:
-                    if retry_count > 0:
-                        # Add a random delay between retries (exponential backoff)
-                        delay = min(2**retry_count + random.uniform(0, 1), 10)
-                        logger.info(
-                            f"Retry {retry_count}/{max_retries} for {initial_url} after {delay:.2f}s delay"
-                        )
-                        time.sleep(delay)
-
-                    if restart_playwright:
-                        playwright, context = start_playwright()
-                        restart_playwright = False
-
-                    # Handle cookies for the URL
-                    _handle_cookies(context, initial_url)
-
-                    # First do a HEAD request to check content type without downloading the entire content
-                    head_response = requests.head(
-                        initial_url, headers=DEFAULT_HEADERS, allow_redirects=True
-                    )
-                    is_pdf = is_pdf_content(head_response)
-
-                    if is_pdf or initial_url.lower().endswith(".pdf"):
-                        # PDF files are not checked for links
-                        response = requests.get(initial_url, headers=DEFAULT_HEADERS)
-                        page_text, metadata, images = read_pdf_file(
-                            file=io.BytesIO(response.content)
-                        )
-                        last_modified = response.headers.get("Last-Modified")
-
-                        doc_batch.append(
-                            Document(
-                                id=initial_url,
-                                sections=[
-                                    TextSection(link=initial_url, text=page_text)
-                                ],
-                                source=DocumentSource.WEB,
-                                semantic_identifier=initial_url.split("/")[-1],
-                                metadata=metadata,
-                                doc_updated_at=(
-                                    _get_datetime_from_last_modified_header(
-                                        last_modified
-                                    )
-                                    if last_modified
-                                    else None
-                                ),
-                            )
-                        )
-                        retry_success = True
+                    result = self._do_scrape(index, initial_url, session_ctx)
+                    if result.retry:
                         continue
 
-                    page = context.new_page()
-
-                    if self.add_randomness:
-                        # Add random mouse movements and scrolling to mimic human behavior
-                        page.mouse.move(
-                            random.randint(100, 700), random.randint(100, 500)
-                        )
-
-                    # Can't use wait_until="networkidle" because it interferes with the scrolling behavior
-                    page_response = page.goto(
-                        initial_url,
-                        timeout=30000,  # 30 seconds
-                        wait_until="domcontentloaded",  # Wait for DOM to be ready
-                    )
-
-                    # Add a small random delay to mimic human behavior
-                    time.sleep(random.uniform(0.5, 2.0))
-
-                    # Check if we got a 403 error
-                    if page_response and page_response.status == 403:
-                        logger.warning(
-                            f"Received 403 Forbidden for {initial_url}, retrying..."
-                        )
-                        page.close()
-                        retry_count += 1
-                        continue
-
-                    last_modified = (
-                        page_response.header_value("Last-Modified")
-                        if page_response
-                        else None
-                    )
-                    final_url = page.url
-                    if final_url != initial_url:
-                        protected_url_check(final_url)
-                        initial_url = final_url
-                        if initial_url in visited_links:
-                            logger.info(
-                                f"{index}: {initial_url} redirected to {final_url} - already indexed"
-                            )
-                            page.close()
-                            retry_success = True
-                            continue
-                        logger.info(f"{index}: {initial_url} redirected to {final_url}")
-                        visited_links.add(initial_url)
-
-                    # If we got here, the request was successful
-                    retry_success = True
-
-                    if self.scroll_before_scraping:
-                        scroll_attempts = 0
-                        previous_height = page.evaluate("document.body.scrollHeight")
-                        while scroll_attempts < WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS:
-                            page.evaluate(
-                                "window.scrollTo(0, document.body.scrollHeight)"
-                            )
-                            page.wait_for_load_state("networkidle", timeout=30000)
-                            new_height = page.evaluate("document.body.scrollHeight")
-                            if new_height == previous_height:
-                                break  # Stop scrolling when no more content is loaded
-                            previous_height = new_height
-                            scroll_attempts += 1
-
-                    content = page.content()
-                    soup = BeautifulSoup(content, "html.parser")
-
-                    if self.recursive:
-                        internal_links = get_internal_links(base_url, initial_url, soup)
-                        for link in internal_links:
-                            if link not in visited_links:
-                                to_visit.append(link)
-
-                    if page_response and str(page_response.status)[0] in ("4", "5"):
-                        last_error = f"Skipped indexing {initial_url} due to HTTP {page_response.status} response"
-                        logger.info(last_error)
-                        continue
-
-                    parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
-
-                    """For websites containing iframes that need to be scraped,
-                    the code below can extract text from within these iframes.
-                    """
-                    logger.debug(
-                        f"{index}: Length of cleaned text {len(parsed_html.cleaned_text)}"
-                    )
-                    if JAVASCRIPT_DISABLED_MESSAGE in parsed_html.cleaned_text:
-                        iframe_count = (
-                            page.frame_locator("iframe").locator("html").count()
-                        )
-                        if iframe_count > 0:
-                            iframe_texts = (
-                                page.frame_locator("iframe")
-                                .locator("html")
-                                .all_inner_texts()
-                            )
-                            document_text = "\n".join(iframe_texts)
-                            """ 700 is the threshold value for the length of the text extracted
-                            from the iframe based on the issue faced """
-                            if (
-                                len(parsed_html.cleaned_text)
-                                < IFRAME_TEXT_LENGTH_THRESHOLD
-                            ):
-                                parsed_html.cleaned_text = document_text
-                            else:
-                                parsed_html.cleaned_text += "\n" + document_text
-
-                    # Sometimes pages with #! will serve duplicate content
-                    # There are also just other ways this can happen
-                    hashed_text = hash((parsed_html.title, parsed_html.cleaned_text))
-                    if hashed_text in content_hashes:
-                        logger.info(
-                            f"{index}: Skipping duplicate title + content for {initial_url}"
-                        )
-                        continue
-                    content_hashes.add(hashed_text)
-
-                    doc_batch.append(
-                        Document(
-                            id=initial_url,
-                            sections=[
-                                TextSection(
-                                    link=initial_url, text=parsed_html.cleaned_text
-                                )
-                            ],
-                            source=DocumentSource.WEB,
-                            semantic_identifier=parsed_html.title or initial_url,
-                            metadata={},
-                            doc_updated_at=(
-                                _get_datetime_from_last_modified_header(last_modified)
-                                if last_modified
-                                else None
-                            ),
-                        )
-                    )
-
-                    page.close()
+                    if result.doc:
+                        session_ctx.doc_batch.append(result.doc)
                 except Exception as e:
-                    last_error = f"Failed to fetch '{initial_url}': {e}"
-                    logger.exception(last_error)
-                    playwright.stop()
-                    restart_playwright = True
+                    session_ctx.last_error = f"Failed to fetch '{initial_url}': {e}"
+                    logger.exception(session_ctx.last_error)
+                    session_ctx.initialize()
                     continue
+                finally:
+                    retry_count += 1
 
-            if len(doc_batch) >= self.batch_size:
-                playwright.stop()
-                restart_playwright = True
-                at_least_one_doc = True
-                yield doc_batch
-                doc_batch = []
+                break  # success / don't retry
 
-        if doc_batch:
-            playwright.stop()
-            at_least_one_doc = True
-            yield doc_batch
+            if len(session_ctx.doc_batch) >= self.batch_size:
+                session_ctx.initialize()
+                session_ctx.at_least_one_doc = True
+                yield session_ctx.doc_batch
+                session_ctx.doc_batch = []
 
-        if not at_least_one_doc:
-            if last_error:
-                raise RuntimeError(last_error)
+        if session_ctx.doc_batch:
+            session_ctx.stop()
+            session_ctx.at_least_one_doc = True
+            yield session_ctx.doc_batch
+
+        if not session_ctx.at_least_one_doc:
+            if session_ctx.last_error:
+                raise RuntimeError(session_ctx.last_error)
             raise RuntimeError("No valid pages found.")
 
+        session_ctx.stop()
+
     def validate_connector_settings(self) -> None:
         # Make sure we have at least one valid URL to check
         if not self.to_visit_list:
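From the caller's side nothing changes: `load_from_state()` is still a generator of `Document` batches, so iteration looks the same before and after the refactor. A hedged usage sketch follows; the import path and the idea that `base_url` is the only required constructor argument are assumptions based on this diff alone.

```python
from onyx.connectors.web.connector import WebConnector  # assumed module path

# Assumes base_url is the only required constructor argument.
connector = WebConnector(base_url="https://docs.example.com")

for doc_batch in connector.load_from_state():
    for doc in doc_batch:
        # id and semantic_identifier are fields populated in the diff above
        print(doc.id, doc.semantic_identifier)
```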