Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,6 @@ async def reclaim_request(
@override
async def is_empty(self) -> bool:
return await self._implementation.is_empty()

async def is_finished(self) -> bool:
    """Delegate the finished check to the wrapped implementation and return its answer."""
    implementation = self._implementation
    return await implementation.is_finished()
26 changes: 26 additions & 0 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from cachetools import LRUCache

from crawlee.storage_clients._base import RequestQueueClient
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

from ._models import ApifyRequestQueueMetadata, CachedRequest, RequestQueueHead
Expand All @@ -23,6 +24,9 @@
logger = getLogger(__name__)


# Feature flag: True when the installed Crawlee `RequestQueueClient` base class exposes `is_finished`.
# Older Crawlee versions (earlier than 1.8.0, per the `_old_is_empty` docstring) lack it; in that case
# `is_empty` and `is_finished` both fall back to `_old_is_empty`.
_CRAWLEE_SUPPORTS_IS_FINISHED = hasattr(RequestQueueClient, 'is_finished')


class ApifyRequestQueueSharedClient:
"""Internal request queue client implementation for multi-consumer scenarios on the Apify platform.

Expand Down Expand Up @@ -289,8 +293,30 @@ async def reclaim_request(

async def is_empty(self) -> bool:
    """Report whether the queue has nothing available to fetch (RQ shared access mode).

    When the installed Crawlee does not provide `is_finished`, the answer comes from
    `_old_is_empty` instead.
    """
    if _CRAWLEE_SUPPORTS_IS_FINISHED:
        # The head is inspected under the fetch lock: without it, a low-probability race
        # can make the check falsely report True.
        async with self._fetch_lock:
            return await self._is_empty()

    return await self._old_is_empty()

async def is_finished(self) -> bool:
    """Report whether the queue is empty and has no locked requests (RQ shared access mode).

    When the installed Crawlee does not provide `is_finished`, the answer comes from
    `_old_is_empty` instead.
    """
    if _CRAWLEE_SUPPORTS_IS_FINISHED:
        async with self._fetch_lock:
            # Order matters: the emptiness check has to run first because it affects
            # `_queue_has_locked_requests`, which is only read afterwards.
            if not await self._is_empty():
                return False
            return not self._queue_has_locked_requests

    return await self._old_is_empty()
Comment thread
vdusek marked this conversation as resolved.

async def _is_empty(self) -> bool:
    """Tell whether the queue head holds no request to fetch.

    This is the lock-free core of `is_empty`; the caller is responsible for holding the lock.
    """
    queue_head = await self._list_head(limit=1)
    return not len(queue_head.items)

async def _old_is_empty(self) -> bool:
"""Temporary workaround for compatibility with Crawlee versions earlier than 1.8.0."""
async with self._fetch_lock:
head = await self._list_head(limit=1)
return len(head.items) == 0 and not self._queue_has_locked_requests
Expand Down
20 changes: 19 additions & 1 deletion src/apify/storage_clients/_apify/_request_queue_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from cachetools import LRUCache

from crawlee.storage_clients._base import RequestQueueClient
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

from ._utils import to_crawlee_request, unique_key_to_request_id
Expand All @@ -21,6 +22,9 @@
logger = getLogger(__name__)


# Feature flag: True when the installed Crawlee `RequestQueueClient` base class exposes `is_finished`.
# Older Crawlee versions (earlier than 1.8.0, per the `_old_is_empty` docstring) lack it; in that case
# `is_empty` and `is_finished` both fall back to `_old_is_empty`.
_CRAWLEE_SUPPORTS_IS_FINISHED = hasattr(RequestQueueClient, 'is_finished')


class ApifyRequestQueueSingleClient:
"""Internal request queue client implementation for single-consumer scenarios on the Apify platform.

Expand Down Expand Up @@ -277,7 +281,21 @@ async def reclaim_request(

async def is_empty(self) -> bool:
    """Report whether the local head holds no requests to fetch (RQ single access mode).

    When the installed Crawlee does not provide `is_finished`, the answer comes from
    `_old_is_empty` instead, which also takes requests in progress into account.
    """
    if _CRAWLEE_SUPPORTS_IS_FINISHED:
        # Run the head-refill step first so the check below sees its result.
        await self._ensure_head_is_non_empty()
        return not self._head_requests

    return await self._old_is_empty()

async def is_finished(self) -> bool:
    """Report whether the queue is empty and no requests are in progress (RQ single access mode).

    When the installed Crawlee does not provide `is_finished`, the answer comes from
    `_old_is_empty` instead.
    """
    if not _CRAWLEE_SUPPORTS_IS_FINISHED:
        return await self._old_is_empty()

    # Check emptiness first; in-progress requests are only consulted once the queue is empty.
    if not await self.is_empty():
        return False
    return not self._requests_in_progress

async def _old_is_empty(self) -> bool:
    """Legacy emptiness check kept for compatibility with Crawlee versions earlier than 1.8.0.

    Returns True only when the local head holds no requests and no requests are in progress.
    """
    await self._ensure_head_is_non_empty()
    if self._head_requests:
        return False
    return not self._requests_in_progress

Expand Down
30 changes: 26 additions & 4 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from apify_client._models import BatchAddResult, RequestDraft
from crawlee import service_locator
from crawlee.crawlers import BasicCrawler
from crawlee.storage_clients._base import RequestQueueClient

from .._utils import generate_unique_resource_name, poll_until_condition
from apify import Actor, Request
Expand Down Expand Up @@ -1227,10 +1228,18 @@ async def test_force_cloud(
assert str(request_queue_request.url) == 'http://example.com'


async def test_request_queue_is_finished(
async def test_request_queue_is_finished_and_is_empty(
request_queue_apify: RequestQueue,
rq_poll_timeout: int,
) -> None:
"""Test that `is_empty` and `is_finished` behave correctly with Apify request queue."""

assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2), (
'RequestQueue should be empty initially.'
)
assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2), (
'RequestQueue should be finished initially.'
)

await request_queue_apify.add_request(Request.from_url('http://example.com'))
assert not await request_queue_apify.is_finished()
Expand All @@ -1239,11 +1248,24 @@ async def test_request_queue_is_finished(
request_queue_apify.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2
)
assert fetched is not None
assert not await request_queue_apify.is_finished(), (
'RequestQueue should not be finished unless the request is marked as handled.'
)

if hasattr(RequestQueueClient, 'is_finished'):
assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2), (
'RequestQueue should be empty because queue does not contain any requests for fetching.'
)
assert not await request_queue_apify.is_finished(), (
'RequestQueue should not be finished unless the request is marked as handled.'
)
else:
assert not await poll_until_condition(
request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2
), 'RequestQueue should not be empty because queue contains a request in progress.'
assert not await request_queue_apify.is_finished(), (
'RequestQueue should not be finished unless the request is marked as handled.'
)

await request_queue_apify.mark_request_as_handled(fetched)
assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2)


Expand Down
Loading