diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d271fa809d65..e6201da6b83a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,6 +64,10 @@ jobs: - if: matrix.rust_release == 'latest-stable' && matrix.os == 'ubuntu-latest' uses: ./.github/actions/use-sccache + - name: Install libcurl dev (needed by rdkafka-sys) + if: runner.os == 'Linux' + run: sudo apt-get update -qq && sudo apt-get install -y -qq libcurl4-openssl-dev > /dev/null + - run: cargo clippy --all-targets -- -D warnings - run: cargo clippy --all-targets --no-default-features -- -D warnings - run: cargo clippy --all-targets --all-features -- -D warnings diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index a017aeb5f097..97ce0e63a821 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -40,6 +40,9 @@ jobs: - name: Install nightly Rust channel run: rustup toolchain add nightly + - name: Install libcurl dev (needed by rdkafka-sys) + run: sudo apt-get update -qq && sudo apt-get install -y -qq libcurl4-openssl-dev > /dev/null + - env: RUSTDOCFLAGS: --cfg docsrs -Dwarnings run: cargo +nightly doc --no-deps --all-features diff --git a/Cargo.lock b/Cargo.lock index 53d54de90f11..6418953fb4cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2756,6 +2756,7 @@ dependencies = [ "dfir_rs", "example_test", "futures", + "futures-util", "hydro_build_utils", "hydro_deploy", "hydro_lang", @@ -2763,6 +2764,7 @@ dependencies = [ "include_mdtests", "palette", "rand 0.8.5", + "rdkafka", "regex", "serde", "serde_json", @@ -3354,6 +3356,18 @@ dependencies = [ "redox_syscall 0.7.3", ] +[[package]] +name = "libz-sys" +version = "1.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linear-map" version = "1.2.0" @@ -3690,6 +3704,28 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0bca838442ec211fa11de3a8b0e0e8f3a4522575b5c4c06ed722e005036f26" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "680998035259dcfcafe653688bf2aa6d3e2dc05e98be6ab46afb089dc84f1df8" +dependencies = [ + "proc-macro-crate 2.0.0", + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -3771,6 +3807,28 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-src" +version = "300.6.0+3.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8e8cbfd3a4a8c8f089147fd7aaa33cf8c7450c4d09f8f80698a0cf093abeff4" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13ce1245cd07fcc4cfdb438f7507b0c7e4f3849a69fd84d52374c66d83741bb6" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "optfield" version = "0.4.0" @@ -4113,6 +4171,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "plain" version = "0.2.3" @@ -4517,6 +4581,38 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7956f9ac12b5712e50372d9749a3102f4810a8d42481c5eae3748d36d585bcf" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.10.0+2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "openssl-sys", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -6278,6 +6374,12 @@ dependencies = [ "variadics", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/build_docs.bash b/build_docs.bash index 825ff8120c01..52f9cc9d04e5 100755 --- a/build_docs.bash +++ b/build_docs.bash @@ -1,5 +1,7 @@ set -e +sudo apt-get update -qq && sudo apt-get install -y -qq libcurl4-openssl-dev > /dev/null + echo "=========================================" echo "Step 1/7: Downloading LLVM..." echo "=========================================" diff --git a/hydro_deploy/core/src/aws.rs b/hydro_deploy/core/src/aws.rs index fc24e4fbd7b1..d0d812811e10 100644 --- a/hydro_deploy/core/src/aws.rs +++ b/hydro_deploy/core/src/aws.rs @@ -47,6 +47,16 @@ pub struct NetworkResources { security_group: String, } +impl NetworkResources { + pub fn new(vpc: String, subnet: String, security_group: String) -> Self { + Self { + vpc, + subnet, + security_group, + } + } +} + #[derive(Debug)] pub struct AwsNetwork { pub region: String, @@ -910,6 +920,10 @@ echo -e "{cwa_config_esc}" > /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwa "associate_public_ip_address": true, "iam_instance_profile": iam_instance_profile_ref, // May be `None`. "user_data": user_data_script, // May be `None`. + "metadata_options": { + "http_tokens": "required", + "http_endpoint": "enabled" + }, "tags": { "Name": instance_name } diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index 14f12c1b44f4..3db40e7083b1 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -29,7 +29,7 @@ pub mod azure; pub use azure::AzureHost; pub mod aws; -pub use aws::{AwsEc2Host, AwsNetwork}; +pub use aws::{AwsEc2Host, AwsNetwork, NetworkResources}; pub mod rust_crate; pub use rust_crate::RustCrate; diff --git a/hydro_test/Cargo.toml b/hydro_test/Cargo.toml index 1ae17a80e166..072f0cfa37d1 100644 --- a/hydro_test/Cargo.toml +++ b/hydro_test/Cargo.toml @@ -12,6 +12,11 @@ docker = ["hydro_lang/docker_deploy"] ecs = ["hydro_lang/ecs_deploy"] maelstrom = ["hydro_lang/maelstrom"] stageleft_macro_entrypoint = ["hydro_lang/stageleft_macro_entrypoint"] +kafka = ["dep:rdkafka", "dep:futures-util"] + +[[example]] +name = "kafka" +required-features = ["kafka"] [dependencies] hydro_lang = { path = "../hydro_lang", version = "^0.16.0" } @@ -33,6 +38,8 @@ bytes = "1.1.0" # https://github.com/GitoxideLabs/cargo-smart-release/issues/36 example_test = { path = "../example_test", version = "^0.0.1", optional = true } hydro_build_utils = { path = "../hydro_build_utils", version = "^0.1.0", optional = true } +rdkafka = { version = "0.39.0", optional = true, features = ["cmake-build", "ssl-vendored"] } +futures-util = { version = "0.3.0", default-features = false, features = ["alloc"], optional = true } [build-dependencies] stageleft_tool.workspace = true diff --git a/hydro_test/examples/kafka.rs b/hydro_test/examples/kafka.rs new file mode 100644 index 000000000000..bc7e04b15f83 --- /dev/null +++ b/hydro_test/examples/kafka.rs @@ -0,0 +1,319 @@ +use std::sync::Arc; + +use clap::{ArgAction, Parser}; +use hydro_deploy::aws::NetworkResources; +use hydro_deploy::{AwsNetwork, Deployment, Host, HostTargetType, LinuxCompileType}; +use hydro_lang::deploy::TrybuildHost; +use hydro_lang::live_collections::stream::{ExactlyOnce, TotalOrder}; +use hydro_lang::location::Location; +use hydro_lang::nondet::nondet; +use hydro_lang::viz::config::GraphConfig; +use hydro_test::kafka::{dest_kafka, kafka_consumer, kafka_producer}; +use stageleft::q; + +type HostCreator = Box Arc>; + +const TOPIC_PREFIX: &str = "financial_transactions"; + +// cargo run -p hydro_test --example kafka --features kafka -- --brokers 'localhost:9092' +#[derive(Parser, Debug)] +struct Args { + #[clap(flatten)] + graph: GraphConfig, + + /// Use AWS, make sure credentials are set up + #[arg(long, action = ArgAction::SetTrue)] + aws: bool, + + /// Kafka bootstrap servers + #[arg(long, default_value = "localhost:9092")] + brokers: String, + + /// Kafka security protocol (plaintext or SSL for MSK) + #[arg(long, default_value = "plaintext")] + security_protocol: String, + + /// Run mode: "produce" (produce only, prints topic name), "consume" (consume only, requires --topic), or "both" (default) + #[arg(long, default_value = "both")] + mode: String, + + /// Topic name for consume-only mode (use the topic printed by a produce run) + #[arg(long)] + topic: Option, + + /// Number of messages to produce + #[arg(long, default_value = "10000")] + num_messages: usize, + + /// Number of Kafka partitions for the topic + #[arg(long, default_value = "10")] + num_partitions: i32, + + /// Number of consumer instances + #[arg(long, default_value = "3")] + num_consumers: usize, + + // --- AWS options --- + /// AWS region + #[arg(long, default_value = "us-west-2")] + aws_region: String, + + /// AWS EC2 instance type + #[arg(long, default_value = "m7i.large")] + aws_instance_type: String, + + /// AWS AMI ID (Amazon Linux 2) + #[arg(long, default_value = "ami-055a9df0c8c9f681c")] + aws_ami: String, + + /// AWS VPC ID (required for --aws) + #[arg(long)] + aws_vpc: Option, + + /// AWS subnet ID (required for --aws) + #[arg(long)] + aws_subnet: Option, + + /// AWS security group ID (required for --aws) + #[arg(long)] + aws_security_group: Option, +} + +enum Leader {} +enum Consumer {} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + let mut deployment = Deployment::new(); + + let create_host: HostCreator = if args.aws { + let vpc = args + .aws_vpc + .as_ref() + .expect("--aws-vpc required with --aws"); + let subnet = args + .aws_subnet + .as_ref() + .expect("--aws-subnet required with --aws"); + let sg = args + .aws_security_group + .as_ref() + .expect("--aws-security-group required with --aws"); + let region = args.aws_region.clone(); + let instance_type = args.aws_instance_type.clone(); + let ami = args.aws_ami.clone(); + let network = AwsNetwork::new( + ®ion, + Some(NetworkResources::new( + vpc.clone(), + subnet.clone(), + sg.clone(), + )), + ); + + Box::new(move |deployment| -> Arc { + deployment + .AwsEc2Host() + .region(®ion) + .instance_type(&instance_type) + .ami(&ami) + .network(network.clone()) + .target_type(HostTargetType::Linux(LinuxCompileType::Glibc)) + .add() + }) + } else { + let localhost = deployment.Localhost(); + Box::new(move |_| -> Arc { localhost.clone() }) + }; + + let num_messages = args.num_messages; + let num_partitions = args.num_partitions; + let num_consumers = args.num_consumers; + + let produce = args.mode == "produce" || args.mode == "both"; + let consume = args.mode == "consume" || args.mode == "both"; + + // For consume-only, require --topic; otherwise generate a unique one. + let topic = if let Some(t) = &args.topic { + t.clone() + } else { + format!("{}_{}", TOPIC_PREFIX, std::process::id()) + }; + + let mut flow = hydro_lang::compile::builder::FlowBuilder::new(); + let leader = flow.process::(); + let consumers = flow.cluster::(); + + // Leader: produce transactions spread across partitions. + if produce { + let producer = kafka_producer( + &leader, + &args.brokers, + &args.security_protocol, + &topic, + num_partitions, + ); + let transactions = leader.source_iter(q!({ + (0..num_messages).map(|i| { + let account = format!("account_{}", i % 100); + let amount = format!("{}", (i % 201) as i64 - 100); // range [-100, 100] + (account, amount) + }) + })); + let sent = dest_kafka(producer, transactions, &topic); + sent.for_each(q!({ + let count = std::cell::Cell::new(0usize); + move |producer| { + let c = count.get() + 1; + count.set(c); + if c >= num_messages { + rdkafka::producer::Producer::flush( + &*producer, + std::time::Duration::from_secs(30), + ) + .expect("Failed to flush producer"); + println!("PRODUCE_DONE {}", c); + } + } + })); + } + + // Consumers: read from topic and maintain per-account balances. + if consume { + kafka_consumer( + &consumers, + &args.brokers, + "kafka_example_consumers", + &topic, + &args.security_protocol, + ) + .assume_ordering::(nondet!(/** Safe: balances are commutative (addition). */)) + .assume_retries::(nondet!(/** Safe: balances are commutative (addition). */)) + .filter_map(q!(|msg| { + let key = + rdkafka::Message::key(&msg).map(|k| String::from_utf8_lossy(k).to_string())?; + let value = + rdkafka::Message::payload(&msg).map(|v| String::from_utf8_lossy(v).to_string())?; + let amount: i64 = value.parse().ok()?; + Some((key, amount)) + })) + .for_each(q!({ + let balances = std::cell::RefCell::new(std::collections::HashMap::::new()); + move |(account, amount)| { + let mut map = balances.borrow_mut(); + let balance = map.entry(account.clone()).or_insert(0); + *balance += amount; + println!("{}: {}", account, balance); + } + })); + } + + // Extract the IR BEFORE the builder is consumed by deployment methods + let built = flow.finalize(); + + // Generate graph visualizations based on command line arguments + if built.generate_graph(&args.graph)?.is_some() { + return Ok(()); + } + + // Deploy + let mut hosts_builder = built.with_default_optimize(); + hosts_builder = hosts_builder.with_process( + &leader, + TrybuildHost::new(create_host(&mut deployment)).features(vec!["kafka".to_owned()]), + ); + hosts_builder = hosts_builder.with_cluster( + &consumers, + (0..num_consumers).map(|_| { + TrybuildHost::new(create_host(&mut deployment)).features(vec!["kafka".to_owned()]) + }), + ); + let nodes = hosts_builder.deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + deployment.start().await.unwrap(); + + println!( + "Running Kafka example (mode={}, topic={topic}, {num_messages} messages)...", + args.mode + ); + + let start = std::time::Instant::now(); + let total = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); + let (produce_done_tx, produce_done_rx) = tokio::sync::oneshot::channel::<()>(); + { + use hydro_lang::deploy::DeployCrateWrapper; + + if produce { + let leader_node = nodes.get_process(&leader); + let mut leader_out = leader_node.stdout(); + let produce_done_tx = std::sync::Mutex::new(Some(produce_done_tx)); + tokio::spawn(async move { + while let Some(line) = leader_out.recv().await { + if line.starts_with("PRODUCE_DONE") { + if let Some(tx) = produce_done_tx.lock().unwrap().take() { + let _ = tx.send(()); + } + } else { + println!("[Leader] {line}"); + } + } + }); + } + + if consume { + for (i, member) in nodes + .get_cluster(&consumers) + .members() + .into_iter() + .enumerate() + { + let mut member_out = member.stdout(); + let total = total.clone(); + let done_tx = done_tx.clone(); + tokio::spawn(async move { + while let Some(_line) = member_out.recv().await { + let t = total.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + if t.is_multiple_of(1_000) { + println!("[Consumer {i}] ... {t} total messages consumed so far"); + } + if t >= num_messages { + let _ = done_tx.send(()).await; + return; + } + } + }); + } + } + } + drop(done_tx); + + if produce { + let _ = produce_done_rx.await; + let produce_elapsed = start.elapsed(); + println!( + "Produce: {num_messages} messages in {:.2?} ({:.0} msgs/sec)", + produce_elapsed, + num_messages as f64 / produce_elapsed.as_secs_f64() + ); + if !consume { + println!("Topic: {topic}"); + println!("Run consume with: --mode consume --topic {topic}"); + return Ok(()); + } + } + + if consume { + done_rx.recv().await; + let elapsed = start.elapsed(); + println!( + "Consume: {num_messages} messages in {:.2?} ({:.0} msgs/sec)", + elapsed, + num_messages as f64 / elapsed.as_secs_f64() + ); + } + + Ok(()) +} diff --git a/hydro_test/src/kafka/mod.rs b/hydro_test/src/kafka/mod.rs new file mode 100644 index 000000000000..c298404e679c --- /dev/null +++ b/hydro_test/src/kafka/mod.rs @@ -0,0 +1,188 @@ +use hydro_lang::live_collections::boundedness::Boundedness; +use hydro_lang::live_collections::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Ordering}; +use hydro_lang::location::Location; +use hydro_lang::location::tick::{NoAtomic, NoTick}; +use hydro_lang::prelude::*; +use rdkafka::message::OwnedMessage; +use rdkafka::producer::BaseProducer; + +type SharedProducer = std::sync::Arc; + +#[ctor::ctor] +fn init_rewrites() { + stageleft::add_private_reexport( + vec!["rdkafka", "producer", "base_producer"], + vec!["rdkafka", "producer"], + ); + stageleft::add_private_reexport( + vec!["rdkafka", "consumer", "stream_consumer"], + vec!["rdkafka", "consumer"], + ); + stageleft::add_private_reexport( + vec!["rdkafka", "message", "owned_message"], + vec!["rdkafka", "message"], + ); + stageleft::add_private_reexport( + vec!["futures_util", "stream", "stream"], + vec!["futures_util", "stream"], + ); + stageleft::add_private_reexport( + vec!["futures_util", "stream", "unfold"], + vec!["futures_util", "stream"], + ); +} + +/// Creates a Kafka `BaseProducer` singleton wrapped in `Arc` for sharing. +/// +/// The topic will be created on the broker before the producer is returned. +/// This runs on the deployed host, so it works even when brokers are in a +/// private network unreachable from the local machine. +pub fn kafka_producer<'a, Loc>( + location: &Loc, + brokers: &'a str, + security_protocol: &'a str, + topic: &'a str, + num_partitions: i32, +) -> Singleton +where + Loc: Location<'a> + NoTick + NoAtomic, +{ + location.singleton(q!({ + self::setup_topic_blocking(brokers, topic, num_partitions, security_protocol); + std::sync::Arc::new( + rdkafka::config::ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("security.protocol", security_protocol) + .create::() + .expect("Failed to create Kafka producer"), + ) + })) +} + +/// Consumes messages from a Kafka topic. Returns at-least-once, unordered delivery. +pub fn kafka_consumer<'a, Loc>( + location: &Loc, + brokers: &'a str, + group_id: &'a str, + topic: &'a str, + security_protocol: &'a str, +) -> Stream +where + Loc: Location<'a> + NoTick + NoAtomic, +{ + location + .singleton(q!({ + let consumer: rdkafka::consumer::StreamConsumer = rdkafka::config::ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("group.id", group_id) + .set("auto.offset.reset", "earliest") + .set("security.protocol", security_protocol) + .create() + .expect("Failed to create Kafka consumer"); + rdkafka::consumer::Consumer::subscribe(&consumer, &[topic]) + .expect("Failed to subscribe to topic"); + std::sync::Arc::new(consumer) + })) + .into_stream() + .flat_map_stream_blocking(q!(|consumer: std::sync::Arc< + rdkafka::consumer::StreamConsumer, + >| { + futures_util::stream::unfold(consumer, |consumer| async move { + loop { + match rdkafka::consumer::StreamConsumer::recv(&*consumer).await { + Ok(msg) => { + return Some(( + rdkafka::message::BorrowedMessage::detach(&msg), + consumer, + )); + } + Err(e) => { + eprintln!("Kafka consumer error: {}", e); + continue; + } + } + } + }) + })) + .weaken_retries() + .weaken_ordering() +} + +/// Sends `(key, payload)` pairs to a Kafka topic using `BaseProducer`. +/// Messages are queued without waiting for acks. `poll(Duration::ZERO)` is +/// called after each send to drive delivery callbacks. +pub fn dest_kafka<'a, Loc, Bound: Boundedness, Order: Ordering>( + producer: Singleton, + input: Stream<(String, String), Loc, Bound, Order, ExactlyOnce>, + topic: &'a str, +) -> Stream +where + Loc: Location<'a>, +{ + input + .cross_singleton(producer) + .map(q!(|((key, payload), producer)| { + loop { + let record = rdkafka::producer::BaseRecord::to(topic) + .key(&key) + .payload(&payload); + match producer.send(record) { + Ok(()) => break, + Err(( + rdkafka::error::KafkaError::MessageProduction( + rdkafka::types::RDKafkaErrorCode::QueueFull, + ), + _, + )) => { + producer.poll(std::time::Duration::from_millis(100)); + } + Err((e, _)) => panic!("Failed to send message to Kafka: {}", e), + } + } + producer.poll(std::time::Duration::ZERO); + producer + })) +} + +/// Admin helper: create a topic with the given number of partitions. +pub async fn setup_topic(brokers: &str, topic: &str, num_partitions: i32, security_protocol: &str) { + use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; + use rdkafka::config::ClientConfig; + + let admin: AdminClient = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("security.protocol", security_protocol) + .create() + .expect("Failed to create Kafka admin client"); + + let new_topic = NewTopic::new(topic, num_partitions, TopicReplication::Fixed(1)); + admin + .create_topics(&[new_topic], &AdminOptions::new()) + .await + .expect("Failed to create Kafka topic"); +} + +/// Blocking version of [`setup_topic`] for use inside synchronous `q!()` blocks. +/// Uses the existing tokio runtime handle from the Hydro process. +pub fn setup_topic_blocking( + brokers: &str, + topic: &str, + num_partitions: i32, + security_protocol: &str, +) { + let handle = tokio::runtime::Handle::current(); + let brokers = brokers.to_owned(); + let topic = topic.to_owned(); + let security_protocol = security_protocol.to_owned(); + // Spawn a separate thread to avoid calling block_on from within an async context. + std::thread::spawn(move || { + handle.block_on(setup_topic( + &brokers, + &topic, + num_partitions, + &security_protocol, + )); + }) + .join() + .expect("Topic setup thread panicked"); +} diff --git a/hydro_test/src/lib.rs b/hydro_test/src/lib.rs index 07df255174ea..6392f24467b8 100644 --- a/hydro_test/src/lib.rs +++ b/hydro_test/src/lib.rs @@ -9,6 +9,9 @@ pub mod local; pub mod maelstrom; pub mod tutorials; +#[cfg(feature = "kafka")] +pub mod kafka; + #[doc(hidden)] #[cfg(doctest)] mod docs { diff --git a/rust-toolchain.toml b/rust-toolchain.toml index ce8d449d33da..df80702fb6f5 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -5,5 +5,6 @@ components = [ "clippy", # https://github.com/dtolnay/trybuild?tab=readme-ov-file#troubleshooting "rust-src", + "llvm-tools", ] targets = ["wasm32-unknown-unknown", "x86_64-unknown-linux-musl"] \ No newline at end of file