Skip to content

feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics#1747

Draft
shashank-reddy-nr wants to merge 11 commits into
newrelic:mainfrom
shashank-reddy-nr:feat/kafka-cluster-id
Draft

feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics#1747
shashank-reddy-nr wants to merge 11 commits into
newrelic:mainfrom
shashank-reddy-nr:feat/kafka-cluster-id

Conversation

@shashank-reddy-nr

Copy link
Copy Markdown

What

Adds per-cluster, per-topic metrics for both kafka-python-ng and confluent-kafka.

kafka-python-ng (messagebroker_kafkapython.py)

  • _fetch_kafka_cluster_id(): uses KafkaAdminClient.describe_cluster() in a daemon
    thread with double-checked-locking + sentinel ("") to prevent duplicate spawns.
  • Full config passed via _ADMIN_CLIENT_ALLOWED_KEYS allowlist — SASL/SSL
    credentials inherited, consumer-only keys excluded to prevent TypeError.
  • wrap_KafkaConsumer_init_cluster: proactive cluster ID fetch on KafkaConsumer
    init (symmetric with producer).
  • Cluster metric recorded in both wrap_KafkaProducer_send and
    wrap_kafkaconsumer_next using _cache_key (not key) — no message routing
    key corruption.

confluent-kafka (messagebroker_confluentkafka.py)

  • _fetch_cluster_id() already uses instance.list_topics() — extended to set
    _nr_bootstrap_servers before fetch for SerializingProducer and
    DeserializingConsumer (was missing, caused uncached thread spawns).

Tests

  • Unit: test_cluster_metrics_unit.py — key preservation regression, auth config
    allowlist, mock target uses kafka.KafkaAdminClient.
  • Integration: kafka-python-producer and kafka-python-consumer validated in
    NR staging (account 11833718).

Before contributing, please read our contributing guidelines and code of conduct.

Overview

Describe the changes present in the pull request

Related Github Issue

Include a link to the related GitHub issue, if applicable

Testing

The agent includes a suite of tests which should be used to
verify your changes don't break existing functionality. These tests will run with
Github Actions when a pull request is made. More details on running the tests locally can be found in our
testing guidelines,
For most contributions it is strongly recommended to add additional tests which
exercise your changes.

Adds per-cluster, per-topic produce and consume metrics that uniquely identify
Kafka clusters by their UUID (cluster ID). These complement the existing per-node
MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses
of the same cluster into a single metric, enabling cluster-level throughput
analysis across MSK, Confluent Cloud, and self-hosted Kafka.

Metric format:
  MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce
  MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume

The cluster ID is fetched automatically using the client's own authenticated
connection — no extra configuration or credentials needed.

Also includes:
- Unit and integration tests for all new code paths
- Bug fixes identified in code review (volatile fields, thread-safety,
  per-message vs per-poll counting, auth config passthrough)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@shashank-reddy-nr shashank-reddy-nr requested a review from a team as a code owner June 1, 2026 10:44
@shashank-reddy-nr shashank-reddy-nr marked this pull request as draft June 1, 2026 10:52
@mergify mergify Bot added the tests-failing Tests failing in CI. label Jun 1, 2026
@codecov-commenter

codecov-commenter commented Jun 1, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 74.26471% with 35 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.00%. Comparing base (949339a) to head (de6c3fc).
⚠️ Report is 16 commits behind head on main.

Files with missing lines Patch % Lines
newrelic/hooks/messagebroker_confluentkafka.py 67.69% 12 Missing and 9 partials ⚠️
newrelic/hooks/messagebroker_kafkapython.py 80.00% 7 Missing and 7 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1747      +/-   ##
==========================================
- Coverage   82.08%   82.00%   -0.09%     
==========================================
  Files         215      215              
  Lines       26309    26543     +234     
  Branches     4150     4193      +43     
==========================================
+ Hits        21596    21766     +170     
- Misses       3301     3339      +38     
- Partials     1412     1438      +26     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

shashank-reddy-nr and others added 2 commits June 1, 2026 17:51
- .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with
  simple string extraction for newrelic header parsing — no external dependency
- Node.js: Fix lint violations — empty catch blocks now log debug messages,
  extract injectHeaders() helper to reduce cognitive complexity, remove unused
  variables from test, add JSDoc @param description
- Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix
  race condition where daemon thread overwrote seeded test fixture values;
  reactive fetch on first consumed message is sufficient

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The seeded_cluster_id fixture pre-seeds _kafka_cluster_id_cache to make the
cluster-ID tests deterministic, but constructing a producer/consumer spawns a
daemon thread (_fetch_kafka_cluster_id) that calls describe_cluster() against the
test broker. That returns no cluster_id, so the thread pops the cache key and
deletes the seeded value. The GIL hid this race; on free-threaded builds the
thread runs in parallel and clobbers the seed, failing test_cluster_produce_metric
and test_cluster_id_attribute_on_transaction for the later parametrizations.

Add an autouse fixture that no-ops the async fetch for the integration tests so the
seeded value is the only writer. The real fetch remains covered by
test_cluster_metrics_unit.py, which the fixture skips.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mergify mergify Bot removed the tests-failing Tests failing in CI. label Jun 1, 2026
@hmstepanek hmstepanek self-requested a review June 1, 2026 22:10
The cluster ID belongs only in the metric name for relationship building.
It does not need to be set as a span/transaction attribute.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mergify mergify Bot added the tests-failing Tests failing in CI. label Jun 4, 2026
…tale tests

Java:
- consumer-2.0.0: remove out-of-scope producer identity (accountId/appId)
  and group_id capture; restore DT header acceptance
- consumer-3.7.0: add DT header acceptance (was missing, regression vs 2.0.0);
  remove dead ClusterIdHelper.java (3.7.0 uses Weaver field access, not reflection)
- spring-kafka-2.2.0: revert to original (producer identity extraction out of scope)

Python:
- fix add_messagebroker_info regression: was incorrectly inside 'if cluster_id:'
  block, meaning library-version attribute dropped until cluster ID cached
- remove test_cluster_id_agent_attribute (consumer) and
  test_cluster_id_attribute_on_transaction (producer) — these tested a span
  attribute that was already removed from production code

Node.js:
- remove stale 'send injects kafka.cluster.id header' test — the header
  injection code was removed; this test was asserting the old behavior

.NET:
- replace undefined 'InternalApi.RecordMetric' with correct fully-qualified
  'NewRelic.Api.Agent.NewRelic.RecordMetric()' — the previous name would
  have been a compile error

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mergify mergify Bot removed the tests-failing Tests failing in CI. label Jun 4, 2026
shashank-reddy-nr and others added 3 commits June 4, 2026 19:46
…ants

Java:
- Add HeadersWrapper.java to spans-consumer-3.7.0 — module imports it
  for DT header acceptance but source was missing (would not compile)
- Fix blank line in producer KafkaProducer_Instrumentation.java

Python:
- Add KAFKA_CLUSTER_METRIC_PRODUCE/CONSUME module-level constants in both
  messagebroker_kafkapython.py and messagebroker_confluentkafka.py,
  consistent with the existing HEARTBEAT_POLL = '...' pattern
- Use those constants instead of inline f-strings at metric recording sites

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
kafka-python 2.x negotiates MetadataRequest v1 with Kafka 3.x brokers,
which does not include cluster_id in the response. The passive
ClusterMetadata.update_metadata interception therefore never captured
the cluster ID.

Switch to an active approach: spawn a daemon thread that creates a
KafkaAdminClient with the same bootstrap servers and polls for a
metadata response. KafkaAdminClient negotiates a higher MetadataRequest
version that includes cluster_id. Non-fatal if it fails — metrics
simply omit the cluster ID for that session.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
describe_cluster() returns a dict, not an object. Use dict.get().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mergify mergify Bot added the tests-failing Tests failing in CI. label Jun 4, 2026
shashank-reddy-nr and others added 3 commits June 5, 2026 01:03
When a second instance was created while a fetch was in-flight (cache
held sentinel ""), it returned without _nr_cluster_id being set and
never got it later. Fix: treat sentinel "" as 'in-flight, skip spawn'
and fall back to reading the cache lazily at produce/consume time so
instances created during the in-flight window pick up the value once
the background thread resolves.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
wrap_ClusterMetadata_update_metadata was removed when we switched from
passive MetadataResponse interception to active KafkaAdminClient fetch.
Replace TestClusterIdCapturedFromMetadata with TestFetchClusterIdKafkaPython
which tests _fetch_cluster_id_kafka_python with mocked KafkaAdminClient.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eanup

MagicMock auto-creates truthy return values for any attribute access
including describe_cluster().cluster_id, so raising only on connect()
caused the production code to store a MagicMock in the cache instead
of removing the sentinel. Raising at construction hits the outer except
which correctly pops the sentinel.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants