Bump torch from 2.7.1 to 2.12.0 in /sdks/python/container/ml/py312#38908
Bump torch from 2.7.1 to 2.12.0 in /sdks/python/container/ml/py312#38908dependabot[bot] wants to merge 1 commit into
17 errors, 12 skipped, 1 pass in 21m 3s
Annotations
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_invalid_query_on_non_existent_collection (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 16s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:217: in start_db_container
raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
port = running_container.get_exposed_port(service_container_port)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7faca9c84c30>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7faca9ce5450>
def wait_until_ready(self, container: WaitStrategyTarget) -> None:
result = self._poll(lambda: self.running(self.get_status(container)))
if not result:
> raise TimeoutError("container did not become running")
E TimeoutError: container did not become running
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_filtered_search_with_cosine_similarity_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 16s]
Raw output
failed on setup with "ConnectionError: Port mapping for container bfe1e5d64015ec7df2697f5bc7a7dadaaa2b2f4006bef331aa008bdff7ccc271 and port 44297 is not available"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:217: in start_db_container
raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
port = running_container.get_exposed_port(service_container_port)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
return self._get_exposed_port(port)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
return int(self.get_docker_client().port(self.get_container_id(), port))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <testcontainers.core.docker_client.DockerClient object at 0x7f8ba0accfc0>
container_id = 'bfe1e5d64015ec7df2697f5bc7a7dadaaa2b2f4006bef331aa008bdff7ccc271'
port = 44297
def port(self, container_id: str, port: int) -> str:
"""
Lookup the public-facing port that is NAT-ed to :code:`port`.
"""
port_mappings = self.client.api.port(container_id, port)
if not port_mappings:
> raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E ConnectionError: Port mapping for container bfe1e5d64015ec7df2697f5bc7a7dadaaa2b2f4006bef331aa008bdff7ccc271 and port 44297 is not available
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_invalid_query_on_non_existent_field (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:217: in start_db_container
raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
port = running_container.get_exposed_port(service_container_port)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7faca9c84c30>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7faca9ce5450>
def wait_until_ready(self, container: WaitStrategyTarget) -> None:
result = self._poll(lambda: self.running(self.get_status(container)))
if not result:
> raise TimeoutError("container did not become running")
E TimeoutError: container did not become running
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_hybrid_search (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "ConnectionError: Port mapping for container bfe1e5d64015ec7df2697f5bc7a7dadaaa2b2f4006bef331aa008bdff7ccc271 and port 44297 is not available"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:217: in start_db_container
raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
port = running_container.get_exposed_port(service_container_port)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
return self._get_exposed_port(port)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
return int(self.get_docker_client().port(self.get_container_id(), port))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <testcontainers.core.docker_client.DockerClient object at 0x7f8ba0accfc0>
container_id = 'bfe1e5d64015ec7df2697f5bc7a7dadaaa2b2f4006bef331aa008bdff7ccc271'
port = 44297
def port(self, container_id: str, port: int) -> str:
"""
Lookup the public-facing port that is NAT-ed to :code:`port`.
"""
port_mappings = self.client.api.port(container_id, port)
if not port_mappings:
> raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E ConnectionError: Port mapping for container bfe1e5d64015ec7df2697f5bc7a7dadaaa2b2f4006bef331aa008bdff7ccc271 and port 44297 is not available
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_invalid_write_on_missing_primary_key_in_entity (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fbdc573bc80>, 0)>
timeout = 9.999996900558472
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_keyword_search_with_inner_product_sparse_embedding (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fb49c1bb5c0>, 0)>
timeout = 9.999996662139893
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_empty_input_chunks (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f8600c11280>, 0)>
timeout = 9.999996900558472
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_vector_search_with_inner_product_similarity (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f145871a6c0>, 0)>
timeout = 9.999996662139893
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_filtered_search_with_bm25_full_text_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f8600c11280>, 0)>
timeout = 9.999996900558472
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_vector_search_with_euclidean_distance (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fb49c1bb5c0>, 0)>
timeout = 9.999996662139893
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_invalid_write_on_non_existent_collection (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fbdc573bc80>, 0)>
timeout = 9.999996900558472
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_write_with_custom_column_specifications (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fbdc573bc80>, 0)>
timeout = 9.999996900558472
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
> gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E Failed: Timeout (>600.0s) from pytest-timeout.
/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_invalid_write_on_non_existent_partition (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7faca9bd2300>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
> grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <grpc._utilities._ChannelReadyFuture object at 0x7faca97d94f0>
timeout = 10
def _block(self, timeout: Optional[float]) -> None:
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._cancelled:
raise grpc.FutureCancelledError()
if self._matured:
return
if until is None:
self._condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
> raise grpc.FutureTimeoutError()
E grpc.FutureTimeoutError
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError
The above exception was the direct cause of the following exception:
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7faca9c4da80>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
> result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/utils.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7faca9bd2300>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
except grpc.FutureTimeoutError as e:
self.close()
> raise MilvusException(
code=Status.CONNECT_FAILED,
message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
) from e
E pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:34723, illegal connection params or server unavailable)>
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException
During handling of the above exception, another exception occurred:
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7faca9c4da80>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
result = operation()
_LOGGER.info(
"Successfully completed %s on attempt %d",
operation_name,
attempt + 1)
return result
except exception_types as e:
last_exception = e
if attempt < max_retries:
delay = retry_delay * (retry_backoff_factor**attempt)
_LOGGER.warning(
"%s attempt %d failed: %s. Retrying in %.2f seconds...",
operation_name,
attempt + 1,
e,
delay)
> time.sleep(delay)
E Failed: Timeout (>600.0s) from pytest-timeout.
apache_beam/ml/rag/utils.py:216: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_write_on_auto_id_primary_key (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f8b9c644230>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
> grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <grpc._utilities._ChannelReadyFuture object at 0x7f8b9c6913b0>
timeout = 10
def _block(self, timeout: Optional[float]) -> None:
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._cancelled:
raise grpc.FutureCancelledError()
if self._matured:
return
if until is None:
self._condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
> raise grpc.FutureTimeoutError()
E grpc.FutureTimeoutError
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError
The above exception was the direct cause of the following exception:
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f8cd4055bc0>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
> result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/utils.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f8b9c644230>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
except grpc.FutureTimeoutError as e:
self.close()
> raise MilvusException(
code=Status.CONNECT_FAILED,
message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
) from e
E pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:36659, illegal connection params or server unavailable)>
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException
During handling of the above exception, another exception occurred:
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f8cd4055bc0>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
result = operation()
_LOGGER.info(
"Successfully completed %s on attempt %d",
operation_name,
attempt + 1)
return result
except exception_types as e:
last_exception = e
if attempt < max_retries:
delay = retry_delay * (retry_backoff_factor**attempt)
_LOGGER.warning(
"%s attempt %d failed: %s. Retrying in %.2f seconds...",
operation_name,
attempt + 1,
e,
delay)
> time.sleep(delay)
E Failed: Timeout (>600.0s) from pytest-timeout.
apache_beam/ml/rag/utils.py:216: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_write_on_existent_collection_with_default_schema (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f8b9c644230>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
> grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <grpc._utilities._ChannelReadyFuture object at 0x7f8b9c6913b0>
timeout = 10
def _block(self, timeout: Optional[float]) -> None:
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._cancelled:
raise grpc.FutureCancelledError()
if self._matured:
return
if until is None:
self._condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
> raise grpc.FutureTimeoutError()
E grpc.FutureTimeoutError
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError
The above exception was the direct cause of the following exception:
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f8cd4055bc0>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
> result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/utils.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f8b9c644230>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
except grpc.FutureTimeoutError as e:
self.close()
> raise MilvusException(
code=Status.CONNECT_FAILED,
message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
) from e
E pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:36659, illegal connection params or server unavailable)>
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException
During handling of the above exception, another exception occurred:
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f8cd4055bc0>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
result = operation()
_LOGGER.info(
"Successfully completed %s on attempt %d",
operation_name,
attempt + 1)
return result
except exception_types as e:
last_exception = e
if attempt < max_retries:
delay = retry_delay * (retry_backoff_factor**attempt)
_LOGGER.warning(
"%s attempt %d failed: %s. Retrying in %.2f seconds...",
operation_name,
attempt + 1,
e,
delay)
> time.sleep(delay)
E Failed: Timeout (>600.0s) from pytest-timeout.
apache_beam/ml/rag/utils.py:216: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_write_with_batching (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7faca9bd2300>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
> grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <grpc._utilities._ChannelReadyFuture object at 0x7faca97d94f0>
timeout = 10
def _block(self, timeout: Optional[float]) -> None:
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._cancelled:
raise grpc.FutureCancelledError()
if self._matured:
return
if until is None:
self._condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
> raise grpc.FutureTimeoutError()
E grpc.FutureTimeoutError
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError
The above exception was the direct cause of the following exception:
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7faca9c4da80>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
> result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/utils.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7faca9bd2300>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
except grpc.FutureTimeoutError as e:
self.close()
> raise MilvusException(
code=Status.CONNECT_FAILED,
message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
) from e
E pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:34723, illegal connection params or server unavailable)>
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException
During handling of the above exception, another exception occurred:
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7faca9c4da80>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
result = operation()
_LOGGER.info(
"Successfully completed %s on attempt %d",
operation_name,
attempt + 1)
return result
except exception_types as e:
last_exception = e
if attempt < max_retries:
delay = retry_delay * (retry_backoff_factor**attempt)
_LOGGER.warning(
"%s attempt %d failed: %s. Retrying in %.2f seconds...",
operation_name,
attempt + 1,
e,
delay)
> time.sleep(delay)
E Failed: Timeout (>600.0s) from pytest-timeout.
apache_beam/ml/rag/utils.py:216: Failed
github-actions / Python 3.13 Test Results (ubuntu-latest)
test_idempotent_write (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error
sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f1458641f40>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
> grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <grpc._utilities._ChannelReadyFuture object at 0x7f14585282f0>
timeout = 10
def _block(self, timeout: Optional[float]) -> None:
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._cancelled:
raise grpc.FutureCancelledError()
if self._matured:
return
if until is None:
self._condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
> raise grpc.FutureTimeoutError()
E grpc.FutureTimeoutError
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError
The above exception was the direct cause of the following exception:
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f145bba1800>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
> result = operation()
^^^^^^^^^^^
apache_beam/ml/rag/utils.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
client = MilvusClient(uri=uri)
^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
return self._create_shared(config, client, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f1458641f40>
timeout = None
def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel first",
)
# grpc.Future.result(timeout=None) blocks indefinitely. Normalise None
# to the default 10 s so that an unreachable URI raises MilvusException
# instead of hanging forever (mirrors async ensure_channel_ready behaviour).
effective_timeout = timeout if timeout is not None else 10
try:
grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
except grpc.FutureTimeoutError as e:
self.close()
> raise MilvusException(
code=Status.CONNECT_FAILED,
message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
) from e
E pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:37911, illegal connection params or server unavailable)>
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException
During handling of the above exception, another exception occurred:
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusTestHelpers.start_db_container()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/test_utils.py:191: in start_db_container
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f145bba1800>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)
def retry_with_backoff(
operation: Callable[[], Any],
max_retries: int = 3,
retry_delay: float = 1.0,
retry_backoff_factor: float = 2.0,
operation_name: str = "operation",
exception_types: tuple[type[BaseException], ...] = (Exception, )
) -> Any:
"""Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may
fail transiently. It retries the operation with exponential backoff between
attempts.
Note:
This utility is designed for one-time setup operations and complements
Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
* Establishing client connections in __enter__() methods (e.g., creating
MilvusClient instances, database connections) before processing elements
* One-time setup/teardown operations in DoFn lifecycle methods
* Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__),
use RequestResponseIO which already provides automatic retry with
exponential backoff, failure handling, caching, and other features.
See: https://beam.apache.org/documentation/io/built-in/webapis/
Args:
operation: Callable that performs the operation to retry. Should return
the result of the operation.
max_retries: Maximum number of retry attempts. Default is 3.
retry_delay: Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor: Multiplier for the delay after each retry. Default
is 2.0 (exponential backoff).
operation_name: Name of the operation for logging purposes. Default is
"operation".
exception_types: Tuple of exception types to catch and retry. Default is
(Exception,) which catches all exceptions.
Returns:
The result of the operation if successful.
Raises:
The last exception encountered if all retry attempts fail.
Example:
>>> def connect_to_service():
... return service.connect(host="localhost")
>>> client = retry_with_backoff(
... connect_to_service,
... max_retries=5,
... retry_delay=2.0,
... operation_name="service connection")
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
result = operation()
_LOGGER.info(
"Successfully completed %s on attempt %d",
operation_name,
attempt + 1)
return result
except exception_types as e:
last_exception = e
if attempt < max_retries:
delay = retry_delay * (retry_backoff_factor**attempt)
_LOGGER.warning(
"%s attempt %d failed: %s. Retrying in %.2f seconds...",
operation_name,
attempt + 1,
e,
delay)
> time.sleep(delay)
E Failed: Timeout (>600.0s) from pytest-timeout.
apache_beam/ml/rag/utils.py:216: Failed
Check notice on line 0 in .github
github-actions / Python 3.13 Test Results (ubuntu-latest)
12 skipped tests found
There are 12 skipped tests, see "Raw output" for the full list of skipped tests.
Raw output
apache_beam.ml.inference.anthropic_inference_it_test
apache_beam.ml.inference.anthropic_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_both_dense_and_sparse
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_dense_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_sparse_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_byte_size_limit
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test
Check notice on line 0 in .github
github-actions / Python 3.13 Test Results (ubuntu-latest)
30 tests found
There are 30 tests, see "Raw output" for the full list of tests.
Raw output
apache_beam.ml.inference.anthropic_inference_it_test
apache_beam.ml.inference.anthropic_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_empty_input_chunks
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_bm25_full_text_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_cosine_similarity_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_hybrid_search
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_collection
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_field
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_keyword_search_with_inner_product_sparse_embedding
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_euclidean_distance
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_inner_product_similarity
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_idempotent_write
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_missing_primary_key_in_entity
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_non_existent_collection
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_non_existent_partition
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_on_auto_id_primary_key
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_on_existent_collection_with_default_schema
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_with_custom_column_specifications
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_both_dense_and_sparse
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_dense_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_on_non_existent_collection
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_sparse_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_byte_size_limit
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test