scrape/scrape.py

"""
title: Web Scraper
author: Mark Bailey
author_url: https://git.markbailey.dev/cerbervs
git_url: https://git.markbailey.dev/cerbervs/scrape.git
description: Scrapes web with option for recursive scraping.
requirements: websocket, requests, bs4, pydantic, lxml
version: 2.1.2
licence: MIT
"""
import json

import websocket
import requests
from urllib.parse import urlparse as parse_url
from bs4 import BeautifulSoup
from pydantic import BaseModel, ConfigDict, Field


class Tools:
    class RecursiveScraper(BaseModel):
        req_limit: int
        visited: list = []
        netloc: str | None = None
        scheme: str | None = None
        data: dict = {}

        model_config = ConfigDict(arbitrary_types_allowed=True)

        def set_req_limit(self, req_limit: int):
            self.req_limit = req_limit

        def scrape_website(
            self,
            url: str,
        ) -> dict:
            # Remember the host and scheme of the first page; later links are
            # resolved against them
            if self.netloc is None:
                self.netloc = parse_url(url).netloc
                self.scheme = parse_url(url).scheme

            if self.req_limit == 0:
                return self.data

            try:
                # Clean the URL
                cleaned_url = self.clean_link(url)
                if cleaned_url is None:
                    raise Exception("Invalid URL: " + url)

                if cleaned_url in self.visited:
                    return {}

                # Try fetching the page over a WebSocket connection first
                try:
                    ws = websocket.create_connection(f"ws://{self.netloc}", timeout=1)
                    ws.send(json.dumps({"url": cleaned_url}))
                    response = ws.recv()
                    ws.close()
                except Exception:
                    # Fall back to a plain HTTP GET via requests if the WebSocket fails
                    response = requests.get(cleaned_url).text

                if not response:
                    self.visited.append(cleaned_url)
                    raise Exception("Failed to fetch URL: " + cleaned_url)

                # Parse the HTML content using BeautifulSoup and the lxml parser
                soup = BeautifulSoup(response, "lxml")
                data = self.extract_data(soup)

                # Keep the links for recursion but store only the text-bearing fields
                links = data["links"]
                keep = ["title", "headings", "paragraphs", "images"]
                for key in list(data.keys()):
                    if key not in keep:
                        del data[key]

                # Mark URL as visited
                self.visited.append(url)
                self.data.update({url: data})
                self.req_limit -= 1

                # Scrape all links in the page
                for link in links:
                    self.scrape_website(link)

                return self.data
            except Exception:
                return {}

        def clean_link(self, link) -> str | None:
            """Resolve a scraped link to an absolute URL on the original host,
            or return None for links that point to a different host."""
            parsed_link = parse_url(link)
            netloc = parsed_link.netloc
            path = parsed_link.path
            fragment = parsed_link.fragment

            # Only follow links on the same host as the first page scraped
            if netloc is not None and netloc != "" and netloc != self.netloc:
                return None

            # Normalize the path: strip a trailing slash, ensure a leading slash
            if path is not None and path != "":
                if path.endswith("/"):
                    path = path[:-1]
                if not path.startswith("/"):
                    path = "/" + path
            # Treat a non-empty fragment as an extra path segment and normalize
            # it the same way as the path
            if fragment is not None and fragment != "":
                if fragment.endswith("/"):
                    fragment = fragment[:-1]
                if not fragment.startswith("/"):
                    fragment = "/" + fragment

            if self.netloc is None or self.scheme is None:
                return None

            link = self.netloc + path + fragment
            link = self.scheme + "://" + link.replace("//", "/")

            return link

        def extract_data(self, soup: BeautifulSoup) -> dict:
            # Extract data
            data = {
                "title": soup.title.string if soup.title else None,
                "headings": [],
                "paragraphs": [],
                "links": [],
                "images": [],
            }

            # Find all headings (h1 to h6)
            headings = soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"])
            if headings:
                data["headings"].extend([tag.get_text() for tag in headings])

            # Find all paragraphs
            paragraphs = soup.find_all("p", recursive=True)
            if paragraphs:
                data["paragraphs"] = [p.get_text() for p in paragraphs]

            # Collect div and list-item text as well (scrape_website later drops
            # these via its keep-list)
            divs = soup.find_all("div", recursive=True)
            if divs:
                data["divs"] = [div.get_text() for div in divs]

            lis = soup.find_all("li", recursive=True)
            if lis:
                data["lis"] = [li.get_text() for li in lis]

            # Extract all links
            links = soup.find_all("a", href=True)
            if links:
                data["links"] = [link["href"] for link in links if link["href"]]

            # Extract image sources
            images = soup.find_all("img", src=True)
            if images:
                data["images"] = [img["src"] for img in images if img["src"]]

            return data

        def __init__(self, req_limit: int):
            super().__init__(req_limit=req_limit)

    class UserValves(BaseModel):
        single_request: bool = Field(default=False, description="Single Request")

    class Valves(UserValves):
        request_limit: int = Field(default=5, description="Request Limit")

    def __init__(self):
        """
        Initializes the Tools class.

        The request limit is not passed in directly; it is configured through
        the valves: `Valves.request_limit` caps the number of requests, and
        `UserValves.single_request` restricts scraping to a single page.
        """
        self.citation: bool = True
        self.valves = self.Valves()
        self.user_valves = self.UserValves()

    def scrape_recursively(self, url: str) -> str:
        """
        Recursively scrapes a web page, and the same-host links it contains,
        using requests and BeautifulSoup, and returns the collected data as a
        JSON string.

        :param url: The URL of the web page to be scraped.
        """
        if self.user_valves.single_request:
            request_limit = 1
        else:
            request_limit = self.valves.request_limit

        scraper = self.RecursiveScraper(request_limit)
        data = scraper.scrape_website(url)
        json_s = json.dumps(data)
        return json_s


if __name__ == "__main__":
    tools = Tools()
    print(tools.scrape_recursively("https://en.wikipedia.org/wiki/Shamisen"))
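
    # A minimal sketch of adjusting the valves before scraping; the attribute
    # names below come from the Valves/UserValves models defined above:
    #
    #     tools.valves.request_limit = 2            # cap recursion at two pages
    #     tools.user_valves.single_request = True   # or scrape only the first page
    #     print(tools.scrape_recursively("https://en.wikipedia.org/wiki/Shamisen"))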