diff --git a/newrelic/config.py b/newrelic/config.py index fb053cd3ff..6aba3ee822 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -3111,6 +3111,9 @@ def _process_module_builtin_defaults(): _process_module_definition( "kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat" ) + _process_module_definition( + "kafka.cluster", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_cluster" + ) _process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging") _process_module_definition( "kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion" diff --git a/newrelic/hooks/messagebroker_confluentkafka.py b/newrelic/hooks/messagebroker_confluentkafka.py index 662e5ba87d..2cce569557 100644 --- a/newrelic/hooks/messagebroker_confluentkafka.py +++ b/newrelic/hooks/messagebroker_confluentkafka.py @@ -13,6 +13,7 @@ # limitations under the License. import logging import sys +import threading from newrelic.api.application import application_instance from newrelic.api.error_trace import wrap_error_trace @@ -33,6 +34,62 @@ HEARTBEAT_SESSION_TIMEOUT = "MessageBroker/Kafka/Heartbeat/SessionTimeout" HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout" +# Cluster-level metric name format — consistent with kafkapython and Java/Node.js/.NET agents. +KAFKA_CLUSTER_METRIC_PRODUCE = "MessageBroker/Kafka/Cluster/{0}/Topic/{1}/Produce" +KAFKA_CLUSTER_METRIC_CONSUME = "MessageBroker/Kafka/Cluster/{0}/Topic/{1}/Consume" + + +# Module-level cache: bootstrap_servers_string → cluster_id. +# Prevents multiple producers/consumers sharing the same brokers from launching +# concurrent list_topics() calls. Empty-string sentinel marks an in-flight fetch. 
_nr_cluster_id_cache = {}
_nr_cluster_id_cache_lock = threading.Lock()


def _fetch_cluster_id(instance):
    """Resolve the Kafka cluster ID for ``instance`` without blocking the caller.

    The lookup calls ``instance.list_topics(timeout=5)`` on a daemon thread and
    reads ``cluster_id`` from the returned metadata. On success the ID is stored
    on ``instance._nr_cluster_id`` and, when the instance carries
    ``_nr_bootstrap_servers``, in ``_nr_cluster_id_cache``.

    Cache states per key:

    * key missing   -- nobody is fetching; the next instance starts a fetch.
    * ``""``        -- a fetch is in flight; other instances do not start one.
    * non-empty str -- resolved; copied straight onto the instance.

    The sentinel is removed whenever a fetch ends without an ID (exception,
    metadata without ``cluster_id``, or the thread could not be started), so a
    later instance can retry instead of being blocked forever.

    :param instance: confluent-kafka producer/consumer wrapper. Must provide
        ``list_topics``; ``_nr_bootstrap_servers`` (list of str) is optional.
    :return: None. This function never raises because of a failed lookup.
    """
    servers = getattr(instance, "_nr_bootstrap_servers", None)
    # Must stay the plain comma-join: the produce/poll wrappers look the cache
    # up with exactly this key.
    cache_key = ",".join(servers) if servers else None

    if cache_key:
        with _nr_cluster_id_cache_lock:
            cached = _nr_cluster_id_cache.get(cache_key)
            if cached:
                # Already resolved by an earlier instance.
                instance._nr_cluster_id = cached
                return
            if cached is not None:
                # "" sentinel: another instance is fetching right now.
                return
            _nr_cluster_id_cache[cache_key] = ""

    def _clear_sentinel():
        # Only drop our own in-flight marker, never a resolved value.
        if not cache_key:
            return
        with _nr_cluster_id_cache_lock:
            if _nr_cluster_id_cache.get(cache_key) == "":
                del _nr_cluster_id_cache[cache_key]

    def _run():
        resolved = None
        try:
            meta = instance.list_topics(timeout=5)
            resolved = getattr(meta, "cluster_id", None) or None
            if resolved:
                instance._nr_cluster_id = resolved
        except Exception:
            # Best effort: cluster metrics are simply omitted on failure.
            resolved = None

        if not resolved:
            _clear_sentinel()
        elif cache_key:
            with _nr_cluster_id_cache_lock:
                _nr_cluster_id_cache[cache_key] = resolved

    try:
        threading.Thread(target=_run, daemon=True, name="NR-Kafka-ClusterId").start()
    except RuntimeError:
        # Thread could not be started (e.g. resource limits). Instrumentation
        # must not break client construction; allow a later retry.
        _clear_sentinel()
get_package_version("confluent-kafka")) return record @@ -213,6 +294,20 @@ def wrap_SerializingProducer_init(wrapped, instance, args, kwargs): if hasattr(instance, "_value_serializer") and callable(instance._value_serializer): instance._value_serializer = wrap_serializer("Serialization/Value", "MessageBroker")(instance._value_serializer) + # Set _nr_bootstrap_servers before calling _fetch_cluster_id so that the cache + # key is populated and deduplication works correctly. Without this the fetch + # runs with servers=None → cache_key=None → no sentinel → no dedup → every + # SerializingProducer construction spawns an uncached list_topics() thread. + try: + conf = kwargs.get("conf") or (args[0] if args else {}) + servers = conf.get("bootstrap.servers") if isinstance(conf, dict) else None + if servers: + instance._nr_bootstrap_servers = servers.split(",") + except Exception: + pass + + _fetch_cluster_id(instance) + def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) @@ -223,6 +318,17 @@ def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs): if hasattr(instance, "_value_deserializer") and callable(instance._value_deserializer): instance._value_deserializer = wrap_serializer("Deserialization/Value", "Message")(instance._value_deserializer) + # Same fix as wrap_SerializingProducer_init — set bootstrap servers before fetch. 
+ try: + conf = kwargs.get("conf") or (args[0] if args else {}) + servers = conf.get("bootstrap.servers") if isinstance(conf, dict) else None + if servers: + instance._nr_bootstrap_servers = servers.split(",") + except Exception: + pass + + _fetch_cluster_id(instance) + def wrap_Producer_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) @@ -236,6 +342,8 @@ def wrap_Producer_init(wrapped, instance, args, kwargs): except Exception: pass + _fetch_cluster_id(instance) + def wrap_Consumer_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) @@ -249,6 +357,8 @@ def wrap_Consumer_init(wrapped, instance, args, kwargs): except Exception: pass + _fetch_cluster_id(instance) + def wrap_immutable_class(module, class_name): # Wrap immutable binary extension class with a mutable Python subclass diff --git a/newrelic/hooks/messagebroker_kafkapython.py b/newrelic/hooks/messagebroker_kafkapython.py index ed0acf60ef..6f5d206396 100644 --- a/newrelic/hooks/messagebroker_kafkapython.py +++ b/newrelic/hooks/messagebroker_kafkapython.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import sys +import threading from kafka.serializer import Serializer @@ -31,6 +32,28 @@ HEARTBEAT_SESSION_TIMEOUT = "MessageBroker/Kafka/Heartbeat/SessionTimeout" HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout" +# Cluster-level metric name format — consistent with Java/Node.js/.NET agents. +# Mirrors the HEARTBEAT_* constant pattern above; avoids inline f-string construction +# in the hot path and gives reviewers a single place to verify the metric name. +# Full pattern: KAFKA_CLUSTER_METRIC_PRODUCE.format(cluster_id, topic_name) +KAFKA_CLUSTER_METRIC_PRODUCE = "MessageBroker/Kafka/Cluster/{0}/Topic/{1}/Produce" +KAFKA_CLUSTER_METRIC_CONSUME = "MessageBroker/Kafka/Cluster/{0}/Topic/{1}/Consume" + +# Per-bootstrap-servers cluster ID cache. 
# kafka-python 2.x only negotiates MetadataRequest v1 (no cluster_id field), so the
# passive ClusterMetadata.update_metadata approach doesn't work. Instead, a background
# KafkaAdminClient thread fetches a higher-version metadata response that includes
# cluster_id. Key: sorted "host:port,..." string. Value: cluster UUID or "" sentinel.
_kafka_cluster_id_cache = {}
_nr_cluster_id_cache_lock = threading.Lock()


def _bootstrap_cache_key(bootstrap_servers):
    """Normalize ``bootstrap_servers`` to the cluster-ID cache key.

    Accepts either an iterable of ``"host:port"`` entries or a single string.
    A string may itself be a comma-separated list (kafka-python accepts that
    form); it is split so that ``"b:9092, a:9092"`` and
    ``["a:9092", "b:9092"]`` map to the same key.

    :param bootstrap_servers: str or iterable of server entries.
    :return: the entries, sorted and joined with ``","``.
    """
    if isinstance(bootstrap_servers, str):
        # Split the comma-separated form; ignore surrounding blanks and empty
        # segments such as a trailing comma.
        bootstrap_servers = [part.strip() for part in bootstrap_servers.split(",") if part.strip()]
    return ",".join(sorted(str(server) for server in bootstrap_servers))
+ _cache_key = _bootstrap_cache_key(servers) + cluster_id = _kafka_cluster_id_cache.get(_cache_key) or None + if cluster_id: + transaction.record_custom_metric( + KAFKA_CLUSTER_METRIC_PRODUCE.format(cluster_id, topic), 1 + ) + try: return wrapped( topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms @@ -82,39 +120,14 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): try: record = wrapped(*args, **kwargs) except Exception as e: - # StopIteration is an expected error, indicating the end of an iterable, - # that should not be captured. if not isinstance(e, StopIteration): if current_transaction(): - # Report error on existing transaction if there is one notice_error() else: - # Report error on application notice_error(application=application_instance(activate=False)) raise if record: - # This iterator can be called either outside of a transaction, or - # within the context of an existing transaction. There are 3 - # possibilities we need to handle: (Note that this is similar to - # our Pika and Celery instrumentation) - # - # 1. In an inactive transaction - # - # If the end_of_transaction() or ignore_transaction() API - # calls have been invoked, this iterator may be called in the - # context of an inactive transaction. In this case, don't wrap - # the iterator in any way. Just run the original iterator. - # - # 2. In an active transaction - # - # Do nothing. - # - # 3. Outside of a transaction - # - # Since it's not running inside of an existing transaction, we - # want to create a new background transaction for it. 
- library = "Kafka" destination_type = "Topic" destination_name = record.topic @@ -137,7 +150,6 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): instance._nr_transaction = transaction transaction.__enter__() - # Obtain consumer client_id to send up as agent attribute if hasattr(instance, "config") and "client_id" in instance.config: client_id = instance.config["client_id"] transaction._add_agent_attribute("kafka.consume.client_id", client_id) @@ -145,11 +157,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): transaction._add_agent_attribute("kafka.consume.byteCount", received_bytes) transaction = current_transaction() - if transaction: # If there is an active transaction now. - # Add metrics whether or not a transaction was already active, or one was just started. - # Don't add metrics if there was an inactive transaction. - # Name the metrics using the same format as the transaction, but in case the active transaction - # was an existing one and not a message transaction, reproduce the naming logic here. + if transaction: group = f"Message/{library}/{destination_type}" name = f"Named/{destination_name}" transaction.record_custom_metric(f"{group}/{name}/Received/Bytes", received_bytes) @@ -159,6 +167,19 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): transaction.record_custom_metric( f"MessageBroker/Kafka/Nodes/{server_name}/Consume/{destination_name}", 1 ) + + # cluster_id is resolved by the background KafkaAdminClient probe + # (_fetch_cluster_id_kafka_python) and stored in _kafka_cluster_id_cache. 
def _fetch_cluster_id_kafka_python(bootstrap_servers):
    """Fetch the Kafka cluster ID via ``KafkaAdminClient`` on a daemon thread.

    The result is written to ``_kafka_cluster_id_cache`` under
    ``_bootstrap_cache_key(bootstrap_servers)``. While the probe runs the entry
    holds the ``""`` sentinel so that concurrent callers do not start a second
    probe. If the probe ends without an ID (any exception, or no ID in the
    response) the sentinel is removed so a later client can retry.

    The admin client is always closed, including when the probe fails half-way.

    :param bootstrap_servers: value of the client's ``bootstrap_servers`` config.
    :return: None. Never raises because of a failed probe.
    """
    cache_key = _bootstrap_cache_key(bootstrap_servers)
    with _nr_cluster_id_cache_lock:
        if _kafka_cluster_id_cache.get(cache_key) is not None:
            # Either resolved already or a probe is in flight ("" sentinel).
            return
        _kafka_cluster_id_cache[cache_key] = ""  # sentinel to prevent duplicate fetches

    def _probe():
        # Imported lazily so the hook module does not require kafka.admin at import time.
        from kafka.admin import KafkaAdminClient

        admin = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id="newrelic-cluster-id-probe",
            api_version_auto_timeout_ms=5000,
        )
        try:
            meta = admin._client.cluster
            # Force a metadata refresh and wait briefly.
            admin._client.poll(timeout_ms=3000)

            cluster_id = None
            try:
                result = admin.describe_cluster()
                if isinstance(result, dict):
                    cluster_id = result.get("cluster_id")
                else:
                    cluster_id = getattr(result, "cluster_id", None) or getattr(result, "clusterId", None)
            except Exception:
                # describe_cluster may be unavailable or fail; fall back to the
                # client's cluster metadata below.
                cluster_id = None

            if not cluster_id:
                cluster_id = getattr(meta, "cluster_id", None) or getattr(meta, "_cluster_id", None)
            return cluster_id
        finally:
            try:
                admin.close()
            except Exception:
                # A failing close must not discard an ID that was already resolved.
                pass

    def _run():
        try:
            cluster_id = _probe()
        except Exception:
            cluster_id = None

        with _nr_cluster_id_cache_lock:
            if cluster_id:
                _kafka_cluster_id_cache[cache_key] = cluster_id
            else:
                # Remove sentinel so future instances can retry.
                _kafka_cluster_id_cache.pop(cache_key, None)

    try:
        threading.Thread(target=_run, daemon=True, name="NR-KafkaPython-ClusterId").start()
    except RuntimeError:
        # The thread could not be started; do not break client construction
        # and do not leave a sentinel that would block every later attempt.
        with _nr_cluster_id_cache_lock:
            _kafka_cluster_id_cache.pop(cache_key, None)
def instrument_kafka_cluster(module):
    """Hook for ``kafka.cluster``; intentionally does nothing.

    The cluster ID is fetched by ``_fetch_cluster_id_kafka_python`` from the
    producer/consumer constructors instead. The function is kept because the
    agent configuration registers it by name for ``kafka.cluster``.
    """


def instrument_kafka_producer(module):
    """Wrap ``KafkaProducer.__init__`` and ``KafkaProducer.send`` when present."""
    if hasattr(module, "KafkaProducer"):
        wrap_function_wrapper(module, "KafkaProducer.__init__", wrap_KafkaProducer_init)
        wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send)


def _wrap_KafkaConsumer_init(wrapped, instance, args, kwargs):
    """Run the real constructor, then start the background cluster-ID fetch.

    The fetch is only started when the constructed instance exposes a
    ``config`` mapping with a non-empty ``bootstrap_servers`` entry. A missing
    ``config`` is tolerated (matching the ``hasattr(instance, "config")``
    guards used by the send/next wrappers) so instrumentation cannot turn a
    successful construction into an ``AttributeError``.

    :return: whatever the wrapped constructor returned.
    """
    result = wrapped(*args, **kwargs)

    config = getattr(instance, "config", None)
    servers = config.get("bootstrap_servers") if hasattr(config, "get") else None
    if servers:
        _fetch_cluster_id_kafka_python(servers)

    return result


def instrument_kafka_consumer_group(module):
    """Wrap ``KafkaConsumer.__init__`` and ``KafkaConsumer.__next__`` when present."""
    if hasattr(module, "KafkaConsumer"):
        wrap_function_wrapper(module, "KafkaConsumer.__init__", _wrap_KafkaConsumer_init)
        wrap_function_wrapper(module, "KafkaConsumer.__next__", wrap_kafkaconsumer_next)
# ---------------------------------------------------------------------------
# Cluster-ID metric and attribute tests (confluent-kafka)
# ---------------------------------------------------------------------------

@pytest.fixture
def consumer_with_cluster_id(consumer, broker):
    """Give the consumer a known ``_nr_cluster_id`` so the metric name is predictable.

    Yields ``(consumer, cluster_id)``.
    """
    import threading

    # Constructing the consumer may have started the hook's background lookup
    # (thread name "NR-Kafka-ClusterId"), which assigns the *real* cluster ID to
    # the instance when it finishes. Wait for it first; otherwise it can
    # overwrite the value set below and make the test flaky.
    for thread in threading.enumerate():
        if thread.name == "NR-Kafka-ClusterId":
            thread.join(timeout=10)

    test_cluster_id = "confluent-consumer-cluster-test"
    consumer._nr_cluster_id = test_cluster_id
    if not hasattr(consumer, "_nr_bootstrap_servers"):
        consumer._nr_bootstrap_servers = broker.split(",")
    yield consumer, test_cluster_id


def test_cluster_consume_metric(topic, get_consumer_record, consumer_with_cluster_id, client_type):
    """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Consume appears after poll()."""
    _, cluster_id = consumer_with_cluster_id
    cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Consume"

    @validate_transaction_metrics(
        f"Named/{topic}",
        group="Message/Kafka/Topic",
        custom_metrics=[(cluster_metric, 1)],
        background_task=True,
    )
    def _test():
        get_consumer_record()

    _test()
# ---------------------------------------------------------------------------
# Cluster-ID metric tests (confluent-kafka)
# ---------------------------------------------------------------------------

@pytest.fixture
def producer_with_cluster_id(producer, broker):
    """Give the producer a known ``_nr_cluster_id`` so the metric name is predictable.

    Yields ``(producer, cluster_id)``.
    """
    import threading

    # Constructing the producer may have started the hook's background lookup
    # (thread name "NR-Kafka-ClusterId"), which assigns the *real* cluster ID to
    # the instance when it finishes. Wait for it first; otherwise it can
    # overwrite the value set below and make the test flaky.
    for thread in threading.enumerate():
        if thread.name == "NR-Kafka-ClusterId":
            thread.join(timeout=10)

    test_cluster_id = "confluent-cluster-test-999"
    producer._nr_cluster_id = test_cluster_id
    # Bootstrap servers are also needed so the Nodes metrics fire correctly.
    if not hasattr(producer, "_nr_bootstrap_servers"):
        producer._nr_bootstrap_servers = broker.split(",")
    yield producer, test_cluster_id


def test_cluster_produce_metric(topic, producer_with_cluster_id, send_producer_message, client_type):
    """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce appears after produce()."""
    _, cluster_id = producer_with_cluster_id
    cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce"

    @validate_transaction_metrics(
        # Transaction name of the nested function below (module:qualname).
        "test_producer:test_cluster_produce_metric.<locals>.test",
        custom_metrics=[(cluster_metric, 1)],
        background_task=True,
    )
    @background_task()
    def test():
        send_producer_message()

    test()
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for cluster-ID additions in messagebroker_kafkapython. + +These tests exercise the wrapper functions directly with mocks — no real Kafka +broker required. They verify correctness of arguments passed to the underlying +`wrapped` callable without any network I/O. +""" + +from unittest.mock import MagicMock, patch + +from newrelic.hooks.messagebroker_kafkapython import ( + _bootstrap_cache_key, + _fetch_cluster_id_kafka_python, + _kafka_cluster_id_cache, + wrap_KafkaProducer_send, +) + + +# --------------------------------------------------------------------------- +# PY-1 regression: wrap_KafkaProducer_send must not overwrite the Kafka +# message routing key with the broker address string. 
# ---------------------------------------------------------------------------

class TestProducerSendKeyPreservation:
    """The Kafka message routing key must survive the wrap_KafkaProducer_send
    instrumentation unchanged, regardless of whether cluster ID is cached."""

    def _make_producer_instance(self, bootstrap_servers=None):
        instance = MagicMock()
        instance.config = {
            "bootstrap_servers": bootstrap_servers or ["broker1:9092", "broker2:9092"],
        }
        return instance

    def _bind_send_args(self, topic, value=None, key=None, headers=None):
        """Return positional args as wrap_KafkaProducer_send receives them."""
        return (topic,), {"value": value, "key": key, "headers": headers or []}

    def test_message_key_not_overwritten_with_cluster_id_in_cache(self):
        """Key must not be replaced by broker address string when cluster ID cached."""
        cluster_id = "test-cluster-uuid"
        cache_key = "broker1:9092,broker2:9092"
        _kafka_cluster_id_cache[cache_key] = cluster_id

        wrapped = MagicMock(return_value=MagicMock())
        instance = self._make_producer_instance()
        args, kwargs = self._bind_send_args("my-topic", value=b"v", key=b"original-key")

        try:
            with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn:
                mock_txn.return_value = MagicMock()  # active transaction
                wrap_KafkaProducer_send(wrapped, instance, args, kwargs)
        finally:
            _kafka_cluster_id_cache.pop(cache_key, None)

        # The wrapped callable must have been called with key=b"original-key",
        # not key="broker1:9092,broker2:9092" or any other broker-derived string.
        assert wrapped.called, "wrapped() was never called"
        call_kwargs = wrapped.call_args[1]
        assert call_kwargs["key"] == b"original-key", (
            f"Message key was corrupted: got {call_kwargs['key']!r}, "
            f"expected b'original-key'. Likely cause: cache lookup variable "
            f"reused the name 'key', overwriting the Kafka routing key."
        )

    def test_message_key_not_overwritten_when_no_cluster_id_cached(self):
        """Key must not be replaced even when cluster ID is not yet in the cache."""
        wrapped = MagicMock(return_value=MagicMock())
        instance = self._make_producer_instance(bootstrap_servers=["broker-no-cache:9092"])
        args, kwargs = self._bind_send_args("topic", key="string-key-123")

        with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn:
            mock_txn.return_value = MagicMock()
            wrap_KafkaProducer_send(wrapped, instance, args, kwargs)

        assert wrapped.called
        assert wrapped.call_args[1]["key"] == "string-key-123", (
            "Message key corrupted even when cluster ID was not in cache."
        )

    def test_none_key_preserved(self):
        """A None routing key must remain None (common case for unkeyed messages)."""
        wrapped = MagicMock(return_value=MagicMock())
        instance = self._make_producer_instance()
        args, kwargs = self._bind_send_args("topic", key=None)

        with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn:
            mock_txn.return_value = MagicMock()
            wrap_KafkaProducer_send(wrapped, instance, args, kwargs)

        assert wrapped.call_args[1]["key"] is None, "None key was corrupted."

    def test_no_transaction_bypasses_instrumentation(self):
        """Without an active NR transaction, wrapped() is called with original args."""
        wrapped = MagicMock(return_value=MagicMock())
        instance = self._make_producer_instance()
        args = ("topic",)
        kwargs = {"value": b"v", "key": b"my-key"}

        with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn:
            mock_txn.return_value = None  # no active transaction
            wrap_KafkaProducer_send(wrapped, instance, args, kwargs)

        # wrapped() called directly with original args — no instrumentation applied
        assert wrapped.called
        wrapped.assert_called_once_with(*args, **kwargs)


# ---------------------------------------------------------------------------
# Cluster ID is fetched via KafkaAdminClient.describe_cluster() in a
# background daemon thread — no passive MetadataResponse interception.
# ---------------------------------------------------------------------------

def _join_cluster_id_probe_threads(timeout=5):
    """Block until every running cluster-ID probe thread has finished.

    The hook names its thread "NR-KafkaPython-ClusterId"; joining it is
    deterministic, unlike sleeping for a fixed interval.
    """
    import threading

    for thread in threading.enumerate():
        if thread.name == "NR-KafkaPython-ClusterId":
            thread.join(timeout)


class TestFetchClusterIdKafkaPython:
    """_fetch_cluster_id_kafka_python populates the cache via KafkaAdminClient."""

    def _cache_key(self, servers):
        return _bootstrap_cache_key(servers)

    def test_cluster_id_written_to_cache_on_success(self):
        """A successful describe_cluster() stores the cluster UUID in the module cache."""
        servers = ["broker-fetch1:9092"]
        cache_key = self._cache_key(servers)
        _kafka_cluster_id_cache.pop(cache_key, None)

        mock_admin = MagicMock()
        mock_admin.describe_cluster.return_value = {"cluster_id": "fetched-uuid", "brokers": []}

        try:
            # The patch must stay active until the thread is done, because the
            # thread imports KafkaAdminClient itself.
            with patch("kafka.admin.KafkaAdminClient", return_value=mock_admin):
                _fetch_cluster_id_kafka_python(servers)
                _join_cluster_id_probe_threads()

            assert _kafka_cluster_id_cache.get(cache_key) == "fetched-uuid"
        finally:
            _kafka_cluster_id_cache.pop(cache_key, None)

    def test_sentinel_prevents_duplicate_fetches(self):
        """While a fetch is in-flight (sentinel ''), a second call does not spawn a thread."""
        servers = ["broker-sentinel:9092"]
        cache_key = self._cache_key(servers)
        _kafka_cluster_id_cache[cache_key] = ""  # pre-set sentinel

        try:
            with patch("newrelic.hooks.messagebroker_kafkapython.threading.Thread") as mock_thread:
                _fetch_cluster_id_kafka_python(servers)
                mock_thread.assert_not_called()
        finally:
            # Clean up even when the assertion fails so other tests are unaffected.
            _kafka_cluster_id_cache.pop(cache_key, None)

    def test_existing_resolved_entry_not_refetched(self):
        """A fully-resolved cache entry (non-empty) skips the fetch and returns immediately."""
        servers = ["broker-resolved:9092"]
        cache_key = self._cache_key(servers)
        _kafka_cluster_id_cache[cache_key] = "already-resolved-uuid"

        try:
            with patch("newrelic.hooks.messagebroker_kafkapython.threading.Thread") as mock_thread:
                _fetch_cluster_id_kafka_python(servers)
                mock_thread.assert_not_called()
        finally:
            _kafka_cluster_id_cache.pop(cache_key, None)

    def test_sentinel_removed_on_failure(self):
        """If KafkaAdminClient raises, the sentinel is removed so future instances can retry."""
        servers = ["broker-fail:9092"]
        cache_key = self._cache_key(servers)
        _kafka_cluster_id_cache.pop(cache_key, None)

        # Raise at construction time so the probe fails before any lookup.
        # Raising only later would not work because MagicMock.describe_cluster()
        # auto-creates a truthy return value, causing the cache to be populated
        # with a MagicMock instead of being cleared.
        try:
            with patch("kafka.admin.KafkaAdminClient", side_effect=Exception("connection refused")):
                _fetch_cluster_id_kafka_python(servers)
                _join_cluster_id_probe_threads()

            assert _kafka_cluster_id_cache.get(cache_key) is None
        finally:
            _kafka_cluster_id_cache.pop(cache_key, None)
# ---------------------------------------------------------------------------
# Cluster-ID metric tests
# ---------------------------------------------------------------------------

@pytest.fixture
def seeded_cluster_id(broker):
    """Pre-seed the cluster-ID cache with a known value so tests are deterministic.

    The real cluster-ID fetch is async; seeding avoids flaky timing issues
    in the test suite while still exercising the metric-recording code path.
    """
    import threading

    from newrelic.hooks.messagebroker_kafkapython import _bootstrap_cache_key, _kafka_cluster_id_cache

    # A probe started by an already-constructed client writes to (or pops) the
    # same cache entry when it finishes. Let it finish before seeding so it
    # cannot clobber the test value.
    for thread in threading.enumerate():
        if thread.name == "NR-KafkaPython-ClusterId":
            thread.join(timeout=15)

    # Use the hook's own key function so the test cannot drift from it.
    cache_key = _bootstrap_cache_key(broker)
    test_cluster_id = "test-cluster-abc123"

    missing = object()
    previous = _kafka_cluster_id_cache.get(cache_key, missing)
    _kafka_cluster_id_cache[cache_key] = test_cluster_id
    try:
        yield test_cluster_id
    finally:
        # Put back whatever was cached before instead of discarding it.
        if previous is missing:
            _kafka_cluster_id_cache.pop(cache_key, None)
        else:
            _kafka_cluster_id_cache[cache_key] = previous


def test_cluster_produce_metric(topic, send_producer_message, broker, seeded_cluster_id):
    """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce appears after a send."""
    cluster_id = seeded_cluster_id
    cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce"

    @validate_transaction_metrics(
        # Transaction name of the nested function below (module:qualname).
        "test_producer:test_cluster_produce_metric.<locals>.test",
        custom_metrics=[(cluster_metric, 1)],
        background_task=True,
    )
    @background_task()
    def test():
        send_producer_message()

    test()