feat(hydro_test): Example of produce and consume from Kafka #2805

Open
akainth015 wants to merge 1 commit into hydro-project:main from akainth015:feat/kafka-example

@akainth015 commented Apr 23, 2026

This adds a Kafka example that can produce, consume, or both, and prints basic throughput information. Performance is not where we want it yet, but planned changes to the Hydro API should let us do better.

@akainth015 force-pushed the feat/kafka-example branch 6 times, most recently from 0873fbe to ff3c338 (April 29, 2026 22:08)
@Benjscho self-requested a review (April 29, 2026 22:27)
@akainth015 force-pushed the feat/kafka-example branch 7 times, most recently from 82d91db to 9427980 (May 1, 2026 17:30)
@akainth015 changed the title from "Kafka example" to "Example of produce and consume from Kafka" (May 1, 2026)
@akainth015 marked this pull request as ready for review (May 1, 2026 18:10)
@akainth015 requested a review from a team (May 1, 2026 18:10)
@akainth015 force-pushed the feat/kafka-example branch from 9427980 to d05a2d3 (May 1, 2026 18:11)
@akainth015 changed the title from "Example of produce and consume from Kafka" to "feat: Example of produce and consume from Kafka" (May 1, 2026)
Comment thread hydro_test/Cargo.toml
# https://github.com/GitoxideLabs/cargo-smart-release/issues/36
example_test = { path = "../example_test", version = "^0.0.0", optional = true }
hydro_build_utils = { path = "../hydro_build_utils", version = "^0.0.1", optional = true }
rdkafka = { version = "0.39.0", optional = true, features = ["cmake-build", "ssl-vendored"] }

Member

Is there an option to use aws-lc-sys here? Then we wouldn't need to install libcurl.

Member Author

Shadaj found this open PR that fixes the issue, but it hasn't gotten traction. That's why we're hitting the curl.h error even though we're not using curl.

Member

Lol, do we know anyone at Confluent we can ping to merge this PR?

Comment thread rust-toolchain.toml
"clippy",
# https://github.com/dtolnay/trybuild?tab=readme-ov-file#troubleshooting
"rust-src",
"llvm-tools",

Member

What is this needed for?

Member Author

Linker tools for the statically linked libraries that my changes pull in.

- Add hydro_test/src/kafka/mod.rs with kafka_producer, kafka_consumer,
  dest_kafka, and setup_topic helpers following the SQS PR hydro-project#2746 pattern
- Complete hydro_test/examples/kafka.rs: leader produces 1M financial
  transactions, consumer cluster computes per-account balances
- Add 'kafka' feature flag gating rdkafka and futures-util as optional
  deps
- Add llvm-tools component to rust-toolchain.toml for rust-lld
@akainth015 force-pushed the feat/kafka-example branch from d05a2d3 to a9cb533 (May 4, 2026 15:40)
- name: Install nightly Rust channel
  run: rustup toolchain add nightly

- name: Install libcurl dev (needed by rdkafka-sys)

Member

Suggested change
- name: Install libcurl dev (needed by rdkafka-sys)
- name: Install libcurl dev (needed by rdkafka-sys) (REMOVE AFTER https://github.com/confluentinc/librdkafka/pull/5230)

Comment on lines +923 to +926
"metadata_options": {
"http_tokens": "required",
"http_endpoint": "enabled"
},

Member

Suggested change
"metadata_options": {
"http_tokens": "required",
"http_endpoint": "enabled"
},
// Required for Kafka (?)
"metadata_options": {
"http_tokens": "required",
"http_endpoint": "enabled"
},

Member Author

No, this configures the instance to use IMDSv2. Every EC2 instance should be using IMDSv2 now.

std::thread::spawn(move || {
    handle.block_on(setup_topic(&brokers, &topic, num_partitions, &security_protocol));
})
.join()

Contributor

Would spawn_blocking work here?

#[arg(long, default_value = "m7i.large")]
aws_instance_type: String,

/// AWS AMI ID (Amazon Linux 2)

Contributor

We should probably use AL2023, since AL2 reaches EOL in June. It would also be nice to resolve the default AMI through the SSM parameter instead of pinning to an image: https://docs.aws.amazon.com/linux/al2023/ug/ec2.html#launch-via-aws-cli

Comment on lines +36 to +38
/// Run mode: "produce" (produce only, prints topic name), "consume" (consume only, requires --topic), or "both" (default)
#[arg(long, default_value = "both")]
mode: String,

Contributor

nit: enum for Mode
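
A minimal sketch of what such an enum could look like, assuming the three modes named in the doc comment above. `Mode`, `ParseModeError`, and the `produces`/`consumes` helpers are hypothetical names, not code from this PR. It uses only `std` so it stands alone; with clap, deriving `ValueEnum` on the enum would be the more usual route.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical replacement for the `mode: String` argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Mode {
    /// Produce only.
    Produce,
    /// Consume only.
    Consume,
    /// Produce and consume (the default in the PR).
    #[default]
    Both,
}

impl Mode {
    /// True when this mode runs the producer side.
    pub fn produces(self) -> bool {
        matches!(self, Mode::Produce | Mode::Both)
    }

    /// True when this mode runs the consumer side.
    pub fn consumes(self) -> bool {
        matches!(self, Mode::Consume | Mode::Both)
    }
}

/// Error returned for a string that names no known mode.
#[derive(Debug, PartialEq, Eq)]
pub struct ParseModeError(String);

impl fmt::Display for ParseModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unknown mode {:?}; expected produce, consume, or both", self.0)
    }
}

impl std::error::Error for ParseModeError {}

impl FromStr for Mode {
    type Err = ParseModeError;

    /// Accepts exactly the three strings used by the current CLI.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "produce" => Ok(Mode::Produce),
            "consume" => Ok(Mode::Consume),
            "both" => Ok(Mode::Both),
            other => Err(ParseModeError(other.to_string())),
        }
    }
}
```

The gain over a `String` is that a typo such as `--mode consme` is rejected at argument parsing, and later code matches on variants instead of comparing strings.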

Comment on lines +65 to +68
brokers: &'a str,
group_id: &'a str,
topic: &'a str,
security_protocol: &'a str,

Contributor

From a consumer POV, I'd think accepting a ClientConfig would be more flexible here.
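
A rough, std-only sketch of the idea, assuming the helper takes a caller-supplied key/value configuration and only fills in what is missing. The `Config` alias is a stand-in for rdkafka's `ClientConfig` (which also stores string key/value pairs), and `with_consumer_defaults` is a hypothetical name, not a function from this PR.

```rust
use std::collections::HashMap;

/// Std-only stand-in for a Kafka client configuration (string keys and values).
pub type Config = HashMap<String, String>;

/// Hypothetical helper: keep every key the caller set, and add a default
/// only where the caller left a key unset.
pub fn with_consumer_defaults(mut config: Config, default_group_id: &str) -> Config {
    config
        .entry("group.id".to_string())
        .or_insert_with(|| default_group_id.to_string());
    config
        .entry("security.protocol".to_string())
        .or_insert_with(|| "plaintext".to_string());
    config
}
```

With this shape, a caller who needs an extra setting passes it in the config, rather than the helper growing a new `&str` field for each option.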

)) => {
    producer.poll(std::time::Duration::from_millis(100));
}
Err((e, _)) => panic!("Failed to send message to Kafka: {}", e),

Contributor

I'd expect that we don't want to panic in lib code. I'd like to talk about how we surface errors operationally.
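
One possible shape for that, sketched with `std` only: the send loop returns a typed error and the caller (the example binary, say) decides whether to log, retry, or abort. `KafkaSendError`, `Attempt`, and `send_with_retry` are hypothetical names. The closure stands in for the real rdkafka send call, and treating the polling arm above as a retry path is an assumption about code not shown here.

```rust
use std::fmt;

/// Hypothetical error type returned by the library instead of panicking.
#[derive(Debug, PartialEq, Eq)]
pub enum KafkaSendError {
    /// The retry budget ran out while the send was still being refused.
    QueueFull { attempts: u32 },
    /// Any other failure, carrying the underlying error text.
    Other(String),
}

impl fmt::Display for KafkaSendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KafkaSendError::QueueFull { attempts } => {
                write!(f, "queue still full after {} attempts", attempts)
            }
            KafkaSendError::Other(msg) => {
                write!(f, "failed to send message to Kafka: {}", msg)
            }
        }
    }
}

impl std::error::Error for KafkaSendError {}

/// Outcome of one send attempt (stand-in for the client's own result type).
pub enum Attempt {
    Sent,
    QueueFull,
    Failed(String),
}

/// Calls `try_send` up to `max_attempts` times. Returns the number of
/// attempts used on success, or a typed error the caller can handle.
pub fn send_with_retry<F>(mut try_send: F, max_attempts: u32) -> Result<u32, KafkaSendError>
where
    F: FnMut() -> Attempt,
{
    for attempt in 1..=max_attempts {
        match try_send() {
            Attempt::Sent => return Ok(attempt),
            // The real code would poll the producer here before retrying.
            Attempt::QueueFull => continue,
            Attempt::Failed(msg) => return Err(KafkaSendError::Other(msg)),
        }
    }
    Err(KafkaSendError::QueueFull { attempts: max_attempts })
}
```

Bounding the retries also avoids looping forever when the broker stays unreachable, which an unbounded retry loop would hide.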

Comment on lines +165 to +178
sent.for_each(q!({
    let count = std::cell::Cell::new(0usize);
    move |producer| {
        let c = count.get() + 1;
        count.set(c);
        if c >= num_messages {
            rdkafka::producer::Producer::flush(
                &*producer,
                std::time::Duration::from_secs(30),
            )
            .expect("Failed to flush producer");
            println!("PRODUCE_DONE {}", c);
        }
    }

Member

Not sure if rdkafka exposes async endpoints, but it would be best to put this in a dest_sink; the Sink could then handle flushing directly.

@shadaj changed the title from "feat: Example of produce and consume from Kafka" to "feat(hydro_test): Example of produce and consume from Kafka" (May 4, 2026)
@MingweiSamuel force-pushed the main branch 2 times, most recently from a15a670 to e70eab6 (June 11, 2026 18:42)