diff --git a/docs/1_getting_started.md b/docs/1_getting_started.md index 27b16f7dc..66c3594aa 100644 --- a/docs/1_getting_started.md +++ b/docs/1_getting_started.md @@ -49,8 +49,8 @@ crawler = Crawler(PublisherCollection) # How to crawl articles -Now to crawl articles make use of the `crawl()` method of the initialized crawler class. -Calling this will return an `Iterator` over articles. +To crawl articles, call the `crawl()` method of the initialized crawler. +This returns an `Iterator` over articles. Let's crawl one news article from a publisher based in the US and print it. @@ -76,7 +76,7 @@ Fundus-Article: - From: FreeBeacon (2023-05-11 18:41) ``` -You can also crawl all available articles by simply removing the `max_articles` parameter. +You can also crawl all available articles by simply omitting the `max_articles` parameter. ```` python # crawl all available articles diff --git a/docs/2_crawl_from_cc_news.md b/docs/2_crawl_from_cc_news.md index fd2dc25c2..f06930bba 100644 --- a/docs/2_crawl_from_cc_news.md +++ b/docs/2_crawl_from_cc_news.md @@ -59,9 +59,9 @@ The CC-NEWS dataset consists of multiple terabytes of articles. Due to the sheer amount of data, the crawler utilizes multiple processes. Per default, it uses all CPUs available in your system. You can alter the number of additional processes used for crawling with the `processes` parameter of `CCNewsCrawler`. -For optimal performance, we recommend setting the amount of process used manually. +For optimal performance, we recommend setting the number of processes used manually. A good rule of thumb is to allocate `one process per 200 Mbps of bandwidth`. -This can vary depending on the actual speed of your cpu cores. +This can vary depending on the actual speed of your CPU cores. ````python from fundus import CCNewsCrawler, PublisherCollection @@ -70,7 +70,7 @@ from fundus import CCNewsCrawler, PublisherCollection crawler = CCNewsCrawler(*PublisherCollection, processes=5) ```` -To omit multiprocessing, pass `-1` to the `processes` parameter. +To omit multiprocessing, pass `0` to the `processes` parameter. In the [next section](3_the_article_class.md) we will introduce you to the `Article` class. diff --git a/docs/3_the_article_class.md b/docs/3_the_article_class.md index dc5884e68..c147dd4c4 100644 --- a/docs/3_the_article_class.md +++ b/docs/3_the_article_class.md @@ -38,37 +38,41 @@ Donald Trump asks judge to delay classified documents trial Now have a look at the [**attribute guidelines**](attribute_guidelines.md). All attributes listed here can be safely accessed through the `Article` class. -**_NOTE:_** The listed attributes represent fields of the `Article` dataclass with all of them having default values. +> [!NOTE] +> The listed attributes are exposed as properties of the `Article` class, each falling back to a default value when the parser is unable to extract it. Some parsers may support additional attributes not listed in the guidelines. You can find those attributes under the [**supported publisher**](supported_publishers.md) tables under `Additional Attributes`. -**_NOTE:_** Keep in mind that these additional attributes are specific to a parser and cannot be accessed safely for every article. +> [!NOTE] +> Keep in mind that these additional attributes are specific to a parser and cannot be accessed safely for every article. Sometimes an attribute listed in the attribute guidelines isn't supported at all by a specific parser. You can find this information under the `Missing Attributes` tab within the supported publisher tables. -There is also a built-in search mechanic you can learn about [here](5_advanced_topics) +There is also a built-in search mechanism you can learn about [here](5_advanced_topics.md). ## The articles' body Fundus supports two methods to access the body of the article 1. Accessing the `plaintext` property of `Article` with `article.plaintext`. - This will return a cleaned and formatted version of the article body as a single string object and should be suitable for most use cases.
- **_NOTE:_** The different DOM elements are joined with two new lines and cleaned with `split()` and `' '.join()`. + This will return a cleaned and formatted version of the article body as a single string object and should be suitable for most use cases. 2. Accessing the `body` attribute of `Article`. This returns an `ArticleBody` instance, granting more fine-grained access to the DOM structure of the article body. +> [!NOTE] +> When the body is rendered as text, its DOM elements are joined with two newlines and normalized with `split()` and `' '.join()`. + The `ArticleBody` consists of - a `summary` giving a brief introduction of the article -- a attribute `sections` containing multiple `ArticleSection` +- an attribute `sections` containing multiple `ArticleSection` With `ArticleSection` including - a `headline`; separating the section from other sections - multiple `paragraphs` following the headline ````console -ArticleSection - |-- headline: TextSequence +ArticleBody + |-- summary: TextSequence |-- sections: List[ArticleSection] |-- headline: TextSequence |-- paragraphs: TextSequence @@ -101,9 +105,10 @@ This is a paragraph: When someone dies, the executor presents their will [...] This is a paragraph: People who would like to keep the details of their [...] ``` -**_NOTE:_** Not all publishers support the layout format shown above. -Sometimes headlines are missing or the entire summary is. -You can always check the specific parser what to expect, but even within publishers, the layout differs from article to article. +> [!NOTE] +> Not all publishers support the layout format shown above. +> Sometimes headlines are missing or the entire summary is. +> You can always check the specific parser what to expect, but even within publishers, the layout differs from article to article. ## HTML @@ -116,7 +121,7 @@ Here you have access to the following information: Often the same as `requested_url`; can change with redirects. 3. `content: str`: The HTML content. 4. `crawl_date: datetime`: The exact timestamp the article was crawled. -5. `source_info: SourceInfo`: Some information about the HTML's origins, mostly for debugging purpose. +5. `source_info: SourceInfo`: Provenance metadata about the HTML's origin, mostly for debugging purposes. ## Images @@ -170,6 +175,6 @@ for article in crawler.crawl(max_articles=10): article_json = article.to_json("title", "plaintext", "lang") ```` -To save all articles at once, using the default serialization and only specifying a location, refer to [this section](5_advanced_topics.md#saving-the-crawled-articles). +To save all articles at once, using the default serialization and only specifying a location, refer to [this section](1_getting_started.md#saving-crawled-articles). In the [**next section**](4_how_to_filter_articles.md) we will show you how to filter articles. diff --git a/docs/4_how_to_filter_articles.md b/docs/4_how_to_filter_articles.md index a881f10fc..4a99980d8 100644 --- a/docs/4_how_to_filter_articles.md +++ b/docs/4_how_to_filter_articles.md @@ -3,7 +3,7 @@ * [How to filter articles](#how-to-filter-articles) * [Extraction filter](#extraction-filter) * [Custom extraction filter](#custom-extraction-filter) - * [Some more extraction filter examples:](#some-more-extraction-filter-examples) + * [Some more extraction filter examples](#some-more-extraction-filter-examples) * [URL filter](#url-filter) * [Combine filters](#combine-filters) * [Filter sources](#filter-sources) @@ -20,7 +20,7 @@ A specific article may not contain all attributes the parser is capable of extra By default, Fundus drops all articles without at least a title, body, and publishing date extracted to ensure data quality. To alter this behavior make use of the `only_complete` parameter of the `crawl()` method. You have three options to do so: -- Use the build in `ExtractionFilter` `Requires`, or write a custome one. +- Use the built-in `ExtractionFilter` `Requires`, or write a custom one. - Set it to `false` to disable extraction filtering entirely. - Set it to `true` to yield only fully extracted articles. @@ -35,7 +35,8 @@ for article in crawler.crawl(max_articles=2, only_complete=Requires("title", "bo print(article) ```` -**_NOTE:_** We recommend thinking about what kind of data is needed first and then running Fundus with a configured extraction filter afterward. +> [!NOTE] +> We recommend thinking about what kind of data is needed first and then running Fundus with a configured extraction filter afterward. ### Custom extraction filter @@ -64,11 +65,12 @@ for us_themed_article in crawler.crawl(only_complete=topic_filter): print(us_themed_article) ```` -**_NOTE:_** Fundus' filters work inversely to Python's built-in filter. -A filter in Fundus describes what is filtered out and not what's kept. -If a filter returns True on a specific element the element will be dropped. +> [!NOTE] +> Fundus' filters work inversely to Python's built-in filter. +> A filter in Fundus describes what is filtered out and not what's kept. +> If a filter returns True on a specific element, the element will be dropped. -#### Some more extraction filter examples: +#### Some more extraction filter examples ````python # only select articles from the past seven days @@ -106,8 +108,8 @@ for article in crawler.crawl(max_articles=5, url_filter=regex_filter("advertisem print(article.html.requested_url) ```` -Often it's useful to select certain criteria rather than filtering them. -To do so use the `inverse` operator from `fundus.scraping.filter.py`. +Often it's useful to select for certain criteria rather than filtering them out. +To do so use the `inverse` operator from `fundus.scraping.filter`. Let's crawl a bunch of articles with URLs including the string `politic`. @@ -131,12 +133,13 @@ https://www.cnbc.com/2023/07/12/thai-elections-deep-generational-divides-belie-t https://www.reuters.com/business/autos-transportation/volkswagens-china-chief-welcomes-political-goal-germanys-beijing-strategy-2023-07-13/ ```` -**_NOTE:_** As with the `ExtractionFilter` you can also write custom URL filters satisfying the `URLFilter` protocol. +> [!NOTE] +> As with the `ExtractionFilter` you can also write custom URL filters satisfying the `URLFilter` protocol. ### Combine filters Sometimes it is useful to combine filters of the same kind. -You can do so by using the `lor` (logic `or`) and `land` (logic `and`) operators from `fundus.scraping.filter.py`. +You can do so by using the `lor` (logic `or`) and `land` (logic `and`) operators from `fundus.scraping.filter`. Let's combine both URL filters from the examples above and add a new condition. Our goal is to get articles that include both strings 'politic' and 'trump' in their URL and don't include the strings 'podcast' or 'advertisement'. @@ -169,8 +172,9 @@ https://www.thegatewaypundit.com/2023/06/pres-trump-defends-punching-down-politi https://www.thegatewaypundit.com/2023/06/breaking-poll-trump-most-popular-politician-country-rfk/ ```` -**_NOTE:_** You can use the `combine`, `lor`, and `land` operators on `ExtractionFilter` as well. -Make sure to only use them on filters of the same kind. +> [!NOTE] +> You can use the `lor` and `land` operators on `ExtractionFilter` as well. +> Make sure to only use them on filters of the same kind. ## Filter sources @@ -179,7 +183,8 @@ Fundus supports different sources for articles which are split into two categori 1. Only recent articles: `RSSFeed`, `NewsMap` (recommended for continuous crawling jobs) 2. The whole site: `Sitemap` (recommended for one-time crawling) -**_NOTE:_** Sometimes the `Sitemap` provided by a specific publisher won't span the entire site. +> [!NOTE] +> Sometimes the `Sitemap` provided by a specific publisher won't span the entire site. You can preselect the source for your articles when initializing a new `Crawler`. Let's initiate a crawler who only crawls from `NewsMaps`'s. @@ -190,7 +195,8 @@ from fundus import Crawler, PublisherCollection, NewsMap crawler = Crawler(PublisherCollection.us, restrict_sources_to=[NewsMap]) ```` -**_NOTE:_** The `restrict_sources_to` parameter expects a list as value to specify multiple sources at once, e.g. `[RSSFeed, NewsMap]` +> [!NOTE] +> The `restrict_sources_to` parameter expects a list as value to specify multiple sources at once, e.g. `[RSSFeed, NewsMap]` ## Filter unique articles @@ -202,4 +208,4 @@ You can alter this behavior by setting the `only_unique` parameter. Finally, the `crawl()` method also allows you to filter articles by language. You can do so by passing a list of 2 letter language codes ([ISO 639-1](https://en.wikipedia.org/wiki/List_of_ISO_639_language_codes)) to the method using the `language_filter` parameter. -In the [next section](5_advanced_topics) we will guide you through advanced topics as how to search through publishers in the `PublisherCollection` and how to deal with deprecated publishers. +In the [next section](5_advanced_topics.md) we will guide you through advanced topics such as how to search through publishers in the `PublisherCollection` and how to deal with deprecated publishers. diff --git a/docs/5_advanced_topics.md b/docs/5_advanced_topics.md index bf766dfb5..6cbc384b6 100644 --- a/docs/5_advanced_topics.md +++ b/docs/5_advanced_topics.md @@ -1,13 +1,13 @@ # Table of Contents -* [Advanced Topics](#advanced-topics) +* [Advanced topics](#advanced-topics) * [How to search for publishers](#how-to-search-for-publishers) * [Using `search()`](#using-search) * [Working with deprecated publishers](#working-with-deprecated-publishers) * [Filtering publishers for AI training](#filtering-publishers-for-ai-training) * [Browser impersonation](#browser-impersonation) -# Advanced Topics +# Advanced topics This tutorial will show further options such as searching for specific publishers in the `PublisherCollection` or dealing with deprecated ones. @@ -19,7 +19,7 @@ There are quite a few differences between the publishers, especially in the attr You can search through the collection to get only publishers fitting your use case by utilizing the `search()` method. Let's get some publishers based in the US, supporting an attribute called `topics` and `NewsMap` as a source, and use them to initialize a crawler afterward. -The `search()` method also implements an internal language filter, allowing you to restrict your results to a specific languages. +The `search()` method also implements an internal language filter, allowing you to restrict your results to specific languages. In this example, we are only interested in Spanish articles. ````python @@ -32,7 +32,7 @@ crawler = Crawler(*fitting_publishers) ## Working with deprecated publishers When we notice that a publisher is uncrawlable for whatever reason, we will mark it with a deprecated flag. -This mostly has internal usages, since the default value for the `Crawler` `ignore_deprecated` flag is `False`. +This is mostly for internal use, since the `Crawler`'s `ignore_deprecated` flag defaults to `False`. You can alter this behaviour when initiating the `Crawler` and setting the `ignore_deprecated` flag. ## Filtering publishers for AI training diff --git a/docs/6_logging.md b/docs/6_logging.md index d82f865e9..1744c696c 100644 --- a/docs/6_logging.md +++ b/docs/6_logging.md @@ -1,7 +1,7 @@ # Table of Contents * [Logging in Fundus](#logging-in-fundus) - * [Principals](#principals) + * [Principles](#principals) * [Accessing loggers](#accessing-loggers) * [Changing log levels](#changing-log-levels) * [Format and handlers](#format-and-handlers) @@ -10,9 +10,9 @@ This tutorial will introduce you to the logging mechanics used in Fundus -## Principals +## Principles -Fundus uses module scoped logging with module names as logger names. +Fundus uses module-scoped logging with module names as logger names. Not every module has a logger per se, but every module that logs a message has. All module related implementation is centralized in Fundus' logging module under `fundus.logging`. @@ -25,14 +25,15 @@ Fundus uses 4 different log levels: with default log level for all Fundus loggers being `ERROR`. -*__NOTE__*: Depending on the spawn method (spawn) your OS uses to spawn new processes in python (this effects mostly Windows), log messages beneath `ERROR` won't be received when using multiprocessing. +> [!NOTE] +> Depending on the start method your OS uses to spawn new processes in Python (this mainly affects Windows), log messages below `ERROR` won't be received when using multiprocessing. ## Accessing loggers You can import a specific logger from the corresponding module like this: ````python -from fundus.scraping.crawler import logger +from fundus.scraping.crawler.web import logger ```` Or find a collection of all existing loggers with their module names here: @@ -58,7 +59,7 @@ from fundus.logging import set_log_level set_log_level(logging.DEBUG) ```` -## Format and Handlers +## Format and handlers By default, all Fundus log messages are written to `stderr` with the following format `%(asctime)s - %(name)s - %(levelname)s - %(message)s` To add another handler use the `add_handler` function. @@ -73,5 +74,6 @@ file_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(mes add_handler(file_handler) ```` -*__NOTE__*: All of the above can also be done individually for every logger by [accessing loggers](#accessing-loggers) directly. +> [!NOTE] +> All of the above can also be done individually for every logger by [accessing loggers](#accessing-loggers) directly. diff --git a/docs/attribute_guidelines.md b/docs/attribute_guidelines.md index ded38695f..2d6c9ebdb 100644 --- a/docs/attribute_guidelines.md +++ b/docs/attribute_guidelines.md @@ -4,14 +4,16 @@ Consistency between publishers and parsers is a main goal, please report any cas document. If you want to contribute a parser to this library, please ensure that these attributes are named consistently. -**_NOTE:_** There are certain utility functions to aid you with parsing. -These can be found under `fundus/parser/utility.py`. -We *highly* recommend using them. +> [!NOTE] +> There are certain utility functions to aid you with parsing. +> These can be found under `fundus/parser/utility.py`. +> We *highly* recommend using them. The following table lists Fundus' core attributes and includes the name of the corresponding utility function. Those attributes will be validated with unit tests when used. -**_NOTE:_** If you want to bypass validation you can set the `validate` parameter of the `attribute` decorator to false. +> [!NOTE] +> If you want to bypass validation you can set the `validate` parameter of the `attribute` decorator to `False`. ## Attributes table @@ -60,7 +62,7 @@ Those attributes will be validated with unit tests when used. free_access - A boolean which is set to be False, if the article is restricted to users with a subscription. This usually indicates + A boolean that is False if the article is restricted to users with a subscription. This usually indicates that the article cannot be crawled completely. This attribute is implemented by default bool @@ -68,8 +70,8 @@ Those attributes will be validated with unit tests when used. images - A list of `Images` - Fundus own datatype for image representation - included within the article. - The `Images` include metadata like caption, authors, and position if available. + A list of `Image` objects — Fundus' own datatype for image representation — included within the article. + The `Image` objects include metadata like caption, authors, and position if available. List[Image] image_extraction diff --git a/docs/how_to_add_a_publisher.md b/docs/how_to_add_a_publisher.md index a3a063bce..ba23bd35a 100644 --- a/docs/how_to_add_a_publisher.md +++ b/docs/how_to_add_a_publisher.md @@ -1,41 +1,42 @@ # Table of Contents -* [How to add a Publisher](#how-to-add-a-publisher) - * [1. Creating a Parser Stub](#1-creating-a-parser-stub) - * [2. Creating a Publisher Specification](#2-creating-a-publisher-specification) - * [Adding Sources](#adding-sources) +* [How to add a publisher](#how-to-add-a-publisher) + * [1. Creating a parser stub](#1-creating-a-parser-stub) + * [2. Creating a publisher specification](#2-creating-a-publisher-specification) + * [Adding sources](#adding-sources) * [Different `URLSource` types](#different-urlsource-types) * [How to specify a `URLSource`](#how-to-specify-a-urlsource) * [RSS feeds](#rss-feeds) * [Sitemaps](#sitemaps) * [How to differentiate between `Sitemap` and `NewsMap`](#how-to-differentiate-between-sitemap-and-newsmap) - * [Finishing the Publisher Specification](#finishing-the-publisher-specification) - * [4. Validating the Current Implementation Progress](#4-validating-the-current-implementation-progress) - * [5. Implementing the Parser](#5-implementing-the-parser) - * [Extracting Attributes from Precomputed](#extracting-attributes-from-precomputed) - * [Extracting Attributes with XPath and CSS-Select](#extracting-attributes-with-xpath-and-css-select) + * [Finishing the publisher specification](#finishing-the-publisher-specification) + * [3. Validating the current implementation progress](#3-validating-the-current-implementation-progress) + * [4. Implementing the parser](#4-implementing-the-parser) + * [Extracting attributes from Precomputed](#extracting-attributes-from-precomputed) + * [Extracting attributes with XPath and CSS-Select](#extracting-attributes-with-xpath-and-css-select) * [Working with `lxml`](#working-with-lxml) * [CSS-Select](#css-select) * [XPath](#xpath) * [Extracting the ArticleBody](#extracting-the-articlebody) - * [Extracting the Images](#extracting-the-images) + * [Extracting the images](#extracting-the-images) * [Checking the free_access attribute](#checking-the-free_access-attribute) - * [Finishing the Parser](#finishing-the-parser) - * [6. Generate unit tests and update tables](#6-generate-unit-tests-and-update-tables) + * [Finishing the parser](#finishing-the-parser) + * [5. Generate unit tests and update tables](#5-generate-unit-tests-and-update-tables) * [Add unit tests](#add-unit-tests) * [Update tables](#update-tables) - * [7. Opening a Pull Request](#7-opening-a-pull-request) - * [8. Maintaining publishers](#8-maintaining-publishers) + * [6. Opening a pull request](#6-opening-a-pull-request) + * [7. Maintaining publishers](#7-maintaining-publishers) -# How to add a Publisher +# How to add a publisher Before contributing a publisher make sure you set up Fundus correctly alongside [these](how_to_contribute.md#setup-fundus) steps. Then check the [**supported publishers**](supported_publishers.md) table if there is already support for your desired publisher. In the following, we will walk you through an example implementation of the [*The Intercept*](https://www.theintercept.com/) covering the best practices for adding a new publisher. -**_NOTE:_**: Before proceeding, it's essential to ensure that the publisher you intend to add is crawl-able. -Fundus keeps track of those who aren't in [this issue](https://github.com/flairNLP/fundus/issues/309). -To verify, simply replace the three dots `...` in the code snippet below with the URL of an article from the publisher you wish to add, and run the snippet afterward. +> [!NOTE] +> Before proceeding, it's essential to ensure that the publisher you intend to add is crawl-able. +> Fundus keeps track of those who aren't in [this issue](https://github.com/flairNLP/fundus/issues/309). +> To verify, simply replace the three dots `...` in the code snippet below with the URL of an article from the publisher you wish to add, and run the snippet afterward. ````python import urllib.request @@ -51,7 +52,7 @@ In such cases, please comment on the issue mentioned above, mentioning the publi This helps keep the list accurate and up-to-date. -## 1. Creating a Parser Stub +## 1. Creating a parser stub Take a look at the file structure in `fundus/publishers`. Fundus uses the [**ALPHA-2**](https://www.iban.com/country-codes) codes specified in ISO3166 to sort publishers into directories by country of origin. @@ -100,7 +101,7 @@ class TheInterceptParser(ParserProxy): Internally, the `ParserProxy` maps crawl dates to specific versions (`V1`, `V2`, etc.) subclassing `BaseParser`. Since Fundus' parsers are handcrafted and usually tied to specific layouts, this proxying step helps address changes to the layout. -## 2. Creating a Publisher Specification +## 2. Creating a publisher specification Next, add a new publisher specification for the publisher you want to cover. The publisher specification links information about the publisher, sources from where to get the HTML to parse, and the corresponding parser used by Fundus' `Crawler`. @@ -120,9 +121,9 @@ class US(PublisherGroup): ) ``` -If the country section for your publisher did not exist before step 1, please add the `PublisherGroup` to the `PublisherCollection` in `fundus/publishers/__init__.py'`. +If the country section for your publisher did not exist before step 1, please add the `PublisherGroup` to the `PublisherCollection` in `fundus/publishers/__init__.py`. -### Adding Sources +### Adding sources For your newly added publisher to work you first need to specify where to find articles - in the form of HTML - to parse. Fundus adopts a unique approach by utilizing access points provided by the publishers, rather than resorting to generic web spiders. @@ -131,7 +132,7 @@ Presently, Fundus supports RSS feeds and sitemaps by adding them as correspondin #### Different `URLSource` types -Fundus provides the following types of `URLSource`, which you can import from `fundus.scraping.html`. +Fundus provides the following types of `URLSource`, which you can import from `fundus.scraping.url`. 1. `RSSFeed` - specifying RSS feeds 2. `Sitemap` - specifying sitemaps @@ -139,10 +140,11 @@ Fundus provides the following types of `URLSource`, which you can import from `f Fundus distinguishes between these source types to facilitate crawling only recent articles (`RSSFeed`, `NewsMap`) or an entire website (`Sitemap`). This differentiation is mainly for efficiency reasons. -Refer to [this](4_how_to_filter_articles#filter-sources) documentation on how to filter for different source types. +Refer to [this](4_how_to_filter_articles.md#filter-sources) documentation on how to filter for different source types. -**_NOTE:_** When adding a new publisher, it is recommended to specify at least one `Sitemap` and one `RSSFeed` or `NewsMap` (preferred). -If your publisher provides a `NewsFeed`, there is no need to specify an `RSSFeed`. +> [!NOTE] +> When adding a new publisher, it is recommended to specify at least one `Sitemap` and one `RSSFeed` or `NewsMap` (preferred). +> If your publisher provides a `NewsFeed`, there is no need to specify an `RSSFeed`. #### How to specify a `URLSource` @@ -179,12 +181,13 @@ A typical sitemap looks like this: ... ``` -**_NOTE:_** There is a known issue with Firefox not displaying XML properly. -You can find a plugin to resolve this issue [here](https://addons.mozilla.org/de/firefox/addon/pretty-xml/) +> [!NOTE] +> There is a known issue with Firefox not displaying XML properly. +> You can find a plugin to resolve this issue [here](https://addons.mozilla.org/de/firefox/addon/pretty-xml/) Links to sitemaps are typically found within the `robots.txt` file provided by the publisher, often located at the end of it. To access this file, append `robots.txt` at the end of the publisher's domain. -For example, to access The Intercepts' `robots.txt`, use https://theintercept.com/robots.txt in your preferred browser. +For example, to access The Intercept's `robots.txt`, use https://theintercept.com/robots.txt in your preferred browser. This will give you one sitemap link: ```console @@ -217,7 +220,8 @@ Sitemap: https://theintercept.com/news-sitemap.xml This link points to a NewsMap, which is a special kind of Sitemap. To have a look at how to differentiate between those two, refer to [this](#how-to-differentiate-between-sitemap-and-newsmap) section. -**_NOTE:_** If you wonder why you should reverse your sources from time to time, `URLSource`'s should, if possible, yield URLs in descending order by publishing date. +> [!NOTE] +> If you wonder why you should reverse your sources from time to time, `URLSource`'s should, if possible, yield URLs in descending order by publishing date. Now building a new `URLSource` for a `NewsMap` covering The Intercept looks like this: @@ -237,8 +241,8 @@ You can check if a sitemap is a news map by: While this is a very simple method this can be unreliable. 2. Checking the namespace: Typically there is a namespace `news` defined within a news map using the `xmlns:news` attribute of the `` tag. - E.g. ``
- **_NOTE:_** This can only be found within the actual sitemap and not the index map. + E.g. ``. + Note that this can only be found within the actual sitemap and not the index map. #### Filter noisy sitemaps @@ -255,12 +259,12 @@ sitemap_filter=inverse(regex_filter("sitemap-content-")) ```` will exclude all sitemap URLs not containing the substring `sitemap-content-`. -### Finishing the Publisher Specification +### Finishing the publisher specification -1. If your publisher requires to use custom request headers to work properly you can alter it by using the `request_header` parameter of `PublisherSpec`. +1. If your publisher requires custom request headers to work properly you can set them using the `request_header` parameter of `Publisher`. The default is: `{"user-agent": "Fundus/2.0 (contact: github.com/flairnlp/fundus)"}`. 2. If you want to block URLs for the entire publisher use the `url_filter` parameter of `Publisher`. -3. In some cases it can be necessary to append query parameters to the end of the URL, e.g. to load the article as one page. This can be achieved by adding the `query_parameter` attribute of `PublisherSpec` and assigning it a dictionary object containing the key - value pairs: e.g. `{"page": "all"}`. These key - value pairs will be appended to all crawled URLs. +3. In some cases it can be necessary to append query parameters to the end of the URL, e.g. to load the article as one page. This can be achieved by setting the `query_parameter` parameter of `Publisher` and assigning it a dictionary containing the key-value pairs, e.g. `{"page": "all"}`. These key-value pairs will be appended to all crawled URLs. 4. If the publisher is only reachable through a browser-like TLS/HTTP fingerprint (i.e. plain `requests`/`curl` get blocked by an anti-bot layer such as Cloudflare or Akamai), you can declare a browser profile via the `impersonate` parameter, e.g. `impersonate="chrome"`. See [curl_cffi's supported targets](https://curl-cffi.readthedocs.io/en/latest/impersonate/targets.html) for the full list. Because browser impersonation is an opt-in feature on the user side (see [Browser impersonation](5_advanced_topics.md#browser-impersonation)), the profile only takes effect when the user constructs the `Crawler` with `impersonate=True`; with the default `impersonate=False` your publisher will be requested without impersonation and will likely fail. Only set this when the publisher genuinely cannot be crawled without it. @@ -284,7 +288,7 @@ class US(PublisherGroup): ) ``` -## 4. Validating the Current Implementation Progress +## 3. Validating the current implementation progress Now validate your implementation progress by crawling some example articles from your publisher. The following script fits The Intercept and is adaptable by changing the publisher variable accordingly. @@ -319,14 +323,14 @@ Fundus-Article: Since we didn't add any specific implementation to the parser yet, most entries are empty. -## 5. Implementing the Parser +## 4. Implementing the parser Now bring your parser to life and define the attributes you want to extract. One important caveat to consider is the type of content on a particular page. Some news outlets feature live tickers, displaying podcasts, or hub sites that link to other pages but are not articles themselves. -At this stage, there's no need to concern yourself with handling non-article pages. -our parser should concentrate on extracting desired attributes from most pages that can be classified as articles. +At this stage, there's no need to concern yourself with handling non-article pages. +Your parser should concentrate on extracting the desired attributes from most pages that can be classified as articles. Pages lacking the desired attributes will be filtered out by the library during a later phase of the processing pipeline. You can add attributes by decorating the methods of your parser with the `@attribute` decorator. @@ -337,8 +341,8 @@ There you can locate an attribute named `title`, which precisely corresponds to It is essential to adhere to the specified return types, as they are enforced through our unit tests. While you're welcome to experiment locally, contributions to the repository won't be accepted if your pull request deviates from the guidelines. -**_NOTE:_** -Should you wish to add an attribute not covered in the guidelines, set the `validate` parameter of the attribute decorator to `False`, like this: +> [!NOTE] +> Should you wish to add an attribute not covered in the guidelines, set the `validate` parameter of the attribute decorator to `False`, like this: ``` python @attribute(validate=False) @@ -373,10 +377,10 @@ This is a title This is a title ``` -Fundus will automatically add your decorated attributes as instance attributes to the `article` object during parsing. -Additionally, attributes defined in the attribute guidelines are explicitly defined as `dataclasses.fields`. +Fundus will automatically expose your decorated attributes on the `article` object during parsing. +Attributes defined in the attribute guidelines are additionally available as typed properties of `Article`, each with a default value, so they can be accessed safely even on articles whose parser didn't extract them. -### Extracting Attributes from Precomputed +### Extracting attributes from Precomputed One way to extract useful information from articles rather than placeholders is to utilize the `ld` and `meta` attributes of the `Article`. These attributes are automatically extracted when they are present in the currently parsed HTML. @@ -412,10 +416,11 @@ For instance, to extract the title for an article in The Intercept, we can acces return self.precomputed.ld.get_value_by_key_path(["NewsArticle", "headline"]) ``` -**_NOTE:_** In case a `class` is present in the HTML `meta` tag, it will be appended as a namespace to avoid collisions. -I.e. the content of the following meta tag ` [!NOTE] +> In case a `class` is present in the HTML `meta` tag, it will be appended as a namespace to avoid collisions. +> I.e. the content of the following meta tag ` [!NOTE] +> The nodes are returned in depth-first pre-order. Similarly, you can select based on the `class` attribute of a tag. For instance, selecting all `

` tags with class `A` looks like this. @@ -537,8 +543,9 @@ Output: This is a paragraph with a weird attribute ```` -**_NOTE:_** It's also possible to select solely by the existence of an attribute by omitting the equality. -Sticking to the above example you can simply use `CSSSelector("p[additional-attribute]")` instead. +> [!NOTE] +> It's also possible to select solely by the existence of an attribute by omitting the equality. +> Sticking to the above example you can simply use `CSSSelector("p[additional-attribute]")` instead. #### XPath @@ -546,11 +553,13 @@ Sticking to the above example you can simply use `CSSSelector("p[additional-attr Given the complexity of XPath compared to CSS-Select, we refrain from providing an extensive tutorial here. Instead, we recommend referring to [this](https://devhints.io/xpath) documentation for a translation table and a concise overview of XPath functionalities beyond CSS-Select. -**_NOTE:_** Although it's possible to select nodes using the built-in methods of `lxml.html.HtmlElement`, it's recommended to use the dedicated selectors [`CSSSelect`](https://lxml.de/cssselect.html) and [`XPath`](https://lxml.de/xpathxslt.html), as demonstrated in the above examples. +> [!NOTE] +> Although it's possible to select nodes using the built-in methods of `lxml.html.HtmlElement`, it's recommended to use the dedicated selectors [`CSSSelect`](https://lxml.de/cssselect.html) and [`XPath`](https://lxml.de/xpathxslt.html), as demonstrated in the above examples. -**_NOTE:_** The `fundus/parser/utility.py` module includes several utility functions that can assist you in implementing parser attributes. -Make sure to examine other parsers and consult the [attribute guidelines](attribute_guidelines.md) for specifics on attribute implementation. -We strongly encourage utilizing these utility functions, especially when parsing the `ArticleBody`. +> [!NOTE] +> The `fundus/parser/utility.py` module includes several utility functions that can assist you in implementing parser attributes. +> Make sure to examine other parsers and consult the [attribute guidelines](attribute_guidelines.md) for specifics on attribute implementation. +> We strongly encourage utilizing these utility functions, especially when parsing the `ArticleBody`. ### Extracting the ArticleBody @@ -620,7 +629,7 @@ def free_access(self) -> bool: Usually you can identify a premium article by an indicator within the URL or by using XPath or CSSSelector and selecting the element asking to purchase a subscription to view the article. -### Finishing the Parser +### Finishing the parser Bringing all the above together, the The Intercept Parser now looks like this. @@ -682,7 +691,7 @@ class TheInterceptParser(ParserProxy): ``` -Now, execute the example script from step 4 to validate your implementation. +Now, execute the example script from step 3 to validate your implementation. If the attributes are implemented correctly, they appear in the printout accordingly. ```console @@ -700,7 +709,7 @@ Fundus-Article: - From: The Intercept (2024-06-06 17:16) ``` -## 6. Generate unit tests and update tables +## 5. Generate unit tests and update tables ### Add unit tests @@ -719,7 +728,7 @@ Then in most cases it should be enough to simply run python -m scripts.generate_parser_test_files -p ```` -with being the class name of the `Publisher` your working on. +with being the class name of the `Publisher` you're working on. In our case, we would run: @@ -729,8 +738,9 @@ python -m scripts.generate_parser_test_files -p TheIntercept to generate a unit test for our parser. -Note: If you need to modify your parser slightly after already adding a unit test, there's no need to create a new test case and load a new HTML file. -You can simply run the script with the `-oj` flag. +> [!NOTE] +> If you need to modify your parser slightly after already adding a unit test, there's no need to create a new test case and load a new HTML file. +> You can simply run the script with the `-oj` flag. In our scenario, the command would be: @@ -755,14 +765,14 @@ Now to test your newly added publisher you should run pytest with the following pytest ```` -## 7. Opening a Pull Request +## 6. Opening a pull request 1. Make sure you tested your parser using `pytest`. 2. Run `ruff format src`, `ruff check --fix src`, and `mypy src` with no errors. 3. Push and open a new PR -4. Congratulation and thank you very much. +4. Congratulations and thank you very much. -## 8. Maintaining publishers +## 7. Maintaining publishers Website layouts change over time, so we may occasionally need to update a publisher's parser. If you run into an issue, feel free to correct it and submit a pull request (PR). diff --git a/docs/how_to_contribute.md b/docs/how_to_contribute.md index 7ebf80019..4ef37650d 100644 --- a/docs/how_to_contribute.md +++ b/docs/how_to_contribute.md @@ -29,7 +29,7 @@ If you haven't done this yet or are uncertain, follow these steps: 3. Navigate to the root of the repository. 4. Run `pip install -e .[dev]` -## Known issues: +## Known issues 1. `zsh: no matches found: .[dev]` When using zsh, you have to wrap the optional dependencies in quotes like this: `pip install -e .'[dev]'`. @@ -40,4 +40,5 @@ See [this issue](https://github.com/mu-editor/mu/issues/852#issue-451861103) for 1. [How to add a publisher](how_to_add_a_publisher.md) -**_NOTE:_** If you run into any problems while contributing don't hesitate to ask questions in the [**issue**](https://github.com/flairNLP/fundus/issues) tab. \ No newline at end of file +> [!NOTE] +> If you run into any problems while contributing don't hesitate to ask questions in the [**issue**](https://github.com/flairNLP/fundus/issues) tab. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 6d79a93da..c5423e87c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,4 +88,7 @@ quote-style = "double" filterwarnings = [ "error" ] +markers = [ + "integration: slow integration tests requiring mocked I/O", +] diff --git a/scripts/generate_parser_test_files.py b/scripts/generate_parser_test_files.py index daf49e187..72b387f53 100644 --- a/scripts/generate_parser_test_files.py +++ b/scripts/generate_parser_test_files.py @@ -11,9 +11,9 @@ from fundus.publishers.base_objects import Publisher from fundus.scraping.article import Article from fundus.scraping.filter import RequiresAll -from fundus.scraping.html import WebSource -from fundus.scraping.scraper import BaseScraper -from tests.test_parser import attributes_required_to_cover +from fundus.scraping.pipeline import Pipeline +from fundus.scraping.pipeline.source.web import WebSource +from tests.publishers.test_parser_coverage import attributes_required_to_cover from tests.utility import HTMLTestFile, get_test_case_json, load_html_test_file_mapping logger = create_logger(__name__) @@ -22,11 +22,11 @@ def get_test_article(publisher: Publisher, url: Optional[str] = None) -> Optional[Article]: if url is not None: source = WebSource([url], publisher=publisher) - scraper = BaseScraper(source, parser_mapping={publisher.name: publisher.parser}) - return next(scraper.scrape(error_handling="suppress", extraction_filter=RequiresAll()), None) + pipeline = Pipeline(source, publishers=[publisher]) + return next(pipeline.run(raise_on_error=False, extraction_filter=RequiresAll()), None) crawler = Crawler(publisher) - return next(crawler.crawl(max_articles=1, error_handling="suppress", only_complete=RequiresAll()), None) + return next(crawler.crawl(max_articles=1, only_complete=RequiresAll()), None) def parse_arguments() -> Namespace: diff --git a/scripts/publisher_coverage.py b/scripts/publisher_coverage.py index 4d9acdb59..173ec1495 100644 --- a/scripts/publisher_coverage.py +++ b/scripts/publisher_coverage.py @@ -8,7 +8,7 @@ import sys import traceback from argparse import ArgumentParser -from typing import Any, Callable, List, Optional, Union +from typing import List, Optional from fundus import Crawler, PublisherCollection from fundus.publishers.base_objects import Publisher, PublisherGroup @@ -55,58 +55,37 @@ def main() -> None: crawler: Crawler = Crawler(publisher, delay=0.4, ignore_robots=True) complete_article: Optional[Article] = next( - crawler.crawl( - max_articles=1, timeout=timeout_in_seconds, only_complete=True, error_handling="suppress" - ), + crawler.crawl(max_articles=1, timeout=timeout_in_seconds, only_complete=True), None, ) if complete_article is None: - incomplete_article: Optional[Article] = next( - crawler.crawl( - max_articles=1, timeout=timeout_in_seconds, only_complete=False, error_handling="catch" - ), - None, - ) + try: + incomplete_article: Optional[Article] = next( + crawler.crawl( + max_articles=1, timeout=timeout_in_seconds, only_complete=False, raise_on_error=True + ), + None, + ) + except Exception as exception: + print(f"❌ FAILED: {publisher_name!r} - Encountered exception during crawling") + traceback.print_exception(type(exception), exception, exception.__traceback__, file=sys.stdout) + failed += 1 + continue if incomplete_article is None: print(f"❌ FAILED: {publisher_name!r} - No articles received") - elif incomplete_article.exception is not None: - print( - f"❌ FAILED: {publisher_name!r} - Encountered exception during crawling " - f"(URL: {incomplete_article.html.requested_url})" - ) - traceback.print_exception( - etype=type(incomplete_article.exception), - value=incomplete_article.exception, - tb=incomplete_article.exception.__traceback__, - file=sys.stdout, - ) - else: - - def guard(field, fnc: Callable[[Any], bool] = lambda x: x is not None) -> Union[bool, str]: - """Makes a boolean evaluation of based on and guards exceptions - - Args: - field: The article field to evaluate - fnc: The evaluation function - - Returns: - Either True, False or Exception if isinstance(field, Exception) = True - """ - return fnc(field) if not isinstance(field, Exception) else repr(field) - print( f"❌ FAILED: {publisher_name!r} - No complete articles received " f"(URL of an incomplete article: {incomplete_article.html.requested_url}) with attributes:\n" - f"title: {guard(incomplete_article.title)}\n" - f"plaintext: {guard(incomplete_article.body, bool)}\n" - f"publishing_date: {guard(incomplete_article.publishing_date)}\n" - f"authors: {guard(incomplete_article.authors, bool)}\n" - f"topics: {guard(incomplete_article.topics, bool)}\n" - f"images: {guard(incomplete_article.images, bool)}\n" + f"title: {incomplete_article.title is not None}\n" + f"plaintext: {bool(incomplete_article.body)}\n" + f"publishing_date: {incomplete_article.publishing_date is not None}\n" + f"authors: {bool(incomplete_article.authors)}\n" + f"topics: {bool(incomplete_article.topics)}\n" + f"images: {bool(incomplete_article.images)}\n" ) failed += 1 continue diff --git a/src/fundus/parser/base_parser.py b/src/fundus/parser/base_parser.py index 30f3ab2cf..a54b75ab6 100644 --- a/src/fundus/parser/base_parser.py +++ b/src/fundus/parser/base_parser.py @@ -13,7 +13,6 @@ Dict, Iterator, List, - Literal, Optional, Tuple, Type, @@ -278,7 +277,7 @@ def _base_setup(self, html: str) -> None: doc = lxml.html.document_fromstring(html) self.precomputed = Precomputed(html, doc, get_meta_content(doc), get_ld_content(doc)) - def parse(self, html: str, error_handling: Literal["suppress", "catch", "raise"] = "raise") -> Dict[str, Any]: + def parse(self, html: str, raise_on_error: bool = True) -> Dict[str, Any]: # wipe existing precomputed self._base_setup(html) @@ -294,18 +293,13 @@ def parse(self, html: str, error_handling: Literal["suppress", "catch", "raise"] try: parsed_data[attribute_name] = func() except Exception as err: - if error_handling == "suppress": - parsed_data[attribute_name] = func.__default__ - logger.info( - f"Couldn't parse attribute {attribute_name!r} for " - f"{self.precomputed.meta.get('og:url')!r}: {err!r}" - ) - elif error_handling == "catch": - parsed_data[attribute_name] = err - elif error_handling == "raise": + if raise_on_error: raise err - else: - raise ValueError(f"Invalid value {error_handling!r} for parameter ") + parsed_data[attribute_name] = func.__default__ + logger.info( + f"Couldn't parse attribute {attribute_name!r} for " + f"{self.precomputed.meta.get('og:url')!r}: {err!r}" + ) else: raise TypeError(f"Invalid type for {func}. Only subclasses of 'RegisteredFunction' are allowed") diff --git a/src/fundus/parser/data.py b/src/fundus/parser/data.py index 819869df3..6f82e6300 100644 --- a/src/fundus/parser/data.py +++ b/src/fundus/parser/data.py @@ -20,7 +20,6 @@ Union, overload, ) -from urllib.parse import urljoin, urlparse import lxml.etree import lxml.html @@ -30,7 +29,7 @@ from lxml.etree import XPath, fromstring, tostring from typing_extensions import Self, TypeAlias, deprecated -from fundus.scraping.url import is_valid_url +from fundus.scraping.url import is_valid_url, strip_query_and_fragment from fundus.utils.serialization import ( DataclassSerializationMixin, JSONVal, @@ -457,12 +456,6 @@ def from_ratio( return None -def remove_query_parameters_from_url(url: str) -> str: - if any(parameter_indicator in url for parameter_indicator in ("?", "#")): - return urljoin(url, urlparse(url).path) - return url - - @total_ordering @dataclass class ImageVersion(DataclassSerializationMixin): @@ -475,7 +468,7 @@ class ImageVersion(DataclassSerializationMixin): def __post_init__(self): if not self.type: - url_without_query = remove_query_parameters_from_url(self.url) + url_without_query = strip_query_and_fragment(self.url) self.type = self._parse_type(url_without_query) def _parse_type(self, url: str) -> Optional[str]: diff --git a/src/fundus/parser/utility.py b/src/fundus/parser/utility.py index fbcffd9ab..6565ad788 100644 --- a/src/fundus/parser/utility.py +++ b/src/fundus/parser/utility.py @@ -46,7 +46,6 @@ LinkedDataMapping, TextSequence, ) -from fundus.scraping.url import is_valid_url from fundus.utils.regex import _get_match_dict from fundus.utils.serialization import JSONVal @@ -607,15 +606,6 @@ def parse_title_from_root(root: lxml.html.HtmlElement) -> Optional[str]: return strip_nodes_to_text(title_node) -def preprocess_url(url: str, domain: str) -> str: - url = re.sub(r"\\/", "/", url) - # Some publishers use relative URLs - if not is_valid_url(url): - publisher_domain = "https://" + domain - url = urljoin(publisher_domain, url) - return url - - def image_author_parsing(authors: Union[str, List[str]]) -> List[str]: credit_keywords = [ "Источник", diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py index 83fd20fc2..10ce798c2 100644 --- a/src/fundus/publishers/base_objects.py +++ b/src/fundus/publishers/base_objects.py @@ -67,7 +67,10 @@ def read(self) -> None: " Defaulting to disallow all." ) self.disallow_all = True - elif 400 <= err.response.status_code < 500: + else: + # Any other HTTP error — a 4xx without a robots.txt, or a 5xx server error — + # leaves us with no retrievable rules, so default to allow-all rather than an + # unset parser state. (Inside this except, raise_for_status guarantees >= 400.) self.allow_all = True else: self.parse(response.text.splitlines()) @@ -231,6 +234,9 @@ def source_types(self) -> Set[Type[URLSource]]: def __str__(self) -> str: return f"{self.name}" + def serialize(self) -> str: + return self.name + def __hash__(self) -> int: return hash(self.name) diff --git a/src/fundus/scraping/article.py b/src/fundus/scraping/article.py index a64502bc0..95db316a9 100644 --- a/src/fundus/scraping/article.py +++ b/src/fundus/scraping/article.py @@ -1,6 +1,6 @@ from datetime import datetime from textwrap import TextWrapper, dedent -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, ClassVar, Dict, List, Optional, Tuple, TypedDict, cast import langdetect import lxml.html @@ -9,89 +9,138 @@ from fundus.logging import create_logger from fundus.parser import ArticleBody, Image from fundus.scraping.html import HTML -from fundus.utils.serialization import JSONVal, is_jsonable +from fundus.utils.serialization import JSONVal, serialize_value logger = create_logger(__name__) -class AttributeView: - def __init__(self, key: str, extraction: Mapping[str, Any]): - self.ref = extraction - self.key = key +class Extraction(TypedDict, total=False): + """Schema for the narrowly-typed subset of extraction keys. - def __get__(self, instance: object, owner: type): - return self.ref[self.key] + Parsers may pass additional keys; those live in __extraction__ alongside these + and are exposed via __getattr__ with type Any. Only the keys declared here are + type-checked at the property accessors. + """ - def __set__(self, obj, value): - # For now, this is read-only - raise AttributeError("attribute is read only") + # TODO: once PEP 728 (https://peps.python.org/pep-0728/) is accepted and supported + # by our mypy version, inherit from typing_extensions.TypedDict and add the + # `extra_items=Any` parameter. That lets us drop the `_narrow` cast workaround and + # annotate __init__ kwargs as `**extraction: Unpack[Extraction]` while still + # accepting parser-specific extras. + + title: Optional[str] + body: Optional[ArticleBody] + authors: List[str] + publishing_date: Optional[datetime] + topics: List[str] + free_access: bool + images: List[Image] class Article: - __extraction__: Mapping[str, Any] = {} + """A parsed news article: the source HTML plus the parser's extracted attributes. + + Declared attributes (title, body, authors, publishing_date, topics, free_access, + images) are exposed as type-checked properties; any extra keys a parser returns are + accessible as read-only attributes via __getattr__. Derived properties (plaintext, + lang, publisher) are computed on access. Use to_json() to export selected fields. + """ + + DEFAULT_EXPORT_FIELDS: ClassVar[Tuple[str, ...]] = ( + "title", + "authors", + "publishing_date", + "topics", + "free_access", + "body", + "images", + "plaintext", + "lang", + "publisher", + ) + + def __init__(self, *, html: HTML, **extraction: Any) -> None: + """Build an article from its source HTML and the parser's extracted attributes. + + Args: + html (HTML): The source document the article was parsed from. + **extraction (Any): Attributes produced by the parser (e.g. title, body, + authors). Declared keys are surfaced through typed properties; any + additional keys are exposed as read-only attributes via __getattr__. - def __init__(self, *, html: HTML, exception: Optional[Exception] = None, **extraction: Any) -> None: + """ self.html = html - self.exception = exception - self.__extraction__ = extraction + self.__extraction__: Dict[str, Any] = extraction - # create descriptors for attributes that aren't pre-defined as properties. - for attribute in extraction.keys(): - if not hasattr(self, attribute): - setattr(self, attribute, AttributeView(attribute, self.__extraction__)) + @property + def _narrow(self) -> Extraction: + """View of __extraction__ restricted to the narrowly-typed schema. + + Storage stays Dict[str, Any] because the dict legitimately holds parser-extras + outside the schema. This cast applies the schema only where it's true: at the + narrow accessors below. + """ + return cast(Extraction, self.__extraction__) @property def title(self) -> Optional[str]: - return self.__extraction__.get("title") + return self._narrow.get("title") @property def body(self) -> Optional[ArticleBody]: - return self.__extraction__.get("body") + return self._narrow.get("body") @property def authors(self) -> List[str]: - return self.__extraction__.get("authors", []) + return self._narrow.get("authors", []) @property def publishing_date(self) -> Optional[datetime]: - return self.__extraction__.get("publishing_date") + return self._narrow.get("publishing_date") @property def topics(self) -> List[str]: - return self.__extraction__.get("topics", []) + return self._narrow.get("topics", []) @property def free_access(self) -> bool: - return self.__extraction__.get("free_access", False) + return self._narrow.get("free_access", False) @property def images(self) -> List[Image]: - return self.__extraction__.get("images", []) + return self._narrow.get("images", []) @property def publisher(self) -> str: return self.html.source_info.publisher - def __getattribute__(self, item: str): - if (attribute := object.__getattribute__(self, item)) and hasattr(attribute, "__get__"): - return attribute.__get__(self, type(self)) - return attribute - - def __setattr__(self, key: str, value: object): - if hasattr(self, key): - # we can't use getattr here, because it would invoke __get__, so unfortunately no default value - attribute = object.__getattribute__(self, key) - if hasattr(attribute, "__set__"): - attribute.__set__(key, value) - return - object.__setattr__(self, key, value) + def __getattr__(self, item: str) -> Any: + """Expose parser-extra extraction keys as read-only attributes; raise AttributeError otherwise. - def __getattr__(self, item: str): - raise AttributeError(f"{type(self).__name__!r} object has no attribute {str(item)!r}") + Only invoked when normal attribute lookup fails. + """ + # Read from __dict__ directly to avoid infinite recursion when __extraction__ itself isn't + # set yet (e.g., during unpickling before __setstate__ restores instance state). + extraction = self.__dict__.get("__extraction__") + if extraction is None or item not in extraction: + raise AttributeError(f"{type(self).__name__!r} object has no attribute {item!r}") + return extraction[item] + + def __setattr__(self, key: str, value: object) -> None: + """Block writes to extraction-backed attributes; allow all others.""" + # During __init__, html/__extraction__ are assigned before __extraction__ exists; + # check via __dict__ to avoid triggering __getattr__. + extraction = self.__dict__.get("__extraction__") + if extraction is not None and key in extraction: + raise AttributeError(f"attribute {key!r} is read only") + object.__setattr__(self, key, value) @property def plaintext(self) -> Optional[str]: - return str(self.body) or None if not isinstance(self.body, Exception) else None + body = self.body + if body is None or isinstance(body, Exception): + return None + return str(body) or None @property def lang(self) -> Optional[str]: @@ -104,53 +153,39 @@ def lang(self) -> Optional[str]: logger.debug(f"Unable to detect language for article {self.html.responded_url!r}") # use @lang attribute of tag as fallback - if not language or language == langdetect.detector_factory.Detector.UNKNOWN_LANG: + if (not language or language == langdetect.detector_factory.Detector.UNKNOWN_LANG) and self.html.content: language = lxml.html.fromstring(self.html.content).get("lang") if language and "-" in language: language = language.split("-")[0] return language - def to_json(self, *attributes: str) -> Dict[str, JSONVal]: - """Converts article object into a JSON serializable dictionary. - - One can specify which attributes should be included by passing attribute names as parameters. - Default: title, plaintext, authors, publishing_date, topics, free_access + unvalidated attributes + def to_json(self, *fields: str) -> Dict[str, JSONVal]: + """Export selected article fields as a JSON-compatible dict. Args: - *attributes: The attributes to serialize. Default: see docstring. + *fields: Field names to export. Each must resolve to an attribute of this + article (a built-in property or an extraction key). If empty, + DEFAULT_EXPORT_FIELDS is used. Pass "html" to include the source + document with its provenance metadata. Returns: - A json serializable dictionary - """ - - # default value for attributes - if not attributes: - attributes = tuple(set(self.__extraction__.keys()) - {"meta", "ld"}) + A JSON-serializable dict. Key order matches the order of . - def serialize(v: Any) -> JSONVal: - if hasattr(v, "serialize"): - return v.serialize() # type: ignore[no-any-return] - elif isinstance(v, datetime): - return str(v) - elif not is_jsonable(v): - raise TypeError(f"Attribute {attribute!r} of type {type(v)!r} is not JSON serializable") - return v # type: ignore[no-any-return] - - serialization: Dict[str, JSONVal] = {} - for attribute in attributes: - if not hasattr(self, attribute): - continue - value = getattr(self, attribute) - - if isinstance(value, list): - serialization[attribute] = [serialize(item) for item in value] - else: - serialization[attribute] = serialize(value) - - return serialization + Raises: + KeyError: If a requested field is not present on this article. + TypeError: If a value's type has no defined serialization. + """ + selected = fields or self.DEFAULT_EXPORT_FIELDS + output: Dict[str, JSONVal] = {} + for field in selected: + if not hasattr(self, field): + raise KeyError(field) + output[field] = serialize_value(getattr(self, field), field) + return output def __str__(self): + """Render a compact, human-readable summary (title, truncated text, URL, publisher, date).""" # the subsequent indent here is a bit wacky, but textwrapper.dedent won't work with tabs, so we have to use # whitespaces instead. title_wrapper = TextWrapper(width=80, max_lines=1, initial_indent="") diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py deleted file mode 100644 index ebb9a8236..000000000 --- a/src/fundus/scraping/crawler.py +++ /dev/null @@ -1,872 +0,0 @@ -from __future__ import annotations - -import contextlib -import gzip -import json -import logging.config -import multiprocessing -import os -import random -import re -import time -import traceback -from abc import ABC, abstractmethod -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime -from functools import lru_cache, partial, wraps -from multiprocessing import Manager -from multiprocessing.context import TimeoutError -from multiprocessing.managers import BaseManager -from multiprocessing.pool import MapResult, Pool, ThreadPool -from pathlib import Path -from queue import Empty, Full, Queue -from threading import current_thread -from typing import ( - Any, - Callable, - Dict, - Generic, - Iterator, - List, - Literal, - Optional, - Pattern, - Set, - Tuple, - Type, - TypeVar, - Union, - cast, -) - -import dill -import fastwarc.stream_io -import more_itertools -import requests -import urllib3.exceptions -from dateutil.rrule import MONTHLY, rrule -from more_itertools import roundrobin -from tqdm import tqdm -from typing_extensions import ParamSpec, TypeAlias - -from fundus.logging import create_logger, get_current_config -from fundus.parser.data import remove_query_parameters_from_url -from fundus.publishers.base_objects import FilteredPublisher, Publisher, PublisherGroup -from fundus.scraping.article import Article -from fundus.scraping.delay import Delay -from fundus.scraping.filter import ExtractionFilter, Requires, RequiresAll, URLFilter -from fundus.scraping.html import CCNewsSource -from fundus.scraping.scraper import CCNewsScraper, WebScraper -from fundus.scraping.session import CrashThread, session_handler -from fundus.scraping.url import URLSource -from fundus.utils.events import __EVENTS__ -from fundus.utils.timeout import Timeout - -logger = create_logger(__name__) - -__MAIN_THREAD_ALIAS__ = "main-thread" - -_T = TypeVar("_T") -_P = ParamSpec("_P") - -PublisherType: TypeAlias = Union[Publisher, PublisherGroup] - - -class RemoteException(Exception): - pass - - -class TQDMManager(BaseManager): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.register("_tqdm", tqdm) - - def tqdm(self, *args, **kwargs) -> tqdm: - return getattr(self, "_tqdm")(*args, **kwargs) - - -@contextlib.contextmanager -def get_proxy_tqdm(*args, **kwargs) -> tqdm: - """ - This functions returns a proxy to a tqdm instance. Init args are the same as for any other tqdm instance. - :param args: tqdm args - :param kwargs: tqdm kwargs - :return: a self-managed, proxied tqdm instance - """ - manager = TQDMManager() - try: - manager.start() - yield manager.tqdm(*args, **kwargs) - finally: - manager.shutdown() - - -# noinspection PyPep8Naming -class dill_wrapper(Generic[_P, _T]): - def __init__(self, target: Callable[_P, _T]): - """Wraps function in dill serialization. - - This is in order to use unpickable functions within multiprocessing. - - Args: - target: The function to wrap. - """ - self._serialized_target: bytes = dill.dumps(target) - - @lru_cache - def _deserialize(self) -> Callable[_P, _T]: - return cast(Callable[_P, _T], dill.loads(self._serialized_target)) - - def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: - return self._deserialize()(*args, **kwargs) - - -def get_execution_context(): - """ - Determines whether the current execution context is in a thread or process. - Returns: - context (str): "thread" or "process" - ident (int): Thread ID or Process ID - """ - if multiprocessing.current_process().name != "MainProcess": - process = multiprocessing.current_process() - return process.name, process.ident - else: - thread = current_thread() - return thread.name, thread.ident - - -def publisher_context_wrapper(func: Callable[[Publisher], None]) -> Callable[[Publisher], None]: - """Wraps a callable to register an ``__EVENTS__`` alias context for the publisher argument. - - The alias is entered as the very first thing the thread does and stays alive for the - entire call — including any exception handling in the caller — so that - ``__EVENTS__.get_alias`` always resolves while the thread is running. - - Args: - func: A callable whose first positional argument is a :class:`Publisher`. - - Returns: - The wrapped callable. - """ - - @wraps(func) - def wrapper(publisher: Publisher) -> None: - with __EVENTS__.context(publisher.name): - func(publisher) - - return wrapper - - -def queue_wrapper( - queue: Queue[Union[_T, Exception]], - target: Callable[_P, Iterator[_T]], - silenced_exceptions: Tuple[Type[BaseException], ...] = (), -) -> Callable[_P, None]: - """Wraps the target callable to add its results to the queue instead of returning them directly. - - Args: - queue: The buffer queue. - target: A target callable. - silenced_exceptions: Exception types that should be silenced - - Returns: - (Callable[_P, None]) The wrapped target. - """ - - @wraps(target) - def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: - def _guarded_put(obj: _T) -> bool: - """Safely putting results on the queue avoiding deadlocks""" - while True: - try: - # We use nowait here to avoid a deadlock on the put when the pool is already shutting down - # and therefore the queue never will never be free. - queue.put_nowait(obj) - except Full: - if __EVENTS__.is_event_set("stop", __MAIN_THREAD_ALIAS__): - return False - time.sleep(0.05) - else: - return True - - def _process_target(): - """Iterate over and put results into """ - for obj in target(*args, **kwargs): - if not _guarded_put(obj): - return - - try: - _process_target() - except silenced_exceptions: - pass - except Exception as err: - tb_str = "".join(traceback.TracebackException.from_exception(err).format()) - context, ident = get_execution_context() - alias = __EVENTS__.get_alias(ident, "") - queue.put( - RemoteException( - f"There was a(n) {type(err).__name__!r} occurring in {context} " - f"with ident {ident} ({alias})\n{tb_str}" - ) - ) - - logger.debug(f"Encountered remote exception in thread {ident} ({alias}): {err!r}") - - return wrapper - - -def pool_queue_iter(handle: MapResult[Any], queue: Queue[Union[_T, Exception]]) -> Iterator[_T]: - """Utility function to iterate exhaustively over a pool queue. - - The underlying iterator of this function repeatedly exhausts the given queue. - Then, if the queue is empty only if all the pool's jobs have finished, the iterator reruns. - Otherwise, it waits for the queue to be populated with the next result from the pool. - - Args: - handle: A handle of the MappedResult of the underling multiprocessing pool. - queue: The pool queue. - - Returns: - Iterator[_T]: The iterator over the queue as it is populated. - """ - - def _exception_guard() -> _T: - if isinstance(nxt := queue.get_nowait(), Exception): - raise Exception("There was an exception occurring in a remote thread/process") from nxt - return nxt - - while True: - try: - yield _exception_guard() - except Empty: - try: - handle.get(timeout=0.01) - except TimeoutError: - # listen for stop-event set for main-thread - if __EVENTS__.is_event_set("stop", __MAIN_THREAD_ALIAS__): - __EVENTS__.clear_event("stop", __MAIN_THREAD_ALIAS__) - break - continue - - # empty queue and look for exception - while not queue.empty(): - yield _exception_guard() - - return - - -def random_sleep(func: Callable[_P, _T], between: Tuple[float, float]) -> Callable[_P, _T]: - @wraps(func) - def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: - time.sleep(random.uniform(*between)) - return func(*args, **kwargs) - - return wrapper - - -class CrawlerBase(ABC): - def __init__(self, *publishers: PublisherType): - self.publishers: List[Union[Publisher, FilteredPublisher]] = list(set(more_itertools.collapse(publishers))) - if not self.publishers: - raise ValueError("param of must include at least one publisher.") - - @abstractmethod - def _build_article_iterator( - self, - publishers: Tuple[Publisher, ...], - error_handling: Literal["suppress", "catch", "raise"], - extraction_filter: Optional[ExtractionFilter], - url_filter: Optional[URLFilter], - language_filter: Optional[List[str]], - skip_publishers_disallowing_training: bool = False, - ) -> Iterator[Article]: - raise NotImplementedError - - def crawl( - self, - max_articles: Optional[int] = None, - max_articles_per_publisher: Optional[int] = None, - timeout: Optional[int] = None, - error_handling: Literal["suppress", "catch", "raise"] = "suppress", - only_complete: Union[bool, ExtractionFilter] = Requires("title", "body", "publishing_date"), - url_filter: Optional[URLFilter] = None, - language_filter: Optional[List[str]] = None, - only_unique: bool = True, - save_to_file: Union[None, str, Path] = None, - skip_publishers_disallowing_training: bool = False, - ) -> Iterator[Article]: - """Yields articles from initialized scrapers - - Args: - max_articles (Optional[int]): Number of articles to crawl. If there are fewer articles - than max_articles the Iterator will stop before max_articles. If None, all retrievable - articles are returned. Defaults to None. - max_articles_per_publisher: Specify the number of articles to crawl per publisher. - Disables . Defaults to None. - timeout (Optional[int]): timeout (Optional[int]): Specifies the duration in seconds the crawler - will wait without receiving any articles before stopping. If set <= 0, or if not provided, - the crawler will run until all sources are exhausted. Defaults to None. - error_handling (Literal["suppress", "catch", "raise"]): Define how to handle errors - encountered during extraction. If set to "suppress", all errors will be skipped, either - with None values for respective attributes in the extraction or by skipping entire articles. - If set to "catch", errors will be caught as attribute values or, if an entire article fails, - through Article.exception. If set to "raise" all errors encountered during extraction will - be raised. Defaults to "suppress". - only_complete (Union[bool, ExtractionFilter]): Set a callable satisfying the ExtractionFilter - protocol as an extraction filter or use a boolean. If False, all articles will be yielded, - if True, only those with all attributes extracted. Defaults to ExtractionFilter letting - through all articles with at least title, body, and publishing_date set. - url_filter (Optional[URLFilter]): A callable object satisfying the URLFilter protocol to skip - URLs before download. This filter applies on both requested and responded URL. Defaults to None. - language_filter (Optional[List[str]]): A set of language codes to filter the articles by. If set, - articles of different languages will be skipped and not counted towards the article count. Defaults - to None. - only_unique (bool): If set to True, articles yielded will be unique on the responded URL. - Always returns the first encountered article. Defaults to True. - save_to_file (Union[None, str, Path]): If set, the crawled articles will be collected saved to the - specified file as a JSON list. - skip_publishers_disallowing_training (bool): If set to True, publishers that disallow training - are skipped. Note that this is an indicator only and users with the intention of using Fundus to gather - training data should always check the publisher's terms of use beforehand. - - Returns: - Iterator[Article]: An iterator yielding objects of type Article. - """ - - if max_articles == 0: - return - - max_articles = max_articles or -1 - timeout = timeout or -1 - - if max_articles_per_publisher: - if timeout < 120: - print( - "It is recommended to set a minimum of 120 seconds when using max_articles_per_publisher." - ) - max_articles = -1 - - def build_extraction_filter() -> Optional[ExtractionFilter]: - if isinstance(only_complete, bool): - return None if only_complete is False else RequiresAll() - else: - return only_complete - - response_cache: Set[str] = set() - - extraction_filter = build_extraction_filter() - fitting_publishers: List[Union[Publisher, FilteredPublisher]] = [] - - if isinstance(extraction_filter, Requires): - for publisher in self.publishers: - supported_attributes = set( - more_itertools.flatten( - collection.names for collection in publisher.parser.attribute_mapping.values() - ) - ) - if missing_attributes := extraction_filter.required_attributes - supported_attributes: - logger.warning( - f"The required attribute(s) `{', '.join(missing_attributes)}` " - f"is(are) not supported by {publisher.name}. Skipping publisher" - ) - elif language_filter and not publisher.supports(languages=language_filter): - logger.warning( - f"None of the required language(s) `{', '.join(language_filter)}` " - f"is(are) supported by {publisher.name}. Skipping publisher" - ) - else: - fitting_publishers.append(publisher) - - if not fitting_publishers: - logger.error( - f"Could not find any fitting publishers for required attributes " - f"`{', '.join(extraction_filter.required_attributes)}`" - ) - return - else: - fitting_publishers = self.publishers - - # check if there are filtered publishers and if so, adopt their language restrictions - publisher_language_filter = set() - for publisher in fitting_publishers: - if isinstance(publisher, FilteredPublisher): - publisher_language_filter.update(publisher.language_filter) - - if language_filter and publisher_language_filter: - language_filter = list(set(language_filter).union(publisher_language_filter)) - logger.info( - f"Publisher language filter: {publisher_language_filter} will be added to the given language filter: " - f"{language_filter}. " - ) - elif publisher_language_filter: - language_filter = list(publisher_language_filter) - logger.info(f"Publisher language filter: {publisher_language_filter} will be used as the language filter. ") - - article_count: Dict[str, int] = defaultdict(int) - crawled_articles: Dict[str, List[Article]] = defaultdict(list) - - # Unfortunately we relly on this little workaround here to terminate the 'Pool' used within - # the 'CCNewsCrawler'. The 'Timeout' contextmanager utilizes '_thread.interrupt_main', - # throwing a KeyboardInterrupt in the main thread after

- - Args: - url: The url the evaluation should be based on. - - Returns: - bool: True if an should be filtered out and not - considered for extraction, False otherwise. - - """ - ... + def __call__(self, url: str) -> bool: ... def regex_filter(regex: str) -> URLFilter: + """Returns a URLFilter that filters out URLs matching `regex`.""" + pattern = re.compile(regex) + def url_filter(url: str) -> bool: - return bool(re.search(regex, url)) + return bool(pattern.search(url)) return url_filter class SupportsBool(Protocol): + """Anything convertible to bool; the return type of an ExtractionFilter call.""" + def __bool__(self) -> bool: ... class ExtractionFilter(Protocol): - """Protocol to define filters used after article extraction. + """Callable protocol for filters applied after article extraction. + + A truthy return value excludes the article; falsy keeps it — intentionally + inverse to Python's built-in filter(). - Filters satisfying this protocol should work inverse to build in filter(), - so that True gets filtered and False don't. + Example — exclude articles whose body is shorter than 500 characters:: + + def min_body_length(extraction: Dict[str, Any]) -> bool: + body = extraction.get("body") + return not body or len(str(body)) < 500 """ def __call__(self, extraction: Dict[str, Any]) -> SupportsBool: - """This should implement a selection based on . - - Extracted will be a dictionary returned by a parser mapping the attribute - names of the parser to the extracted values. + """Evaluate the extraction and return whether it should be filtered out. Args: - extraction: The extracted values the evaluation - should be based on. + extraction: Maps attribute names to their extracted values, as returned + by a parser. Attributes absent from the article are not present in the dict. Returns: - bool: True if extraction should be filtered out, False otherwise. - + A truthy value to exclude the article, falsy to keep it. """ ... class FilterResultWithMissingAttributes: + """Return value of Requires.__call__. Truthy when one or more attributes are missing or falsy.""" + def __init__(self, *attributes: str) -> None: self.missing_attributes = attributes @@ -118,7 +91,8 @@ def __bool__(self) -> bool: return bool(self.missing_attributes) -def _guarded_bool(value: Any): +def _eval_unless_bool(value: Any) -> bool: + """Booleans always pass; only non-boolean values are evaluated with bool().""" if isinstance(value, bool): return True else: @@ -126,50 +100,60 @@ def _guarded_bool(value: Any): class Requires: - def __init__(self, *required_attributes: str, eval_booleans: bool = True) -> None: - """Class to filter extractions based on attribute values + """Filters extractions based on the presence and truthiness of named attributes. - If a required_attribute is not present in the extracted data or evaluates to bool() -> False, - this filter won't be passed. By default, required boolean attributes are evaluated with bool(). + When called with an extraction dict, returns a FilterResultWithMissingAttributes + that is truthy if any required attribute is absent, falsy, or an Exception. + With no required attributes specified, all keys in the extraction are evaluated. - I.e., + By default, boolean attributes are evaluated with bool(): - Requires("free_access")({"free_access": False}) -> will be filtered out + Requires("free_access")({"free_access": False}) # filtered out - You can alter this behaviour by setting `eval_bool=False` + Set eval_booleans=False to let boolean values pass unconditionally: - I.e., + Requires("free_access", eval_booleans=False)({"free_access": False}) # passes - Requires("free_access", eval_bool=False)({"free_access": False}) -> will pass + Args: + *required_attributes: Attributes that must be present and truthy. If none are + given, all keys in the extraction are evaluated. + eval_booleans: If True, boolean values are evaluated with bool(). If False, + boolean values always pass. Defaults to True. + """ - Args: - *required_attributes: Attributes required to evaluate to True in order to - pass the filter. If no attributes are given, all attributes will be evaluated - eval_bool: If True the boolean values will also be evaluated with bool(). - If False, all boolean values evaluate to True. Defaults to True. - """ + def __init__(self, *required_attributes: str, eval_booleans: bool = True) -> None: self.required_attributes = set(required_attributes) # somehow mypy does not recognize bool as callable :( - self._eval: Callable[[Any], bool] = bool if eval_booleans else _guarded_bool # type: ignore[assignment] + self._eval: Callable[[Any], bool] = bool if eval_booleans else _eval_unless_bool # type: ignore[assignment] + + def _is_missing(self, value: Any) -> bool: + """True if value is absent, falsy, or an Exception.""" + return not self._eval(value) or isinstance(value, Exception) def __call__(self, extraction: Dict[str, Any]) -> FilterResultWithMissingAttributes: - missing_attributes = [ - attribute - for attribute in self.required_attributes or extraction.keys() - if not self._eval(value := extraction.get(attribute)) or isinstance(value, Exception) - ] + """Evaluate the extraction against the required attributes. + + Args: + extraction: A dictionary mapping attribute names to their extracted values. + + Returns: + FilterResultWithMissingAttributes that is truthy if any required attribute + is absent, falsy, or an Exception. + """ + attributes = self.required_attributes if self.required_attributes else extraction.keys() + missing_attributes = [attribute for attribute in attributes if self._is_missing(extraction.get(attribute))] return FilterResultWithMissingAttributes(*missing_attributes) class RequiresAll(Requires): - def __init__(self, eval_booleans: bool = False) -> None: - """Name wrap for Requires(eval_booleans=False) + """Requires all attributes in the extraction to be present and truthy. - This is for readability only. By default, it requires all non-boolean attributes of the extraction - to evaluate to True. Set `eval_booleans=True` to include boolean values in the evaluation as well. - See class:Requires docstring for more information. + Equivalent to Requires() with no specified attributes, but with eval_booleans=False + by default so boolean attributes are not counted as missing regardless of their value. - Args: - eval_booleans: See Requires docstring for more information. Defaults to False. - """ + Args: + eval_booleans: If True, boolean values are also evaluated. Defaults to False. + """ + + def __init__(self, eval_booleans: bool = False) -> None: super().__init__(eval_booleans=eval_booleans) diff --git a/src/fundus/scraping/html.py b/src/fundus/scraping/html.py index 33e0517ff..da1e70666 100644 --- a/src/fundus/scraping/html.py +++ b/src/fundus/scraping/html.py @@ -1,337 +1,76 @@ -import time -from dataclasses import dataclass -from datetime import datetime -from typing import Callable, Dict, Iterable, Iterator, List, Optional, Protocol, Union -from urllib.parse import urlparse +from __future__ import annotations -import chardet -import requests -from curl_cffi.requests.exceptions import ConnectionError, HTTPError, ReadTimeout -from fastwarc import ArchiveIterator, WarcRecord, WarcRecordType +from dataclasses import dataclass, fields +from datetime import datetime +from typing import Dict -from fundus.logging import create_logger -from fundus.publishers.base_objects import Publisher, Robots -from fundus.scraping.delay import Delay -from fundus.scraping.filter import URLFilter -from fundus.scraping.session import _default_header, session_handler -from fundus.scraping.url import URLSource, is_valid_url -from fundus.utils.events import __EVENTS__ +from fundus.utils.serialization import JSONVal, serialize_value __all__ = [ "HTML", "SourceInfo", - "WarcSourceInfo", - "WebSourceInfo", - "HTMLSource", - "WebSource", - "CCNewsSource", ] -logger = create_logger(__name__) - - -@dataclass(frozen=True) -class HTML: - requested_url: str - responded_url: str - content: str - crawl_date: datetime - source_info: "SourceInfo" - @dataclass(frozen=True) class SourceInfo: - publisher: str - - -@dataclass(frozen=True) -class WarcSourceInfo(SourceInfo): - warc_path: str - warc_headers: Dict[str, str] - http_headers: Dict[str, str] - - -@dataclass(frozen=True) -class WebSourceInfo(SourceInfo): - type: str - url: str + """Provenance metadata for an HTML record. + The base form carries only the publisher's name; needs to be pickable. Per-backend + subclasses (WebSourceInfo, WarcSourceInfo) add their own origin fields. -class HTMLSource(Protocol): - def fetch(self, url_filter: Optional[URLFilter] = None) -> Iterator[HTML]: ... + Attributes: + publisher (str): The publisher's name (its identity). + """ + publisher: str -class _Clock: - def __init__( - self, delay: Optional[Delay], sleep: Callable[[float], None] = time.sleep, warm_start: bool = True - ) -> None: - """Utility class for time-aligned delay. - - Keeps track of the time passed since last call or init and waits at most the remaining delay. - - Args: - delay: A customized delay. - sleep: A customized sleep function. Defaults to . - warm_start: If true, skips first delay. - """ - self.delay = delay - self.timestamp = time.time() - - if warm_start and self.delay is not None: - self.timestamp -= self.delay() - - self.sleep = sleep - - def __call__(self, blocking: bool = True) -> bool: - """Waits at most seconds since last called. - - Args: - blocking: If True, blocks until seconds have elapsed since last call. - If non-blocking returns False if less time has elapsed, else returns True. + def serialize(self) -> Dict[str, JSONVal]: + """Serialize all dataclass fields to a JSON-compatible dict. - Returns: True if seconds have elapsed since last call. False otherwise. + Subclasses inherit this unchanged and automatically pick up their extra fields, + since it reflects over the dataclass fields rather than naming them explicitly. + Returns: + Dict[str, JSONVal]: Field name to JSON-serializable value for every field. """ - if self.delay is None: - return True - - if delay := max(0.0, self.delay() - time.time() + self.timestamp): - if not blocking: - return False - self.sleep(delay) - self.reset() - return True - - def reset(self): - self.timestamp = time.time() - - -class WebSource: - def __init__( - self, - url_source: Union[URLSource, Iterable[str]], - publisher: Publisher, - url_filter: Optional[URLFilter] = None, - query_parameters: Optional[Dict[str, str]] = None, - delay: Optional[Delay] = None, - ignore_robots: bool = False, - ignore_crawl_delay: bool = False, - impersonate: bool = False, - ): - self.url_source = url_source - self.publisher = publisher - self.url_filter = url_filter - self.query_parameters = query_parameters or {} - self._impersonate_profile = publisher.impersonate if impersonate else None - - # parse robots: - self.robots: Optional[Robots] = None - if not ignore_robots: - self.robots = self.publisher.robots - - if not ignore_crawl_delay: - if robots_delay := self.robots.crawl_delay(self.publisher.request_header.get("user-agent", "*")): - logger.debug( - f"Found crawl-delay of {robots_delay} seconds in robots.txt for {self.publisher.name}. " - f"Overwriting existing delay." - ) - - def delay() -> float: - return robots_delay - - self.clock = _Clock(delay=delay, sleep=self._sleep) - - @property - def _is_stopped(self): - return __EVENTS__.is_event_set("stop") - - @staticmethod - def _sleep(s: float): - __EVENTS__.get("stop").wait(s) + return {f.name: serialize_value(getattr(self, f.name)) for f in fields(self)} - def _fetch_html(self, url: str, url_filter: URLFilter) -> Optional[HTML]: - # check if URL is malformed - if not is_valid_url(url): - logger.debug(f"Skipped requested URL {url!r} because the URL is malformed") - return None - # apply URL filter to requested URL - if url_filter(url): - logger.debug(f"Skipped requested URL {url!r} because of URL filter") - return None - - # check robots - if not ( - self.robots is None or self.robots.can_fetch(self.publisher.request_header.get("user-agent", "*"), url) - ): - logger.debug(f"Skipped requested URL {url!r} because of robots.txt") - return None - - session = session_handler.get_session(self._impersonate_profile) - - # prepare query parameters - for key, value in self.query_parameters.items(): - if "?" in url: - url += "&" + key + "=" + value - else: - url += "?" + key + "=" + value - - # apply crawl-delay - self.clock() - - # fetch html - try: - response = session.get_with_interrupt(url, headers=self.publisher.request_header) - - except (HTTPError, ConnectionError, ReadTimeout) as error: - logger.warning(f"Skipped requested URL {url!r} because of {error!r}") - if isinstance(error, HTTPError) and error.response.status_code >= 500: - logger.warning(f"Skipped {self.publisher.name!r} due to server errors: {error!r}") - return None - - # apply URL filter to responded URL - if url_filter(str(response.url)): - logger.debug(f"Skipped responded URL {str(response.url)!r} because of URL filter") - return None - - html = response.text - - # check for redirects - if response.history: - logger.info(f"Got redirected {len(response.history)} time(s) from {url!r} -> {response.url!r}") - - # create WebSourceInfo - source_info = ( - WebSourceInfo(self.publisher.name, type(self.url_source).__name__, self.url_source.url) - if isinstance(self.url_source, URLSource) - else SourceInfo(self.publisher.name) - ) - - # create HTML - return HTML( - requested_url=url, - responded_url=str(response.url), - content=html, - crawl_date=datetime.now(), - source_info=source_info, - ) - - def _build_url_filter(self, url_filter: Optional[URLFilter]) -> URLFilter: - combined_filters: List[URLFilter] = ([self.url_filter] if self.url_filter else []) + ( - [url_filter] if url_filter else [] - ) - - def combined_url_filter(url: str) -> bool: - return any(f(url) for f in combined_filters) +@dataclass(frozen=True) +class HTML: + """A fetched HTML document together with its URLs, crawl time, and source provenance. - return combined_url_filter + The unit of exchange between the Source and Pipeline layers: a Source yields HTML, + the Pipeline parses it into an Article. Frozen so it can be shared/pickled safely. - def fetch(self, url_filter: Optional[URLFilter] = None) -> Iterator[HTML]: - if isinstance(self.url_source, URLSource): - url_iterator = self.url_source.fetch( - session_handler.get_session(self._impersonate_profile), - self.publisher.request_header, - ) - else: - url_iterator = iter(self.url_source) + Attributes: + requested_url (str): The URL that was requested. + responded_url (str): The final URL after redirects (equals requested_url when none). + content (str): The decoded HTML body. + crawl_date (datetime): When the document was fetched (or its WARC record date). + source_info (SourceInfo): Provenance metadata describing where the record came from. + """ - while not self._is_stopped: - try: - # check iterator - if (url := next(url_iterator, None)) is None: - return - except Exception as error: - logger.error( - f"Warning! URLSource {self.url_source!r} crashed because of an unexpected error: {error!r}" - ) - return + requested_url: str + responded_url: str + content: str + crawl_date: datetime + source_info: SourceInfo - try: - if html := self._fetch_html(url, self._build_url_filter(url_filter)): - yield html - except Exception as error: - logger.error(f"Warning! Skipped requested URL {url!r} because of an unexpected error {error!r}") - continue + def serialize(self) -> Dict[str, JSONVal]: + """Serialize the record to a JSON-compatible dict. + The crawl date is ISO-formatted and the source info is serialized via its own + serialize(); all other fields are emitted as-is. -class CCNewsSource: - def __init__(self, *publishers: Publisher, warc_path: str, headers: Optional[Dict[str, str]] = None): - self.publishers = publishers - self.warc_path = warc_path - self.headers = headers or _default_header - self._publisher_mapping: Dict[str, Publisher] = { - urlparse(publisher.domain).netloc: publisher for publisher in self.publishers + Returns: + Dict[str, JSONVal]: The record's fields as JSON-serializable values. + """ + return { + "requested_url": self.requested_url, + "responded_url": self.responded_url, + "content": self.content, + "crawl_date": self.crawl_date.isoformat(), + "source_info": self.source_info.serialize(), } - - def fetch(self, url_filter: Optional[URLFilter] = None) -> Iterator[HTML]: - def extract_content(record: WarcRecord) -> Optional[str]: - warc_body: bytes = record.reader.read() - - try: - return str(warc_body, encoding=record.http_charset) # type: ignore[arg-type] - except (UnicodeDecodeError, TypeError): - encoding: Optional[str] = chardet.detect(warc_body)["encoding"] - - if encoding is not None: - logger.debug( - f"Trying to decode record {record.record_id!r} from {target_url!r} " - f"using detected encoding {encoding}." - ) - - try: - return str(warc_body, encoding=encoding) - except UnicodeDecodeError: - logger.warning( - f"Couldn't decode record {record.record_id!r} from {target_url!r} with " - f"original charset {record.http_charset!r} using detected charset {encoding!r}." - ) - else: - logger.warning( - f"Couldn't detect charset for record {record.record_id!r} from {target_url!r} " - f"with invalid original charset {record.http_charset!r}." - ) - - return None - - with requests.Session() as session: - response = session.get(self.warc_path, stream=True, headers=self.headers) - response.raise_for_status() - - for warc_record in ArchiveIterator(response.raw, record_types=WarcRecordType.response, verify_digests=True): - if not warc_record.record_date: - continue - - target_url = str(warc_record.headers["WARC-Target-URI"]) - - if url_filter is not None and url_filter(target_url): - logger.debug(f"Skipped WARC record with target URI {target_url!r} because of URL filter") - continue - - publisher_domain: str = urlparse(target_url).netloc - - if publisher_domain not in self._publisher_mapping: - continue - - publisher = self._publisher_mapping[publisher_domain] - - if publisher.url_filter is not None and publisher.url_filter(target_url): - logger.debug( - f"Skipped WARC record with target URI {target_url!r} because of publisher specific URL filter" - ) - continue - - if (content := extract_content(warc_record)) is None: - continue - - yield HTML( - requested_url=target_url, - responded_url=target_url, - content=content, - crawl_date=warc_record.record_date, - source_info=WarcSourceInfo( - publisher=publisher.name, - warc_path=self.warc_path, - warc_headers=dict(warc_record.headers), - http_headers=dict(warc_record.http_headers or {}), - ), - ) diff --git a/src/fundus/scraping/pipeline/__init__.py b/src/fundus/scraping/pipeline/__init__.py new file mode 100644 index 000000000..e9f2d2a35 --- /dev/null +++ b/src/fundus/scraping/pipeline/__init__.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +from typing import Collection, Dict, Iterator, List, Optional, Protocol + +from fundus.logging import create_logger +from fundus.parser import ParserProxy +from fundus.publishers.base_objects import Publisher +from fundus.scraping.article import Article +from fundus.scraping.filter import ExtractionFilter, FilterResultWithMissingAttributes, URLFilter +from fundus.scraping.html import HTML, SourceInfo + +logger = create_logger(__name__) + +__all__ = [ + "HTML", + "SourceInfo", + "HTMLSource", + "Pipeline", + "PipelineError", +] + + +class HTMLSource(Protocol): + """Protocol for HTML producers: yields HTML records, optionally gated by a URL filter. + + Implemented by WebSource (live web) and CCNewsSource (CC-NEWS WARC archive). + """ + + def fetch(self, url_filter: Optional[URLFilter] = None) -> Iterator[HTML]: + """Stream HTML records from the underlying source. + + Args: + url_filter (Optional[URLFilter]): Per-call URL filter; a truthy result skips the URL. + Combined with any source- or publisher-level filter by the implementor. + + Yields: + HTML: One record per kept/fetched URL. + + """ + ... + + +class PipelineError(Exception): + """Raised when an error occurs during a pipeline run.""" + + pass + + +class Pipeline: + """Pairs an HTMLSource with publisher parsers, turning each fetched HTML into an Article. + + Pulls HTML from the source, looks up the parser for the HTML's publisher by name, parses it, + and applies the extraction and language filters. HTML that fails parsing or any filter is dropped. + """ + + def __init__(self, source: HTMLSource, publishers: Collection[Publisher]) -> None: + """Build a pipeline over the given source and the parsers of the supplied publishers. + + Args: + source (HTMLSource): The HTML producer to pull records from. + publishers (Collection[Publisher]): Publishers whose parsers may be needed to process + the source's HTML. Each HTML is re-associated with a parser by its publisher name. + + """ + self.source = source + # Identity -> parser. The parser is behavior and can't ride on the (picklable) HTML, so + # each HTML carries only its publisher's name and we re-associate the parser here. + self._parsers: Dict[str, ParserProxy] = {publisher.name: publisher.parser for publisher in publishers} + + def _extract( + self, + html: HTML, + raise_on_error: bool, + extraction_filter: Optional[ExtractionFilter] = None, + language_filter: Optional[List[str]] = None, + ) -> Optional[Article]: + """Parse one HTML into an Article, or None if parsing fails or a filter drops it.""" + if (parser := self._parsers.get(html.source_info.publisher)) is None: + raise PipelineError( + f"No parser for publisher {html.source_info.publisher!r}; " + f"pipeline was built for {sorted(self._parsers)}" + ) + + try: + extraction = parser(html.crawl_date).parse(html.content, raise_on_error) + + except Exception as error: + if raise_on_error: + error_message = f"Run into an error processing article {html.requested_url!r}" + logger.error(error_message) + error.args = (str(error) + "\n\n" + error_message,) + raise + logger.info(f"Skipped article at {html.requested_url!r} because of: {error!r}") + return None + + else: + if extraction_filter and (filter_result := extraction_filter(extraction)): + if isinstance(filter_result, FilterResultWithMissingAttributes): + logger.debug( + f"Skipped article at {html.requested_url!r} because attribute(s) " + f"{', '.join(filter_result.missing_attributes)!r} is(are) missing" + ) + else: + logger.debug(f"Skipped article at {html.requested_url!r} because of extraction filter") + return None + + article = Article(html=html, **extraction) + if language_filter and article.lang not in language_filter: + logger.debug( + f"Skipped article at {html.requested_url!r} because article language " + f"{article.lang!r} is not in allowed languages: {language_filter!r}" + ) + return None + + return article + + def run( + self, + raise_on_error: bool, + extraction_filter: Optional[ExtractionFilter] = None, + url_filter: Optional[URLFilter] = None, + language_filter: Optional[List[str]] = None, + ) -> Iterator[Article]: + """Stream Articles by fetching HTML from the source and parsing each record. + + Args: + raise_on_error (bool): If True, parser exceptions propagate; if False they are logged + and the offending article is skipped. + extraction_filter (Optional[ExtractionFilter]): Applied after extraction; a truthy + result drops the article. + url_filter (Optional[URLFilter]): Forwarded to the source's fetch() to skip URLs before + they are downloaded/parsed. + language_filter (Optional[List[str]]): If set, articles whose detected language is not + in this list are skipped. + + Yields: + Article: One per HTML record that parses and passes all filters. + + """ + for html in self.source.fetch(url_filter=url_filter): + if article := self._extract(html, raise_on_error, extraction_filter, language_filter): + yield article diff --git a/src/fundus/scraping/pipeline/source/__init__.py b/src/fundus/scraping/pipeline/source/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/fundus/scraping/pipeline/source/ccnews.py b/src/fundus/scraping/pipeline/source/ccnews.py new file mode 100644 index 000000000..04b2f9ec8 --- /dev/null +++ b/src/fundus/scraping/pipeline/source/ccnews.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, Iterator, Optional +from urllib.parse import urlparse + +import chardet +import requests +import urllib3.exceptions +from fastwarc import ArchiveIterator, WarcRecord, WarcRecordType +from fastwarc.stream_io import StreamError + +from fundus.logging import create_logger +from fundus.publishers.base_objects import Publisher +from fundus.scraping.filter import URLFilter +from fundus.scraping.html import HTML, SourceInfo +from fundus.scraping.session import _default_header + +logger = create_logger(__name__) + + +class WarcFileLoadError(Exception): + """Raised when a CC-NEWS WARC archive cannot be downloaded or its stream is corrupt or truncated.""" + + +@dataclass(frozen=True) +class WarcSourceInfo(SourceInfo): + """Origin metadata attached to an HTML record extracted from a CC-NEWS WARC archive. + + Attributes: + warc_path (str): HTTPS URL of the WARC archive the record came from. + warc_headers (Dict[str, str]): WARC envelope headers (e.g. WARC-Target-URI, WARC-Date). + http_headers (Dict[str, str]): HTTP response headers captured by the original crawl. + """ + + warc_path: str + warc_headers: Dict[str, str] + http_headers: Dict[str, str] + + +class CCNewsSource: + """HTML source backed by a single CC-NEWS WARC archive on Common Crawl. + + Streams the archive once, walks its response records, and yields HTML for those records whose + target URI matches one of the configured publishers' domains. Unlike WebSource, there is no + per-URL network request: the archive contains pages already crawled by Common Crawl, so this + source is effectively a selection-and-decode pipeline over a pre-fetched corpus. + """ + + def __init__(self, *publishers: Publisher, warc_path: str, headers: Optional[Dict[str, str]] = None) -> None: + """Initialize a source over a single CC-NEWS WARC archive. + + Args: + *publishers (Publisher): Publishers whose articles should be extracted. WARC records + whose target URI does not belong to any of these publishers' domains are dropped + during iteration. + warc_path (str): HTTPS URL of the WARC archive to read (e.g. a CC-NEWS .warc.gz path). + headers (Optional[Dict[str, str]]): Request headers for the WARC download. Defaults to + the shared fundus user-agent header. + + """ + self.publishers = publishers + self.warc_path = warc_path + self.headers = headers or _default_header + self._publisher_mapping: Dict[str, Publisher] = { + urlparse(publisher.domain).netloc: publisher for publisher in self.publishers + } + + @staticmethod + def _extract_content(record: WarcRecord, target_url: str) -> Optional[str]: + """Decode the WARC body using the declared charset, falling back to chardet detection.""" + warc_body: bytes = record.reader.read() + try: + return str(warc_body, encoding=record.http_charset) # type: ignore[arg-type] + except (UnicodeDecodeError, TypeError): + encoding: Optional[str] = chardet.detect(warc_body)["encoding"] + if encoding is not None: + logger.debug( + f"Trying to decode record {record.record_id!r} from {target_url!r} " + f"using detected encoding {encoding}." + ) + try: + return str(warc_body, encoding=encoding) + except UnicodeDecodeError: + logger.warning( + f"Couldn't decode record {record.record_id!r} from {target_url!r} with " + f"original charset {record.http_charset!r} using detected charset {encoding!r}." + ) + else: + logger.warning( + f"Couldn't detect charset for record {record.record_id!r} from {target_url!r} " + f"with invalid original charset {record.http_charset!r}." + ) + return None + + def _validate(self, target_url: str, url_filter: Optional[URLFilter]) -> Optional[Publisher]: + """Return the matching publisher, or None if the URL should be skipped.""" + if url_filter is not None and url_filter(target_url): + logger.debug(f"Skipped WARC record with target URI {target_url!r} because of URL filter") + return None + publisher = self._publisher_mapping.get(urlparse(target_url).netloc) + if publisher is None: + return None + if publisher.url_filter is not None and publisher.url_filter(target_url): + logger.debug(f"Skipped WARC record with target URI {target_url!r} because of publisher specific URL filter") + return None + return publisher + + def _record_to_html(self, record: WarcRecord, url_filter: Optional[URLFilter]) -> Optional[HTML]: + """Validate, decode, and assemble a single WARC record. Returns None if skipped.""" + record_date = record.record_date + if record_date is None: + return None + target_url = str(record.headers["WARC-Target-URI"]) + if (publisher := self._validate(target_url, url_filter)) is None: + return None + if (content := self._extract_content(record, target_url)) is None: + return None + return HTML( + requested_url=target_url, + responded_url=target_url, + content=content, + crawl_date=record_date, + source_info=WarcSourceInfo( + publisher=publisher.name, + warc_path=self.warc_path, + warc_headers=dict(record.headers), + http_headers=dict(record.http_headers or {}), + ), + ) + + def _open_stream(self) -> requests.Response: + """Open a streaming GET against the WARC archive. Wraps transport errors in WarcFileLoadError.""" + try: + session = requests.Session() + response = session.get(self.warc_path, stream=True, headers=self.headers) + response.raise_for_status() + return response + except (requests.HTTPError, urllib3.exceptions.HTTPError) as error: + raise WarcFileLoadError(f"{type(error).__name__}: {error}") from error + + @staticmethod + def _iter_warc_records(response: requests.Response) -> Iterator[WarcRecord]: + """Iterate WARC response records from the open stream. Wraps StreamError in WarcFileLoadError.""" + try: + yield from ArchiveIterator(response.raw, record_types=WarcRecordType.response, verify_digests=True) + except StreamError as error: + raise WarcFileLoadError(f"{type(error).__name__}: {error}") from error + + def fetch(self, url_filter: Optional[URLFilter] = None) -> Iterator[HTML]: + """Stream HTML records from the configured WARC archive. + + Walks every response record in the archive, keeps those whose target URI matches a + configured publisher and passes the URL filters, decodes the body, and yields the + resulting HTML record. + + Args: + url_filter (Optional[URLFilter]): Per-call URL filter applied in addition to each + publisher's own url_filter. Truthy means skip the URL. + + Yields: + HTML: One record per kept WARC entry. + + Raises: + WarcFileLoadError: If the archive cannot be downloaded or the WARC stream is corrupt. + + """ + response = self._open_stream() + for record in self._iter_warc_records(response): + if (html := self._record_to_html(record, url_filter)) is not None: + yield html diff --git a/src/fundus/scraping/pipeline/source/web.py b/src/fundus/scraping/pipeline/source/web.py new file mode 100644 index 000000000..a3712e8a0 --- /dev/null +++ b/src/fundus/scraping/pipeline/source/web.py @@ -0,0 +1,298 @@ +from __future__ import annotations + +import threading +import time +from dataclasses import dataclass +from datetime import datetime +from typing import Callable, Dict, Iterable, Iterator, List, Optional, Union +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit + +from curl_cffi.requests import Response +from curl_cffi.requests.exceptions import ConnectionError, HTTPError, ReadTimeout + +from fundus.logging import create_logger +from fundus.publishers.base_objects import Publisher, Robots +from fundus.scraping.delay import Delay +from fundus.scraping.filter import URLFilter +from fundus.scraping.html import HTML, SourceInfo +from fundus.scraping.session import session_handler +from fundus.scraping.url import URLSource, is_valid_url + +logger = create_logger(__name__) + + +@dataclass(frozen=True) +class WebSourceInfo(SourceInfo): + """Origin metadata attached to an HTML record fetched via a URLSource. + + Attributes: + type (str): Class name of the URLSource that produced the URL (e.g. "RSSFeed", "Sitemap"). + url (str): The feed/sitemap URL the article was discovered from. + """ + + type: str + url: str + + +class _Pacer: + """Per-source rate limiter. Sleeps as needed so consecutive calls are at least `delay` apart.""" + + def __init__( + self, delay: Optional[Delay], sleep: Callable[[float], object] = time.sleep, warm_start: bool = True + ) -> None: + """Build a pacer with the given delay and sleep function. warm_start=True skips sleep on first call.""" + self.delay = delay + self.timestamp = time.time() + if warm_start and self.delay is not None: + self.timestamp -= self.delay() + self.sleep = sleep + + def __call__(self) -> None: + """Sleep just long enough to enforce the configured delay since the last call, then reset.""" + if self.delay is None: + return + if delay := max(0.0, self.delay() - time.time() + self.timestamp): + self.sleep(delay) + self.reset() + + def reset(self) -> None: + """Mark the current time as the last call; the next call will wait the full delay from now.""" + self.timestamp = time.time() + + +class WebSource: + """HTML source backed by live HTTP requests over a URLSource (RSSFeed/Sitemap/NewsMap) or any iterable of URLs. + + Iterates URLs one at a time, applies URL filters and robots.txt, rate-limits requests via an + internal pacer, and fetches each URL through an InterruptableSession. Yields one HTML record per + successful fetch. Honors a cooperative stop_event for early cancellation. + """ + + def __init__( + self, + url_source: Union[URLSource, Iterable[str]], + publisher: Publisher, + url_filter: Optional[URLFilter] = None, + query_parameters: Optional[Dict[str, str]] = None, + delay: Optional[Delay] = None, + ignore_robots: bool = False, + ignore_crawl_delay: bool = False, + impersonate: bool = False, + stop_event: Optional[threading.Event] = None, + ): + """Initialize a source that fetches HTML from URLs produced by a URLSource or any iterable. + + Args: + url_source (Union[URLSource, Iterable[str]]): A URLSource (RSSFeed/Sitemap/NewsMap) or + any iterable of URL strings. URLSource instances are passed the publisher's session + and request headers when iterated. + publisher (Publisher): Publisher the URLs belong to. Provides request headers, robots, + impersonate profile, and the publisher-level URL filter. + url_filter (Optional[URLFilter]): Source-level URL filter, OR-combined with any + per-call filter passed to fetch(). Truthy means skip the URL. + query_parameters (Optional[Dict[str, str]]): Query parameters appended to every URL + before it is requested. Existing query strings are preserved. + delay (Optional[Delay]): Per-request delay (seconds). Overridden by robots.txt + crawl-delay unless ignore_crawl_delay=True. + ignore_robots (bool): If True, skip robots.txt checks (both can_fetch and crawl-delay). + ignore_crawl_delay (bool): If True, keep the supplied delay even when robots.txt + declares its own crawl-delay. + stop_event (Optional[threading.Event]): Cooperative-cancellation flag. When set, any + in-flight sleep is interrupted and the source stops iterating URLs. + + """ + self.url_source = url_source + self.publisher = publisher + self.url_filter = url_filter + self.query_parameters = query_parameters or {} + self._impersonate_profile = publisher.impersonate if impersonate else None + self.robots: Optional[Robots] = None if ignore_robots else self.publisher.robots + self.stop_event = stop_event + self._delay = delay + self._ignore_crawl_delay = ignore_crawl_delay + # Built lazily on the first request (see _build_pacer): resolving the crawl-delay may + # read robots.txt, and construction must stay free of I/O. + self.pacer: Optional[_Pacer] = None + + # source_info depends only on url_source's type, which is fixed at construction time. + self.source_info: SourceInfo = ( + WebSourceInfo(publisher.name, type(url_source).__name__, url_source.url) + if isinstance(url_source, URLSource) + else SourceInfo(publisher.name) + ) + + @staticmethod + def _resolve_delay( + robots: Optional[Robots], + user_agent: str, + supplied_delay: Optional[Delay], + ignore_crawl_delay: bool, + publisher_name: str = "", + ) -> Optional[Delay]: + """Return the effective per-request delay. + + Robots' crawl_delay (if present) overrides supplied_delay; the override is skipped + when robots is None or ignore_crawl_delay is True or robots has no crawl_delay set. + """ + if robots is None or ignore_crawl_delay: + return supplied_delay + robots_delay = robots.crawl_delay(user_agent) + if robots_delay is None: + return supplied_delay + logger.debug( + f"Found crawl-delay of {robots_delay} seconds in robots.txt for {publisher_name}. " + f"Overwriting existing delay." + ) + return lambda: robots_delay + + def _build_pacer(self) -> _Pacer: + """Resolve the effective delay (may read robots.txt) and build the rate limiter. + + Deferred out of __init__ so construction performs no I/O; called on the first request. + """ + user_agent = self.publisher.request_header.get("user-agent", "*") + resolved_delay = self._resolve_delay( + self.robots, user_agent, self._delay, self._ignore_crawl_delay, publisher_name=self.publisher.name + ) + # stop_event.wait makes the per-request delay interruptable; time.sleep does not. + sleep: Callable[[float], object] + if self.stop_event is None: + sleep = time.sleep + else: + sleep = self.stop_event.wait + return _Pacer(delay=resolved_delay, sleep=sleep) + + @staticmethod + def _apply_query_parameters(url: str, params: Dict[str, str]) -> str: + """Append query parameters to a URL, preserving existing ones and URL-encoding values.""" + if not params: + return url + parts = urlsplit(url) + existing = parse_qsl(parts.query, keep_blank_values=True) + new_query = urlencode([*existing, *params.items()]) + return urlunsplit(parts._replace(query=new_query)) + + @property + def _is_stopped(self) -> bool: + """True if a stop_event was supplied and has been set.""" + return self.stop_event is not None and self.stop_event.is_set() + + def _pre_validate(self, url: str, url_filter: URLFilter) -> bool: + """Return True if the URL is fit to request. Logs the reason and returns False otherwise.""" + if not is_valid_url(url): + logger.debug(f"Skipped requested URL {url!r} because the URL is malformed") + return False + if url_filter(url): + logger.debug(f"Skipped requested URL {url!r} because of URL filter") + return False + user_agent = self.publisher.request_header.get("user-agent", "*") + if self.robots is not None and not self.robots.can_fetch(user_agent, url): + logger.debug(f"Skipped requested URL {url!r} because of robots.txt") + return False + return True + + def _request(self, url: str) -> Optional[Response]: + """Sleep on the pacer, then GET the URL. Returns None on request error.""" + session = session_handler.get_session(self._impersonate_profile) + pacer = self.pacer + if pacer is None: + pacer = self.pacer = self._build_pacer() + pacer() + try: + return session.get_with_interrupt(url, headers=self.publisher.request_header) + except (HTTPError, ConnectionError, ReadTimeout) as error: + logger.warning(f"Skipped requested URL {url!r} because of {error!r}") + return None + + @staticmethod + def _post_validate(response: Response, url_filter: URLFilter) -> bool: + """Return True if the response should be kept. Logs the reason and returns False otherwise.""" + if url_filter(str(response.url)): + logger.debug(f"Skipped responded URL {str(response.url)!r} because of URL filter") + return False + return True + + def _build_html(self, requested_url: str, response: Response) -> HTML: + """Assemble the HTML record from a successful response.""" + return HTML( + requested_url=requested_url, + responded_url=str(response.url), + content=response.text, + crawl_date=datetime.now(), + source_info=self.source_info, + ) + + def _fetch_one(self, url: str, url_filter: URLFilter) -> Optional[HTML]: + """Run the full per-URL pipeline: pre-validate, request, post-validate, build. None if skipped.""" + if not self._pre_validate(url, url_filter): + return None + url = self._apply_query_parameters(url, self.query_parameters) + response = self._request(url) + if response is None: + return None + if not self._post_validate(response, url_filter): + return None + return self._build_html(url, response) + + def _iter_urls(self) -> Iterator[str]: + """Yield URLs from the configured source, swallowing iterator crashes with a warning.""" + if isinstance(self.url_source, URLSource): + source_iter: Iterator[str] = self.url_source.fetch( + session_handler.get_session(self._impersonate_profile), + self.publisher.request_header, + ) + else: + source_iter = iter(self.url_source) + while True: + try: + url = next(source_iter, None) + except Exception as error: + logger.error( + f"Warning! URLSource {self.url_source!r} crashed because of an unexpected error: {error!r}" + ) + return + if url is None: + return + yield url + + def _build_url_filter(self, url_filter: Optional[URLFilter]) -> URLFilter: + """Combine source-level and per-call URL filters with logical OR. Returns a pass-through if both are None.""" + combined: List[URLFilter] = ([self.url_filter] if self.url_filter else []) + ( + [url_filter] if url_filter else [] + ) + + def combined_url_filter(url: str) -> bool: + return any(f(url) for f in combined) + + return combined_url_filter + + def fetch(self, url_filter: Optional[URLFilter] = None) -> Iterator[HTML]: + """Stream HTML records by iterating url_source and fetching each URL. + + Each URL is gated by the combined URL filter (source-level OR per-call), robots.txt, and + rate-limited by the configured delay. Per-URL request errors (HTTP / connection / timeout) + are logged and skipped; the iteration continues. If stop_event is set, iteration short- + circuits at the next boundary. + + Args: + url_filter (Optional[URLFilter]): Per-call URL filter, OR-combined with the source's + own url_filter. Truthy means skip the URL. + + Yields: + HTML: One record per successfully fetched URL. + + """ + combined_filter = self._build_url_filter(url_filter) + url_iterator = self._iter_urls() + # Check the stop event BEFORE advancing the iterator: pulling the next URL from a + # URLSource triggers its feed/sitemap download, so a stopped source must return without + # ever touching its iterator — otherwise every remaining source is fetched after stop. + while not self._is_stopped: + url = next(url_iterator, None) + if url is None: + return + try: + if html := self._fetch_one(url, combined_filter): + yield html + except Exception as error: + logger.error(f"Warning! Skipped requested URL {url!r} because of an unexpected error {error!r}") diff --git a/src/fundus/scraping/scraper.py b/src/fundus/scraping/scraper.py deleted file mode 100644 index b11a0d2b4..000000000 --- a/src/fundus/scraping/scraper.py +++ /dev/null @@ -1,114 +0,0 @@ -from typing import Dict, Iterator, List, Literal, Optional, Type - -import more_itertools - -from fundus.logging import create_logger -from fundus.parser import ParserProxy -from fundus.publishers.base_objects import Publisher -from fundus.scraping.article import Article -from fundus.scraping.delay import Delay -from fundus.scraping.filter import ( - ExtractionFilter, - FilterResultWithMissingAttributes, - URLFilter, -) -from fundus.scraping.html import CCNewsSource, HTMLSource, WebSource -from fundus.scraping.url import URLSource - -logger = create_logger(__name__) - - -class BaseScraper: - def __init__(self, *sources: HTMLSource, parser_mapping: Dict[str, ParserProxy]): - self.sources = sources - self.parser_mapping = parser_mapping - - def scrape( - self, - error_handling: Literal["suppress", "catch", "raise"], - extraction_filter: Optional[ExtractionFilter] = None, - url_filter: Optional[URLFilter] = None, - language_filter: Optional[List[str]] = None, - ) -> Iterator[Article]: - for source in self.sources: - for html in source.fetch(url_filter=url_filter): - parser = self.parser_mapping[html.source_info.publisher] - - try: - extraction = parser(html.crawl_date).parse(html.content, error_handling) - - except Exception as error: - if error_handling == "raise": - error_message = f"Run into an error processing article {html.requested_url!r}" - logger.error(error_message) - error.args = (str(error) + "\n\n" + error_message,) - raise error - elif error_handling == "catch": - yield Article(html=html, exception=error) - elif error_handling == "suppress": - logger.info(f"Skipped article at {html.requested_url!r} because of: {error!r}") - else: - raise ValueError(f"Unknown value {error_handling!r} for parameter '") - - else: - if extraction_filter and (filter_result := extraction_filter(extraction)): - if isinstance(filter_result, FilterResultWithMissingAttributes): - logger.debug( - f"Skipped article at {html.requested_url!r} because attribute(s) " - f"{', '.join(filter_result.missing_attributes)!r} is(are) missing" - ) - else: - logger.debug(f"Skipped article at {html.requested_url!r} because of extraction filter") - else: - article = Article(html=html, **extraction) - if language_filter and article.lang not in language_filter: - logger.debug( - f"Skipped article at {html.requested_url!r} because article language: " - f"{article.lang!r} is not in allowed languages: {language_filter!r}" - ) - else: - yield article - - -class WebScraper(BaseScraper): - def __init__( - self, - publisher: Publisher, - restrict_sources_to: Optional[List[Type[URLSource]]] = None, - delay: Optional[Delay] = None, - ignore_robots: bool = False, - ignore_crawl_delay: bool = False, - impersonate: bool = False, - ): - if restrict_sources_to: - url_sources = tuple( - more_itertools.flatten( - publisher.source_mapping[source_type] - for source_type in restrict_sources_to - if source_type in publisher.source_mapping - ) - ) - else: - url_sources = tuple(more_itertools.flatten(publisher.source_mapping.values())) - - html_sources = [ - WebSource( - url_source=url_source, - publisher=publisher, - delay=delay, - url_filter=publisher.url_filter, - query_parameters=publisher.query_parameter, - ignore_robots=ignore_robots, - ignore_crawl_delay=ignore_crawl_delay, - impersonate=impersonate, - ) - for url_source in url_sources - ] - parser_mapping: Dict[str, ParserProxy] = {publisher.name: publisher.parser} - super().__init__(*html_sources, parser_mapping=parser_mapping) - - -class CCNewsScraper(BaseScraper): - def __init__(self, source: CCNewsSource): - parser_mapping: Dict[str, ParserProxy] = {publisher.name: publisher.parser for publisher in source.publishers} - super().__init__(source, parser_mapping=parser_mapping) diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py index 444ae7071..dbe4d96b1 100644 --- a/src/fundus/scraping/session.py +++ b/src/fundus/scraping/session.py @@ -1,8 +1,12 @@ from __future__ import annotations +import random import re import threading +import time from contextlib import contextmanager +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime from queue import Empty, Queue from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union from urllib.parse import urljoin @@ -23,12 +27,14 @@ class CrashThread(BaseException): - """Is raised to end a thread without relying on the thread ending naturally""" + """Raised to terminate a thread without waiting for it to exit naturally.""" pass class _RequestTask(NamedTuple): + """A unit of work handed to the session worker thread: the URL, request kwargs, and a queue for the result.""" + url: str kwargs: Any result_queue: Queue[Union[curl_cffi.requests.Response, Exception]] @@ -69,14 +75,34 @@ class InterruptableSession(curl_cffi.requests.Session[curl_cffi.requests.Respons The daemon thread owns the curl handle for the lifetime of the session, enabling connection reuse across requests. get_with_interrupt() submits work to the daemon thread and polls for a stop event every second, raising CrashThread if interrupted. + 5xx responses are retried in place with interruptable exponential backoff (see + get_with_interrupt); an exhausted retry surfaces as a normal HTTPError. """ - def __init__(self, **kwargs: Any) -> None: - # use_thread_local_curl=True gives the worker thread its own curl handle, separate - # from the caller thread's handle closed in close(). Prevents close() from touching - # a handle that may still be in use by the worker. + def __init__( + self, + *, + max_retries: int = 3, + retry_backoff_base: float = 1.0, + retry_backoff_cap: float = 30.0, + **kwargs: Any, + ) -> None: + """Start the persistent worker thread; forwards kwargs to curl_cffi.Session. + + use_thread_local_curl is forced on so the worker thread gets its own curl handle, + separate from the caller thread's handle that close() tears down; otherwise close() + could touch a handle still in use by the worker. + + Args: + max_retries (int): Number of additional attempts for 5xx responses (0 disables retrying). + retry_backoff_base (float): Base for the full-jitter exponential backoff between retries (seconds). + retry_backoff_cap (float): Upper bound on a single backoff wait, including Retry-After (seconds). + """ kwargs.pop("use_thread_local_curl", None) super().__init__(use_thread_local_curl=True, **kwargs) + self.max_retries = max_retries + self.retry_backoff_base = retry_backoff_base + self.retry_backoff_cap = retry_backoff_cap self._closed = False self._task_queue: Queue[Optional[_RequestTask]] = Queue() self._worker_thread = threading.Thread(target=self._worker_loop, name=f"session-worker-{id(self)}", daemon=True) @@ -84,6 +110,7 @@ def __init__(self, **kwargs: Any) -> None: @staticmethod def _log_response(response: curl_cffi.requests.Response) -> None: + """Debug-log the request method, any redirect chain, the final status, and elapsed time.""" history: List[curl_cffi.requests.Response] = object.__getattribute__(response, "_history") method = getattr(getattr(response, "request", None), "method", "GET") if history: @@ -117,6 +144,7 @@ def _follow_redirects(self, url: str, **kwargs: Any) -> curl_cffi.requests.Respo raise TooManyRedirects(f"Exceeded {self.max_redirects} maximum redirects following {url!r}") def _worker_loop(self) -> None: + """Pull tasks off the queue and run each request, returning the response or the raised error; exit on the None sentinel.""" while True: task = self._task_queue.get() if task is None: @@ -126,17 +154,44 @@ def _worker_loop(self) -> None: except Exception as error: task.result_queue.put(error) - def get_with_interrupt(self, url: str, **kwargs: Any) -> curl_cffi.requests.Response: - """Interruptable GET request. + @staticmethod + def _parse_retry_after(value: str) -> Optional[float]: + """Parse a Retry-After header (delta-seconds or HTTP-date) into seconds from now, or None if unparseable.""" + value = value.strip() + if value.isdigit(): + return float(value) + try: + # Raises TypeError (py<3.10) or ValueError (py>=3.10) on unparseable input. + parsed = parsedate_to_datetime(value) + except (TypeError, ValueError): + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return max(0.0, (parsed - datetime.now(timezone.utc)).total_seconds()) + + def _retry_backoff(self, response: curl_cffi.requests.Response, attempt: int) -> float: + """Seconds to wait before retrying: a valid Retry-After (capped) if present, else full-jitter exponential backoff.""" + retry_after = response.headers.get("retry-after") + if retry_after is not None and (parsed := self._parse_retry_after(retry_after)) is not None: + return min(parsed, self.retry_backoff_cap) + window = min(self.retry_backoff_cap, self.retry_backoff_base * 2**attempt) + return random.uniform(0.0, window) - Submits the request to the persistent daemon thread and polls every second - for a stop event. Raises CrashThread if interrupted. When impersonating a - browser, kwargs are dropped so curl_cffi can apply the full browser - fingerprint unmodified. + @staticmethod + def _sleep_with_interrupt(seconds: float, url: str) -> None: + """Sleep up to `seconds`, waking every second to honor the stop event (raises CrashThread if set).""" + deadline = time.monotonic() + seconds + while (remaining := deadline - time.monotonic()) > 0: + if __EVENTS__.is_event_set("stop"): + logger.debug(f"Interrupt backoff before retrying {url!r}") + raise CrashThread(f"Backoff before retrying {url} was interrupted by stop event") + time.sleep(min(1.0, remaining)) + + def _submit_and_wait(self, url: str, request_kwargs: Dict[str, Any]) -> curl_cffi.requests.Response: + """Submit one request to the worker thread and block until a result, polling the stop event each second. + + Raises any exception the worker raised, or CrashThread if the stop event fires while waiting. """ - if self._closed: - raise RuntimeError("Session is closed") - request_kwargs: Dict[str, Any] = {} if self.impersonate else kwargs response_queue: Queue[Union[curl_cffi.requests.Response, Exception]] = Queue() self._task_queue.put(_RequestTask(url, request_kwargs, response_queue)) @@ -150,10 +205,44 @@ def get_with_interrupt(self, url: str, **kwargs: Any) -> curl_cffi.requests.Resp else: if isinstance(response, Exception): raise response - self._log_response(response) - response.raise_for_status() return response + def get_with_interrupt(self, url: str, **kwargs: Any) -> curl_cffi.requests.Response: + """Interruptable GET request with in-place 5xx retry. + + Submits the request to the persistent daemon thread and polls every second + for a stop event. Raises CrashThread if interrupted. When impersonating a + browser, kwargs are dropped so curl_cffi can apply the full browser + fingerprint unmodified. + + A 5xx response is retried up to max_retries times with interruptable + exponential backoff (honoring Retry-After); once retries are exhausted the + status surfaces as a normal HTTPError via raise_for_status. + """ + if self._closed: + raise RuntimeError("Session is closed") + request_kwargs: Dict[str, Any] = {} if self.impersonate else kwargs + + # Hand-rolled rather than curl_cffi's retry=/RetryStrategy: that only retries transport + # exceptions (not 5xx), ignores Retry-After, and sleeps with a blocking time.sleep that the + # stop event can't interrupt. + for attempt in range(self.max_retries + 1): + response = self._submit_and_wait(url, request_kwargs) + self._log_response(response) + if response.status_code >= 500 and attempt < self.max_retries: + backoff = self._retry_backoff(response, attempt) + logger.debug( + f"Server error {response.status_code} for {url!r}; " + f"retry {attempt + 1}/{self.max_retries} in {backoff:.2f}s" + ) + self._sleep_with_interrupt(backoff, url) + continue + response.raise_for_status() + return response + + # Unreachable: the loop either returns or raises on its final iteration. + raise AssertionError("retry loop exited without returning") + def close(self) -> None: """Signal the worker thread to exit and close this thread's curl handle. @@ -174,9 +263,15 @@ class SessionHandler: session, the old session is closed and replaced. """ - DEFAULT_SESSION_KWARGS: Dict[str, Any] = {"timeout": 30} + DEFAULT_SESSION_KWARGS: Dict[str, Any] = { + "timeout": 30, + "max_retries": 3, + "retry_backoff_base": 1.0, + "retry_backoff_cap": 30.0, + } def __init__(self) -> None: + """Initialize the per-thread session registry with the default session kwargs.""" self._session_kwargs: Dict[str, Any] = dict(self.DEFAULT_SESSION_KWARGS) self._context_lock = threading.RLock() self._sessions: Dict[int, InterruptableSession] = {} diff --git a/src/fundus/scraping/url.py b/src/fundus/scraping/url.py index f38fdb498..cf3b79f66 100644 --- a/src/fundus/scraping/url.py +++ b/src/fundus/scraping/url.py @@ -4,24 +4,25 @@ import lzma from abc import ABC, abstractmethod from dataclasses import dataclass, field -from functools import cached_property, partial from typing import ( + Any, Callable, ClassVar, Dict, Iterable, Iterator, List, + NamedTuple, Optional, Pattern, Set, + Tuple, ) -from urllib.parse import unquote, urlparse +from urllib.parse import unquote, urljoin, urlparse import feedparser -import lxml.html from curl_cffi.requests.exceptions import ConnectionError, HTTPError, ReadTimeout -from lxml.etree import XMLParser, XPath +from lxml.etree import XMLParser, XPath, fromstring from fundus.logging import create_logger from fundus.scraping.filter import URLFilter, inverse @@ -30,89 +31,43 @@ logger = create_logger(__name__) -class CompressionFormat: - def __init__( - self, name: str, decompression: Optional[Callable[[bytes], bytes]] = None, *, byte_mask: Optional[bytes] = None - ) -> None: - self.name = name - self.decompression = decompression - self.byte_mask = byte_mask - - def match(self, compressed_content: bytes) -> bool: - if self.byte_mask: - return compressed_content.startswith(self.byte_mask) - return False - - def __call__(self, compressed_content: bytes) -> bytes: - if self.decompression is None: - raise NotImplementedError(f"Decompression not implemented for {self.name!r}") - return self.decompression(compressed_content) - - def __repr__(self): - if self.decompression is None: - return f"{self.name} -- Not implemented" - return self.name - - -class CompressionFormats: - GZIP = CompressionFormat("gzip", gzip.decompress, byte_mask=b"\x1f\x8b") - BZ2 = CompressionFormat("bz2", bz2.decompress, byte_mask=b"\x42\x5a") - ZIP = CompressionFormat("zip", byte_mask=b"PK\x03\x04") - LZMA = CompressionFormat("lzma", lzma.decompress, byte_mask=b"\x28\xb5\x2f\xfd") - - @classmethod - def iter_formats(cls) -> Iterator[CompressionFormat]: - for obj in cls.__dict__.values(): - if isinstance(obj, CompressionFormat): - yield obj - - @classmethod - def identify(cls, compressed_content: bytes) -> Optional[CompressionFormat]: - for compression_format in cls.iter_formats(): - if compression_format.match(compressed_content): - return compression_format - return None - - -class _ArchiveDecompressor: - def __init__(self): - self.archive_mapping: Dict[str, Callable[[bytes], bytes]] = { - "application/octet-stream": self._decompress_octet_stream, - "application/x-gzip": CompressionFormats.GZIP, - "gzip": CompressionFormats.GZIP, - } - - def _decompress_octet_stream(self, compressed_content: bytes) -> bytes: - if (compression_format := CompressionFormats.identify(compressed_content)) is None: - logger.debug("Could not identify compression format") - raise NotImplementedError - - return compression_format(compressed_content) - - def decompress(self, content: bytes, file_format: "str") -> bytes: - decompress_function = self.archive_mapping[file_format] - return decompress_function(content) - - @cached_property - def supported_file_formats(self) -> List[str]: - return list(self.archive_mapping.keys()) - - def is_valid_url(url: str) -> bool: + """True if the URL has an http/https scheme and a non-empty network location.""" parsed = urlparse(url) return bool(parsed.scheme in ("http", "https") and parsed.netloc) -def clean_url(url: str) -> str: - return unquote(url) +def strip_query_and_fragment(url: str) -> str: + """Return the URL with its query string and fragment removed. + + Intended for *identity* use (dedup keys, equality probes), not for fetching: + the result may resolve to a different resource than the input on servers that + rely on query parameters for routing. + """ + if any(indicator in url for indicator in ("?", "#")): + return urljoin(url, urlparse(url).path) + return url @dataclass class URLSource(Iterable[str], ABC): + """Abstract source of article URLs for a single feed/sitemap endpoint. + + Concrete subclasses (RSSFeed, Sitemap, NewsMap) implement fetch() to stream URLs from + the endpoint at . Iterating the source directly (__iter__) uses a default session + and headers for standalone use; production scraping calls fetch() through WebSource with + publisher-specific session and headers. + + Attributes: + url (str): The feed/sitemap URL to pull article URLs from. + languages (Set[str]): Language codes the source is known to serve, if any. + """ + url: str languages: Set[str] = field(default_factory=set) def __post_init__(self): + """Warn (but don't fail) if the configured URL is malformed.""" if not is_valid_url(self.url): logger.error(f"{type(self).__name__} initialized with invalid URL {self.url}") @@ -154,6 +109,8 @@ def get_urls(self, max_urls: Optional[int] = None) -> Iterator[str]: @dataclass class RSSFeed(URLSource): + """URLSource that yields article links from an RSS/Atom feed.""" + def fetch(self, session: InterruptableSession, headers: Dict[str, str]) -> Iterator[str]: try: response = session.get_with_interrupt(self.url, headers=headers) @@ -173,79 +130,158 @@ def fetch(self, session: InterruptableSession, headers: Dict[str, str]) -> Itera else: urls = filter(bool, (entry.get("link") for entry in rss_feed["entries"])) for url in urls: - yield clean_url(url) + # Some publishers emit URLs with percent-encoded path separators + # (e.g. `https://example.com%2Farticle.html`); see PR #753. + yield unquote(url) + + +class _Codec(NamedTuple): + """A supported compression format: its name, leading magic bytes, and decompress function.""" + + name: str + magic: bytes + decompress: Callable[[bytes], bytes] + + +# Identified by magic-byte sniff rather than headers: the formats we support all carry +# unambiguous signatures, and sniffing handles misadvertised or header-less payloads alike. +_CODECS: Tuple[_Codec, ...] = ( + _Codec("gzip", b"\x1f\x8b", gzip.decompress), + _Codec("bzip2", b"BZh", bz2.decompress), + _Codec("xz", b"\xfd7zXZ\x00", lzma.decompress), +) + + +def decompress(content: bytes) -> bytes: + """Decompress content if its leading bytes match a known codec, else return unchanged.""" + for codec in _CODECS: + if content.startswith(codec.magic): + return codec.decompress(content) + return content + + +def _default_sitemap_filter(url: str) -> bool: + """Default sitemap_filter: drop empty/falsy entries, keep everything else.""" + return not bool(url) @dataclass class Sitemap(URLSource): + """URLSource that yields article links from an XML sitemap, descending into sitemap indexes. + + Attributes: + recursive (bool): If True, follow nested references in a sitemap index. + Defaults to True. + reverse (bool): If True, yield URLs (and descend into sub-sitemaps) in reverse order. + Defaults to False. + sitemap_filter (URLFilter): Filter applied to sub-sitemap values; a truthy result + drops the entry. Defaults to dropping only empty values. + sort_predicate (Optional[Pattern[str]]): If set, sub-sitemap URLs are sorted (descending) + by the matched substring of this pattern; the pattern must match every URL. + """ + recursive: bool = True reverse: bool = False - sitemap_filter: URLFilter = lambda url: not bool(url) + sitemap_filter: URLFilter = _default_sitemap_filter sort_predicate: Optional[Pattern[str]] = None - _decompressor: ClassVar[_ArchiveDecompressor] = _ArchiveDecompressor() _sitemap_selector: ClassVar[XPath] = XPath("//*[local-name()='sitemap']/*[local-name()='loc']") _url_selector: ClassVar[XPath] = XPath("//*[local-name()='url']/*[local-name()='loc']") - _parser = XMLParser(strip_cdata=False, recover=True) + + @staticmethod + def _fetch_bytes( + sitemap_url: str, + session: InterruptableSession, + headers: Dict[str, str], + ) -> Optional[bytes]: + """Fetch sitemap bytes, decompressing if needed. Returns None on any failure. + + Handles HTTP errors, decompression failures, and empty bodies. Each failure + mode is logged at its point of occurrence; callers just check for None. + """ + if not is_valid_url(sitemap_url): + logger.info(f"Skipped sitemap {sitemap_url!r} because the URL is malformed") + return None + try: + response = session.get_with_interrupt(url=sitemap_url, headers=headers) + except (HTTPError, ConnectionError, ReadTimeout) as error: + logger.warning(f"Warning! Couldn't reach sitemap {sitemap_url!r} because of {error!r}") + return None + except Exception as error: + logger.error(f"Warning! Couldn't reach sitemap {sitemap_url!r} because of an unexpected error {error!r}") + return None + + content = response.content.strip() + try: + content = decompress(content) + except Exception as error: + logger.warning(f"Decompression failed for {sitemap_url!r}: {error!r}") + return None + if not content: + logger.warning(f"Warning! Empty sitemap at {sitemap_url!r}") + return None + return content + + def _ordered_sub_locs(self, tree: Any) -> List[str]: + """Extract sub-sitemap values, sorted by sort_predicate and filtered.""" + locs = [node.text for node in self._sitemap_selector(tree)] + + if self.sort_predicate is not None: + pattern = self.sort_predicate + + def key(text: str) -> str: + if match := pattern.search(text): + return match.group() + raise NotImplementedError(" must match in all sitemap URLs") + + locs = sorted(locs, key=key, reverse=True) + + return list(filter(inverse(self.sitemap_filter), locs)) + + def _yield_from_sitemap( + self, + sitemap_url: str, + session: InterruptableSession, + headers: Dict[str, str], + parser: XMLParser, + ) -> Iterator[str]: + # Download (and decompress) the sitemap bytes. + content = self._fetch_bytes(sitemap_url, session, headers) + if content is None: + return + + # Parse the bytes into an XML tree. + tree = fromstring(content, parser=parser) + if tree is None: + logger.warning(f"Warning! Couldn't parse sitemap {sitemap_url!r}") # type: ignore[unreachable] + return + + # Yield the article URLs contained in this sitemap, if any. + urls = [node.text for node in self._url_selector(tree)] + if urls: + for new_url in reversed(urls) if self.reverse else urls: + yield unquote(new_url) + return + + # Otherwise descend into nested sitemap-index references. + if not self.recursive: + return + locs = self._ordered_sub_locs(tree) + for loc in reversed(locs) if self.reverse else locs: + yield from self._yield_from_sitemap(loc, session, headers, parser) def fetch(self, session: InterruptableSession, headers: Dict[str, str]) -> Iterator[str]: - def yield_recursive(sitemap_url: str) -> Iterator[str]: - if not is_valid_url(sitemap_url): - logger.info(f"Skipped sitemap {sitemap_url!r} because the URL is malformed") - try: - response = session.get_with_interrupt(url=sitemap_url, headers=headers) - - except (HTTPError, ConnectionError, ReadTimeout) as error: - logger.warning(f"Warning! Couldn't reach sitemap {sitemap_url!r} because of {error!r}") - return - except Exception as error: - logger.error( - f"Warning! Couldn't reach sitemap {sitemap_url!r} because of an unexpected error {error!r}" - ) - return - - content = response.content.strip() - if (content_type := response.headers.get("content-type")) in self._decompressor.supported_file_formats: - try: - content = self._decompressor.decompress(content, content_type) - except NotImplementedError: - logger.warning(f"No matching decompression found for {sitemap_url!r}") - return - if not content: - logger.warning(f"Warning! Empty sitemap at {sitemap_url!r}") - return - tree = lxml.etree.fromstring(content, parser=self._parser) - if tree is None: - # in case we somehow end up with non xml content - logger.warning(f"Warning! Couldn't parse sitemap {sitemap_url!r}") # type: ignore[unreachable] - return - urls = [node.text for node in self._url_selector(tree)] - if urls: - for new_url in reversed(urls) if self.reverse else urls: - yield clean_url(new_url) - elif self.recursive: - sitemap_locs = [node.text for node in self._sitemap_selector(tree)] - - if self.sort_predicate is not None: - - def _extract_predicate(text: str, pattern: Pattern[str]) -> str: - if match := pattern.search(text): - return match.group() - raise NotImplementedError(" must match in all sitemap URLs") - - sitemap_locs = sorted( - sitemap_locs, - key=partial(_extract_predicate, pattern=self.sort_predicate), - reverse=True, - ) - - filtered_locs = list(filter(inverse(self.sitemap_filter), sitemap_locs)) - for loc in reversed(filtered_locs) if self.reverse else filtered_locs: - yield from yield_recursive(loc) - - yield from yield_recursive(self.url) + # lxml parsers serialize access across threads; construct one per fetch() so + # concurrent sitemap fetches don't contend. Each fetch() generator is consumed + # by a single thread, so the parser stays single-threaded for its lifetime. + parser = XMLParser(strip_cdata=False, recover=True) + yield from self._yield_from_sitemap(self.url, session, headers, parser) @dataclass class NewsMap(Sitemap): - pass + """Marker subclass for Google-News-style sitemaps (recent articles only). + + Parsing is identical to Sitemap; the distinct type lets the scraper prioritize + news sitemaps over full archive sitemaps via __SOURCE_ORDER__ in base_objects.py. + """ diff --git a/src/fundus/utils/concurrency.py b/src/fundus/utils/concurrency.py new file mode 100644 index 000000000..30d46ab95 --- /dev/null +++ b/src/fundus/utils/concurrency.py @@ -0,0 +1,88 @@ +import contextlib +import multiprocessing +from functools import lru_cache +from multiprocessing.managers import BaseManager +from threading import current_thread +from typing import Callable, Generic, Iterator, Optional, Tuple, TypeVar, cast + +import dill +from tqdm import tqdm +from typing_extensions import ParamSpec + +_T = TypeVar("_T") +_P = ParamSpec("_P") + + +def get_execution_context() -> Tuple[str, Optional[int]]: + """Return the name and identifier of the current execution context. + + If running inside a non-main process, returns that process's name and PID; otherwise + returns the current thread's name and thread id. + + Returns: + Tuple[str, Optional[int]]: The context's name and its integer identifier. + """ + if multiprocessing.current_process().name != "MainProcess": + process = multiprocessing.current_process() + return process.name, process.ident + else: + thread = current_thread() + return thread.name, thread.ident + + +class TQDMManager(BaseManager): + """multiprocessing manager exposing a shared tqdm proxy so worker processes drive one progress bar.""" + + def __init__(self, *args, **kwargs): + """Initialize the manager and register tqdm so it can be created behind a proxy.""" + super().__init__(*args, **kwargs) + self.register("_tqdm", tqdm) + + def tqdm(self, *args, **kwargs) -> tqdm: + """Create and return a manager-hosted (proxied) tqdm instance from the given tqdm args.""" + return getattr(self, "_tqdm")(*args, **kwargs) + + +@contextlib.contextmanager +def get_proxy_tqdm(*args, **kwargs) -> Iterator[tqdm]: + """Yield a manager-backed tqdm proxy that can be shared across processes. + + Init args are forwarded verbatim and are the same as for any other tqdm instance. The + backing manager is started on entry and shut down on exit. + + Args: + *args: Positional tqdm arguments. + **kwargs: Keyword tqdm arguments. + + Yields: + tqdm: A self-managed, proxied tqdm instance. + """ + manager = TQDMManager() + try: + manager.start() + yield manager.tqdm(*args, **kwargs) + finally: + manager.shutdown() + + +class dill_wrapper(Generic[_P, _T]): + """Callable wrapper that dill-serializes its target so it survives multiprocessing pickling.""" + + def __init__(self, target: Callable[_P, _T]): + """Wraps function in dill serialization. + + This is in order to use unpickable functions within multiprocessing. + + Args: + target: The function to wrap. + """ + self._serialized_target: bytes = dill.dumps(target) + + @lru_cache + def _deserialize(self) -> Callable[_P, _T]: + """Deserialize and cache the wrapped target on first use (once per process).""" + return cast(Callable[_P, _T], dill.loads(self._serialized_target)) + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: + """Deserialize the target (cached) and invoke it with the given arguments.""" + return self._deserialize()(*args, **kwargs) diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py index a9b40eb27..27d86df8f 100644 --- a/src/fundus/utils/events.py +++ b/src/fundus/utils/events.py @@ -7,12 +7,60 @@ from fundus.logging import create_logger +# TODO (planned redesign): replace __EVENTS__ with explicit CancellationToken objects. +# +# Current state. __EVENTS__ is a global registry that maps a string alias (publisher +# name, "main-thread") to a dict of named threading.Event objects, plus a bidict +# linking aliases to thread ids so callers running inside a thread context can resolve +# `key=None` to "their own" events. It mashes three concerns into one mechanism: +# 1. cooperative cancellation (per-publisher stop signal) +# 2. shutdown propagation (system-wide stop via set_for_all(future=True)) +# 3. post-mortem queryability (main thread asking "did publisher X already stop?" +# after its worker exited, hence aliases-persist-after-thread-exit) +# +# Pain points: implicit thread-id resolution, string-keyed events (only "stop" exists +# in practice), the `future=True` hack for shutdown, leaky test setup (every test that +# touches WebSource/CCNewsSource needs __EVENTS__.context("test") aliasing), and an +# unclear seam for multiprocessing (threading.Event does not cross process boundaries). +# +# Planned shape: +# +# class CancellationToken: +# def __init__(self) -> None: +# self._event = threading.Event() +# self._children: list[CancellationToken] = [] +# def cancel(self) -> None: ... +# def is_cancelled(self) -> bool: ... +# def wait(self, timeout: float) -> bool: ... +# def child(self) -> "CancellationToken": ... # cancelled when parent is +# +# Mapping current usage onto tokens: +# - Source classes (WebSource, CCNewsSource): receive a CancellationToken via +# constructor instead of reading from __EVENTS__. +# - Crawler: holds a `dict[Publisher, CancellationToken]`. On per-publisher limit +# reached, calls `tokens[publisher].cancel()`. Replaces __EVENTS__.set_event( +# "stop", publisher_name) and __EVENTS__.is_event_set(...) at the same site. +# - Shutdown: a root token; each publisher's token is `root.child()`. Cancelling +# root cancels all children — replaces set_for_all(future=True) / clear_for_all. +# - queueing.enqueue_results: takes the shutdown token, replaces the +# __EVENTS__.is_event_set("stop", __MAIN_THREAD_ALIAS__) probe in _delivered. +# - Session.get_with_interrupt: takes a CancellationToken, polls +# token.is_cancelled() instead of __EVENTS__. +# +# What disappears: aliases, the thread-id bidict, main_context_lock, string event +# keys, default_events, future=True / clear_for_all, the test-context fixture, and +# every `from fundus.utils.events import __EVENTS__` in non-orchestrator code. + _T = TypeVar("_T") logger = create_logger(__name__) __DEFAULT_EVENTS__: List[str] = ["stop"] +# Alias under which the main thread registers its context in __EVENTS__; crawlers set/probe +# the "stop" event against it to drive cooperative shutdown. +__MAIN_THREAD_ALIAS__ = "main-thread" + _sentinel = object() diff --git a/src/fundus/utils/serialization.py b/src/fundus/utils/serialization.py index 0b15da0a4..9d3286976 100644 --- a/src/fundus/utils/serialization.py +++ b/src/fundus/utils/serialization.py @@ -1,10 +1,13 @@ import inspect import json from dataclasses import asdict, fields, is_dataclass +from datetime import datetime from typing import ( Any, Callable, Dict, + Optional, + Protocol, Sequence, Type, TypeVar, @@ -12,6 +15,7 @@ get_args, get_origin, get_type_hints, + runtime_checkable, ) from typing_extensions import TypeAlias @@ -21,6 +25,43 @@ JSONVal: TypeAlias = Union[None, bool, str, float, int, Sequence["JSONVal"], Dict[str, "JSONVal"]] +@runtime_checkable +class Serializable(Protocol): + """Anything that knows how to convert itself into a JSON-compatible structure. + + Implementing types opt into the export path used by Article.to_json. + """ + + def serialize(self) -> JSONVal: ... + + +def serialize_value(value: Any, field_name: Optional[str] = None) -> JSONVal: + """Recursively convert a value to JSON-compatible form. + + Args: + value: The value to serialize. + field_name: Optional originating field name, used only for error messages. + + Returns: + A JSON-serializable structure. + + Raises: + TypeError: If the value's type has no defined serialization. + """ + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, (list, tuple)): + return [serialize_value(item, field_name) for item in value] + if isinstance(value, dict): + return {str(k): serialize_value(v, field_name) for k, v in value.items()} + if isinstance(value, Serializable): + return value.serialize() + location = f"field {field_name!r}" if field_name else "value" + raise TypeError(f"Cannot serialize {location} of type {type(value).__name__}") + + def is_jsonable(x): try: json.dumps(x) diff --git a/src/fundus/utils/timeout.py b/src/fundus/utils/timeout.py index 92b6e7d48..3e7d3121e 100644 --- a/src/fundus/utils/timeout.py +++ b/src/fundus/utils/timeout.py @@ -4,91 +4,67 @@ import time from typing import Callable, Iterator, Optional -from typing_extensions import ParamSpec -P = ParamSpec("P") - - -class Stopwatch: - def __init__(self): - self._start = time.time() +def _interrupt_handler() -> None: + thread.interrupt_main() - @property - def time(self) -> float: - return max(0.0, time.time() - self._start) - def reset(self): - self._start = time.time() +class ResettableTimer: + class _Stopwatch: + def __init__(self) -> None: + self._start = time.time() + @property + def elapsed(self) -> float: + return max(0.0, time.time() - self._start) -class ResettableTimer(threading.Thread): - def __init__( - self, - seconds: float, - func: Callable[P, None], - interval: float = 0.1, - args: P.args = tuple(), - kwargs: P.kwargs = None, - ) -> None: - """Resettable timer executing after