scrape/scrape.py
2025-01-26 17:48:09 -05:00

215 lines
6.8 KiB
Python

"""
title: Web Scraper
author: Mark Bailey
author_url: https://git.markbailey.dev/cerbervs
git_url: https://git.markbailey.dev/cerbervs/scrape.git
description: Scrapes web with option for recursive scraping.
requirements: websocket, requests, bs4, pydantic
version: 2.1.5
licence: MIT
"""
import json
import websocket
import requests
from urllib.parse import urlparse as parse_url
from bs4 import BeautifulSoup
from pydantic import BaseModel, ConfigDict, Field
class Tools:
class RecursiveScraper(BaseModel):
req_limit: int
visited: list = []
netloc: str | None = None
scheme: str | None = None
data: dict = {}
model_config = ConfigDict(arbitrary_types_allowed=True)
def scrape_website(
self,
url: str,
) -> dict:
if self.netloc is None:
self.netloc = parse_url(url).netloc
self.scheme = parse_url(url).scheme
if self.req_limit == 0:
return self.data
try:
# Clean the URL
cleaned_url = self.clean_link(url)
if cleaned_url is None:
raise Exception("Invalid URL: " + url)
if cleaned_url in self.visited:
return {}
# Try to send GET request using WebSocket
try:
ws = websocket.create_connection(f"ws://{self.netloc}", timeout=1)
ws.send(json.dumps({"url": cleaned_url}))
response = ws.recv()
ws.close()
except Exception:
# Fall back to requests library if WebSocket fails
response = requests.get(cleaned_url).text
if not response:
self.visited.append(cleaned_url)
raise Exception("Failed to fetch URL: " + cleaned_url)
# Parse HTML content using BeautifulSoup and lxml parser
soup = BeautifulSoup(response, "lxml")
data = self.extract_data(soup)
links = data["links"]
keep = ["title", "headings", "paragraphs", "images"]
for key in list(data.keys()):
if key not in keep:
del data[key]
# Mark URL as visited
self.visited.append(cleaned_url)
self.data.update({cleaned_url: data})
self.req_limit -= 1
# Scrape all links in the page
for link in links:
self.scrape_website(link)
return self.data
except Exception as e:
print(f"Error when trying URL: {e}")
return {}
def clean_link(self, link) -> str | None:
parsed_link = parse_url(link)
"""<scheme>://<netloc>/<path>;<params>?<query>#<fragment>"""
scheme = parsed_link.scheme
netloc = parsed_link.netloc
path = parsed_link.path
params = parsed_link.params
query = parsed_link.query
fragment = parsed_link.fragment
if self.netloc is None or self.scheme is None:
return None
if netloc is not None and netloc != "" and netloc != self.netloc:
return None
# clean each part of the URL and then reconstruct it
if scheme is None or scheme == "":
scheme = self.scheme
if netloc is None or netloc == "":
netloc = self.netloc
if path is not None and path != "":
path = "/" + path
if params is not None and params != "":
params = ";" + params
if query is not None and query != "":
query = "?" + query
if fragment is not None and fragment != "":
fragment = "#" + fragment
return f"{scheme}://{netloc}{path}{params}{query}{fragment}"
def extract_data(self, soup: BeautifulSoup) -> dict:
# Extract data
data = {
"title": soup.title.string if soup.title else None,
"headings": [],
"paragraphs": [],
"links": [],
"images": [],
}
# Find all headings (h1 to h6)
headings = soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"])
if headings:
data["headings"].extend([tag.get_text() for tag in headings])
# Find all paragraphs
paragraphs = soup.find_all("p", recursive=True)
if paragraphs:
data["paragraphs"] = [p.get_text() for p in paragraphs]
divs = soup.find_all("div", recursive=True)
if divs:
data["divs"] = [div.get_text() for div in divs]
lis = soup.find_all("li", recursive=True)
if lis:
data["lis"] = [li.get_text() for li in lis]
# Extract all links
links = soup.find_all("a", href=True)
if links:
data["links"] = [link["href"] for link in links if link["href"]]
# Extract image sources
images = soup.find_all("img", src=True)
if images:
data["images"] = [img["src"] for img in images if img["src"]]
return data
def __init__(self, req_limit: int):
super().__init__(req_limit=req_limit)
class UserValves(BaseModel):
single_request: bool = Field(default=False, description="Single Request")
pass
class Valves(UserValves):
request_limit: int = Field(default=5, description="Request Limit")
pass
def __init__(self):
"""
Initializes the Tools class.
:params req_limit: The number of requests to be made to scrape the website.
"""
self.citation: bool = True
self.valves = self.Valves()
self.user_valves = self.UserValves()
def scrape_recursively(self, url: str) -> str:
"""
Scrapes data from a web page using requests and BeautifulSoup.
:params url: The URL of the web page to be scraped.
"""
single_request = self.user_valves.single_request or self.valves.single_request
if single_request:
request_limit = 1
else:
request_limit = self.valves.request_limit
print(
f"Single Request Mode - {single_request} - Request Limit - {request_limit}"
)
scraper = self.RecursiveScraper(request_limit)
data = scraper.scrape_website(url)
json_s = json.dumps(data)
del scraper
return json_s
if __name__ == "__main__":
tools = Tools()
print(tools.scrape_recursively("https://pkg.go.dev/github.com/go-chi/chi/v5"))