Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ apify-client = false
apify_fingerprint_datapoints = false
crawlee = false

[tool.uv.sources]
crawlee = { git = "https://github.com/Mantisus/crawlee-python", branch = "queue-client-is-finished" }

# Run tasks with: uv run poe <task>
[tool.poe.tasks]
clean = "rm -rf .coverage .pytest_cache .ruff_cache .ty_cache build dist htmlcov"
Expand Down
4 changes: 4 additions & 0 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,7 @@ async def reclaim_request(
@override
async def is_empty(self) -> bool:
    """Report whether the queue is empty, as answered by the wrapped implementation."""
    # This client holds no queue state of its own here; the access-mode specific
    # implementation object decides the result.
    empty = await self._implementation.is_empty()
    return empty

@override
async def is_finished(self) -> bool:
    """Report whether the queue is finished, as answered by the wrapped implementation."""
    # Pure delegation: the access-mode specific implementation object decides the result.
    finished = await self._implementation.is_finished()
    return finished
14 changes: 12 additions & 2 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,18 @@ async def is_empty(self) -> bool:
# Check _list_head.
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
async with self._fetch_lock:
head = await self._list_head(limit=1)
return len(head.items) == 0 and not self._queue_has_locked_requests
return await self._is_empty()

async def is_finished(self) -> bool:
"""Specific implementation of this method for the RQ shared access mode."""
async with self._fetch_lock:
# Order of operations is important here, because affects on `_queue_has_locked_requests`.
return await self._is_empty() and not self._queue_has_locked_requests
Comment thread
vdusek marked this conversation as resolved.

async def _is_empty(self) -> bool:
    """Return True when a one-item head listing comes back with no items.

    This is the lock-free core of `is_empty`: it does not acquire any lock itself,
    so the caller must already hold the lock.
    """
    # Asking for a single item is enough to learn whether anything is available to fetch.
    listed = await self._list_head(limit=1)
    item_count = len(listed.items)
    return item_count == 0

async def _get_metadata_estimate(self) -> RequestQueueMetadata:
"""Try to get cached metadata first. If multiple clients, fuse with global metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,12 @@ async def reclaim_request(

async def is_empty(self) -> bool:
"""Specific implementation of this method for the RQ single access mode."""
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
await self._ensure_head_is_non_empty()
return not self._head_requests and not self._requests_in_progress
return not self._head_requests

async def is_finished(self) -> bool:
    """Report whether the queue is finished in the RQ single access mode.

    The queue counts as finished when `is_empty()` reports True and
    `_requests_in_progress` is empty.
    """
    # `is_empty()` is awaited first; the in-progress collection is only consulted
    # once the queue has been reported empty.
    if not await self.is_empty():
        return False
    return not self._requests_in_progress

async def _ensure_head_is_non_empty(self) -> None:
"""Ensure that the queue head has requests if they are available in the queue."""
Expand Down
14 changes: 13 additions & 1 deletion tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -1227,10 +1227,18 @@ async def test_force_cloud(
assert str(request_queue_request.url) == 'http://example.com'


async def test_request_queue_is_finished(
async def test_request_queue_is_finished_and_is_empty(
request_queue_apify: RequestQueue,
rq_poll_timeout: int,
) -> None:
"""Test that `is_empty` and `is_finished` behave correctly with Apify request queue."""

assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2), (
'RequestQueue should be empty initially.'
)
assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2), (
'RequestQueue should be finished initially.'
)

await request_queue_apify.add_request(Request.from_url('http://example.com'))
assert not await request_queue_apify.is_finished()
Expand All @@ -1239,11 +1247,15 @@ async def test_request_queue_is_finished(
request_queue_apify.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2
)
assert fetched is not None
assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2), (
'RequestQueue should be empty because queue does not contain any requests for fetching.'
)
assert not await request_queue_apify.is_finished(), (
'RequestQueue should not be finished unless the request is marked as handled.'
)

await request_queue_apify.mark_request_as_handled(fetched)
assert await poll_until_condition(request_queue_apify.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2)


Expand Down
12 changes: 4 additions & 8 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading