Skip to content

Add MLTransform embedding benchmarks#38917

Open
aIbrahiim wants to merge 2 commits into
apache:masterfrom
aIbrahiim:mltransform-embedding-benchmarks
Open

Add MLTransform embedding benchmarks#38917
aIbrahiim wants to merge 2 commits into
apache:masterfrom
aIbrahiim:mltransform-embedding-benchmarks

resolved code review comments

a5c98fb
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
GitHub Actions / Python 3.13 Test Results (ubuntu-latest) failed Jun 12, 2026 in 0s

17 errors, 12 skipped, 1 pass in 11m 22s

 2 files  ±0  2 suites  ±0   11m 22s ⏱️ - 9m 50s
30 tests ±0  1 ✅ ±0  12 💤 ±0  0 ❌ ±0  17 🔥 ±0 
37 runs  ±0  1 ✅ ±0  19 💤 ±0  0 ❌ ±0  17 🔥 ±0 

Results for commit a5c98fb. ± Comparison against earlier commit 19769a4.

Annotations

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_empty_input_chunks (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 15s]
Raw output
failed on setup with "ConnectionError: Port mapping for container 4811d6bf03fd2423d787c664cb0b72e19d32836ea16cac190245deb7c67b8b00 and port 41951 is not available"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
    return self._get_exposed_port(port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
    return int(self.get_docker_client().port(self.get_container_id(), port))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.docker_client.DockerClient object at 0x7f81b3eed480>
container_id = '4811d6bf03fd2423d787c664cb0b72e19d32836ea16cac190245deb7c67b8b00'
port = 41951

    def port(self, container_id: str, port: int) -> str:
        """
        Lookup the public-facing port that is NAT-ed to :code:`port`.
        """
        port_mappings = self.client.api.port(container_id, port)
        if not port_mappings:
>           raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E           ConnectionError: Port mapping for container 4811d6bf03fd2423d787c664cb0b72e19d32836ea16cac190245deb7c67b8b00 and port 41951 is not available

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_query_on_non_existent_collection (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 15s]
Raw output
failed on setup with "ConnectionError: Port mapping for container 5851869f645f9b61a24d5b8d3d4f40d35b9576efca539a46a5b8096446f0c2c1 and port 55387 is not available"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
    return self._get_exposed_port(port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
    return int(self.get_docker_client().port(self.get_container_id(), port))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.docker_client.DockerClient object at 0x7fa38dc52780>
container_id = '5851869f645f9b61a24d5b8d3d4f40d35b9576efca539a46a5b8096446f0c2c1'
port = 55387

    def port(self, container_id: str, port: int) -> str:
        """
        Lookup the public-facing port that is NAT-ed to :code:`port`.
        """
        port_mappings = self.client.api.port(container_id, port)
        if not port_mappings:
>           raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E           ConnectionError: Port mapping for container 5851869f645f9b61a24d5b8d3d4f40d35b9576efca539a46a5b8096446f0c2c1 and port 55387 is not available

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_filtered_search_with_bm25_full_text_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "ConnectionError: Port mapping for container 4811d6bf03fd2423d787c664cb0b72e19d32836ea16cac190245deb7c67b8b00 and port 41951 is not available"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
    return self._get_exposed_port(port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
    return int(self.get_docker_client().port(self.get_container_id(), port))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.docker_client.DockerClient object at 0x7f81b3eed480>
container_id = '4811d6bf03fd2423d787c664cb0b72e19d32836ea16cac190245deb7c67b8b00'
port = 41951

    def port(self, container_id: str, port: int) -> str:
        """
        Lookup the public-facing port that is NAT-ed to :code:`port`.
        """
        port_mappings = self.client.api.port(container_id, port)
        if not port_mappings:
>           raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E           ConnectionError: Port mapping for container 4811d6bf03fd2423d787c664cb0b72e19d32836ea16cac190245deb7c67b8b00 and port 41951 is not available

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_query_on_non_existent_field (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "ConnectionError: Port mapping for container 5851869f645f9b61a24d5b8d3d4f40d35b9576efca539a46a5b8096446f0c2c1 and port 55387 is not available"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
    return self._get_exposed_port(port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
    return int(self.get_docker_client().port(self.get_container_id(), port))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.docker_client.DockerClient object at 0x7fa38dc52780>
container_id = '5851869f645f9b61a24d5b8d3d4f40d35b9576efca539a46a5b8096446f0c2c1'
port = 55387

    def port(self, container_id: str, port: int) -> str:
        """
        Lookup the public-facing port that is NAT-ed to :code:`port`.
        """
        port_mappings = self.client.api.port(container_id, port)
        if not port_mappings:
>           raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E           ConnectionError: Port mapping for container 5851869f645f9b61a24d5b8d3d4f40d35b9576efca539a46a5b8096446f0c2c1 and port 55387 is not available

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_keyword_search_with_inner_product_sparse_embedding (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f1058b4b2c0>, 0)>
timeout = 9.999996423721313

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_filtered_search_with_cosine_similarity_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f782f8bc480>, 0)>
timeout = 9.99999713897705

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_write_on_missing_primary_key_in_entity (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fbbdb787400>, 0)>
timeout = 9.999996900558472

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_vector_search_with_inner_product_similarity (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fa062071b00>, 0)>
timeout = 9.999996423721313

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_vector_search_with_euclidean_distance (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f1058b4b2c0>, 0)>
timeout = 9.999996423721313

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_hybrid_search (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f782f8bc480>, 0)>
timeout = 9.99999713897705

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_write_on_non_existent_collection (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
apache_beam/ml/rag/utils.py:200: in retry_with_backoff
    result = operation()
             ^^^^^^^^^^^
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: in _wait_for_channel_ready
    grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:106: in _block
    self._condition.wait(timeout=remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7fbbdb787400>, 0)>
timeout = 9.999996900558472

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                   Failed: Timeout (>600.0s) from pytest-timeout.

/opt/hostedtoolcache/Python/3.13.13/x64/lib/python3.13/threading.py:363: Failed

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_idempotent_write (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 3s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7fa0662b27b0>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7fa0662cc8a0>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_with_custom_column_specifications (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 3s]
Raw output
failed on setup with "ConnectionError: Port mapping for container eb3421c9500e1d381630a6005ac56017efb610ba6f470b1c7227db5ee35761fe and port 37507 is not available"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:332: in get_exposed_port
    return self._get_exposed_port(port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:336: in _get_exposed_port
    return int(self.get_docker_client().port(self.get_container_id(), port))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.docker_client.DockerClient object at 0x7f782b3195b0>
container_id = 'eb3421c9500e1d381630a6005ac56017efb610ba6f470b1c7227db5ee35761fe'
port = 37507

    def port(self, container_id: str, port: int) -> str:
        """
        Lookup the public-facing port that is NAT-ed to :code:`port`.
        """
        port_mappings = self.client.api.port(container_id, port)
        if not port_mappings:
>           raise ConnectionError(f"Port mapping for container {container_id} and port {port} is not available")
E           ConnectionError: Port mapping for container eb3421c9500e1d381630a6005ac56017efb610ba6f470b1c7227db5ee35761fe and port 37507 is not available

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/docker_client.py:372: ConnectionError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_on_auto_id_primary_key (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa38da05f40>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7fa38d7ef930>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa4c5f94900>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa38da05f40>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:37317, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa4c5f94900>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_write_on_non_existent_partition (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f81b3c5ea80>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7f81b3997e30>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f81b3f30540>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f81b3c5ea80>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:51263, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f81b3f30540>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_on_existent_collection_with_default_schema (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa38da05f40>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7fa38d7ef930>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa4c5f94900>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa38da05f40>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:37317, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa4c5f94900>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_with_batching (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f81b3c5ea80>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7f81b3997e30>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f81b3f30540>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f81b3c5ea80>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:51263, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f81b3f30540>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check notice on line 0 in .github

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

12 skipped tests found

There are 12 skipped tests, see "Raw output" for the full list of skipped tests.
Raw output
apache_beam.ml.inference.anthropic_inference_it_test
apache_beam.ml.inference.anthropic_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_both_dense_and_sparse
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_dense_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_sparse_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_byte_size_limit
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test

Check notice on line 0 in .github

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

30 tests found

There are 30 tests, see "Raw output" for the full list of tests.
Raw output
apache_beam.ml.inference.anthropic_inference_it_test
apache_beam.ml.inference.anthropic_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_empty_input_chunks
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_bm25_full_text_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_cosine_similarity_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_hybrid_search
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_collection
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_field
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_keyword_search_with_inner_product_sparse_embedding
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_euclidean_distance
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_inner_product_similarity
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_idempotent_write
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_missing_primary_key_in_entity
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_non_existent_collection
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_non_existent_partition
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_on_auto_id_primary_key
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_on_existent_collection_with_default_schema
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_with_custom_column_specifications
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_both_dense_and_sparse
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_dense_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_on_non_existent_collection
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_sparse_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_byte_size_limit
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test