diff --git a/hydro_lang/src/compile/ir/mod.rs b/hydro_lang/src/compile/ir/mod.rs index 905792a74433..72aadbea0c2c 100644 --- a/hydro_lang/src/compile/ir/mod.rs +++ b/hydro_lang/src/compile/ir/mod.rs @@ -30,10 +30,10 @@ use crate::compile::builder::StmtId; use crate::compile::builder::{CycleId, ExternalPortId}; #[cfg(feature = "build")] use crate::compile::deploy_provider::{Deploy, Node, RegisterPort}; +#[cfg(feature = "build")] +use crate::handoff_ref::handoff_ref_ident; use crate::location::dynamic::{ClusterConsistency, LocationId}; use crate::location::{LocationKey, NetworkHint}; -#[cfg(feature = "build")] -use crate::singleton_ref::singleton_ref_ident; pub mod backtrace; use backtrace::Backtrace; @@ -45,8 +45,8 @@ use backtrace::Backtrace; /// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`). pub struct ClosureExpr { pub(crate) expr: DebugExpr, - /// Each entry is `(singleton_node, is_mut)`. - /// The index in the Vec determines the ident name via [`singleton_ref_ident`]. + /// Each entry is `(reference_node, is_mut)`. + /// The index in the Vec determines the ident name via [`handoff_ref_ident`]. pub(crate) singleton_refs: Vec<(HydroNode, bool)>, } @@ -58,12 +58,18 @@ impl Clone for ClosureExpr { .singleton_refs .iter() .map(|(node, is_mut)| { - let HydroNode::Singleton { inner, metadata } = node else { - panic!("singleton_refs should only contain HydroNode::Singleton"); + let HydroNode::Reference { + inner, + kind, + metadata, + } = node + else { + panic!("singleton_refs should only contain HydroNode::Reference"); }; ( - HydroNode::Singleton { + HydroNode::Reference { inner: SharedNode(Rc::clone(&inner.0)), + kind: *kind, metadata: metadata.clone(), }, *is_mut, @@ -191,8 +197,8 @@ impl ClosureExpr { for ((i, (node, is_mut)), ref_ident) in self.singleton_refs.iter().enumerate().zip(ref_idents) { - let HydroNode::Singleton { inner, .. } = node else { - panic!("singleton_refs should only contain HydroNode::Singleton"); + let HydroNode::Reference { inner, .. } = node else { + panic!("singleton_refs should only contain HydroNode::Reference"); }; let ptr = inner.0.as_ref() as *const RefCell; let counter = access_counters.entry(ptr).or_insert(0); @@ -206,7 +212,7 @@ impl ClosureExpr { }; // TODO(mingwei): proper spanning? - let local_ident = singleton_ref_ident(i); + let local_ident = handoff_ref_ident(i); let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone); let group_lit = proc_macro2::Literal::u32_unsuffixed(group); let mut_token = is_mut.then(|| quote!(mut)); @@ -2347,15 +2353,17 @@ pub enum HydroNode { metadata: HydroIrMetadata, }, - /// A singleton materialization point. Wraps a SharedNode so that: - /// - The pipe output delivers the single item to one consumer - /// - `#var` references can borrow the value from the singleton slot + /// A reference materialization point. Wraps a SharedNode so that: + /// - The pipe output delivers data to one consumer + /// - `#var` references can borrow the value from the slot /// - /// In DFIR codegen, emits `ident = inner_ident -> singleton()`. + /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or + /// `-> handoff()` depending on `kind`. /// /// Uses the same `built_tees` dedup pattern as `Tee`. - Singleton { + Reference { inner: SharedNode, + kind: crate::handoff_ref::HandoffRefKind, metadata: HydroIrMetadata, }, @@ -2645,7 +2653,7 @@ impl HydroNode { | HydroNode::CycleSource { .. } | HydroNode::ExternalInput { .. } => {} - HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => { + HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => { if let Some(transformed) = seen_tees.get(&inner.as_ptr()) { *inner = SharedNode(transformed.clone()); } else { @@ -2811,7 +2819,10 @@ impl HydroNode { cycle_id: *cycle_id, metadata: metadata.clone(), }, - HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => { + HydroNode::Tee { inner, metadata } + | HydroNode::Reference { + inner, metadata, .. + } => { let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) { SharedNode(transformed.clone()) } else { @@ -2821,9 +2832,10 @@ impl HydroNode { *new_rc.borrow_mut() = cloned; SharedNode(new_rc) }; - if matches!(self, HydroNode::Singleton { .. }) { - HydroNode::Singleton { + if let HydroNode::Reference { kind, .. } = self { + HydroNode::Reference { inner: cloned_inner, + kind: *kind, metadata: metadata.clone(), } } else { @@ -3595,7 +3607,7 @@ impl HydroNode { ident_stack.push(ret_ident); } - HydroNode::Singleton { inner, .. } => { + HydroNode::Reference { inner, kind, .. } => { let ret_ident = if let Some(built_idents) = built_tees.get(&(inner.0.as_ref() as *const RefCell)) { @@ -3603,20 +3615,28 @@ impl HydroNode { } else { let inner_ident = ident_stack.pop().unwrap(); - let singleton_ident = + let ref_ident = syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site()); built_tees.insert( inner.0.as_ref() as *const RefCell, - vec![singleton_ident.clone()], + vec![ref_ident.clone()], ); match builders_or_callback { BuildersOrCallback::Builders(graph_builders) => { let builder = graph_builders.get_dfir_mut(&out_location); + let op_ident = syn::Ident::new( + match kind { + crate::handoff_ref::HandoffRefKind::Singleton => "singleton", + crate::handoff_ref::HandoffRefKind::Optional => "optional", + crate::handoff_ref::HandoffRefKind::Vec => "handoff", + }, + Span::call_site(), + ); builder.add_dfir( parse_quote! { - #singleton_ident = #inner_ident -> singleton(); + #ref_ident = #inner_ident -> #op_ident(); }, None, Some(&next_stmt_id.to_string()), @@ -3627,11 +3647,11 @@ impl HydroNode { } } - singleton_ident + ref_ident }; - // we consume a stmt id regardless of if we emit the singleton() operator, - // so that during rewrites we touch all recipients of the singleton() + // we consume a stmt id regardless of if we emit the operator, + // so that during rewrites we touch all recipients let _ = next_stmt_id.get_and_increment(); ident_stack.push(ret_ident); } @@ -4918,7 +4938,7 @@ impl HydroNode { } HydroNode::CycleSource { .. } | HydroNode::Tee { .. } - | HydroNode::Singleton { .. } + | HydroNode::Reference { .. } | HydroNode::YieldConcat { .. } | HydroNode::BeginAtomic { .. } | HydroNode::EndAtomic { .. } @@ -4998,7 +5018,7 @@ impl HydroNode { | HydroNode::SingletonSource { metadata, .. } | HydroNode::CycleSource { metadata, .. } | HydroNode::Tee { metadata, .. } - | HydroNode::Singleton { metadata, .. } + | HydroNode::Reference { metadata, .. } | HydroNode::Partition { metadata, .. } | HydroNode::YieldConcat { metadata, .. } | HydroNode::BeginAtomic { metadata, .. } @@ -5056,7 +5076,7 @@ impl HydroNode { | HydroNode::SingletonSource { metadata, .. } | HydroNode::CycleSource { metadata, .. } | HydroNode::Tee { metadata, .. } - | HydroNode::Singleton { metadata, .. } + | HydroNode::Reference { metadata, .. } | HydroNode::Partition { metadata, .. } | HydroNode::YieldConcat { metadata, .. } | HydroNode::BeginAtomic { metadata, .. } @@ -5107,7 +5127,7 @@ impl HydroNode { | HydroNode::ExternalInput { .. } | HydroNode::CycleSource { .. } | HydroNode::Tee { .. } - | HydroNode::Singleton { .. } + | HydroNode::Reference { .. } | HydroNode::Partition { .. } => { // Tee/Partition should find their input in separate special ways vec![] @@ -5186,9 +5206,9 @@ impl HydroNode { HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => { Rc::strong_count(&inner.0) > 1 } - // A zero-output singleton() is valid in DFIR (it drains itself at + // A zero-output reference node is valid in DFIR (it drains itself at // end of tick), so it doesn't need to be driven by another consumer. - HydroNode::Singleton { .. } => false, + HydroNode::Reference { .. } => false, _ => false, } } @@ -5215,8 +5235,8 @@ impl HydroNode { HydroNode::Tee { inner, .. } => { format!("Tee({})", inner.0.borrow().print_root()) } - HydroNode::Singleton { inner, .. } => { - format!("Singleton({})", inner.0.borrow().print_root()) + HydroNode::Reference { inner, kind, .. } => { + format!("Reference({:?}, {})", kind, inner.0.borrow().print_root()) } HydroNode::Partition { f, is_true, .. } => { format!("Partition({:?}, is_true={})", f, is_true) diff --git a/hydro_lang/src/handoff_ref.rs b/hydro_lang/src/handoff_ref.rs new file mode 100644 index 000000000000..3d95156e26b1 --- /dev/null +++ b/hydro_lang/src/handoff_ref.rs @@ -0,0 +1,405 @@ +//! Reference handles for capturing singletons, optionals, and streams in `q!()` closures. +//! +//! Each handle type wraps a `&RefCell` and, when captured inside a `q!()` closure, +//! registers itself with the current capture scope. At codegen time, the IR node is lowered +//! to the corresponding DFIR pseudo-operator (`singleton()`, `optional()`, or `handoff()`), +//! and the reference resolves to the appropriate borrow type. + +use std::cell::RefCell; +use std::marker::PhantomData; +use std::rc::Rc; + +use proc_macro2::Span; +use quote::quote; +use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens}; + +use crate::compile::ir::{HydroNode, SharedNode}; +use crate::location::Location; + +/// Determines which DFIR pseudo-operator a reference node lowers to. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub enum HandoffRefKind { + /// `-> singleton()` — exactly one item, `#var` gives `&T`. + Singleton, + /// `-> optional()` — zero or one item, `#var` gives `&Option`. + Optional, + /// `-> handoff()` — zero or more items, `#var` gives `&Vec`. + Vec, +} + +// Thread-local storage for handoff references captured during `q!()` expansion. +// Stores the HydroNode `(node, is_mut)` for each reference captured in the current closure. +// The index determines the ident name via `handoff_ref_ident`. +thread_local! { + static CAPTURED_REFS: RefCell>> = const { RefCell::new(None) }; +} + +/// Returns the canonical ident for a captured ref at the given index within a closure. +pub(crate) fn handoff_ref_ident(index: usize) -> syn::Ident { + syn::Ident::new( + &format!("__hydro_singleton_ref_{}", index), + Span::call_site(), + ) +} + +/// Activate the reference capture context. Must be called before `q!()` expansion +/// that may capture handoff references. Returns a `ClosureExpr` bundling the expression with any +/// captured references. +pub fn with_ref_capture( + f: impl FnOnce() -> crate::compile::ir::DebugExpr, +) -> crate::compile::ir::ClosureExpr { + CAPTURED_REFS.with(|cell| { + let prev = cell.borrow_mut().replace(Vec::new()); + assert!( + prev.is_none(), + "nested handoff reference capture scopes are not supported" + ); + }); + let expr = (f)(); + let captured_refs = CAPTURED_REFS.with(|cell| cell.borrow_mut().take().unwrap()); + crate::compile::ir::ClosureExpr::new(expr, captured_refs) +} + +/// Shared registration logic: wraps the IR node in `HydroNode::Reference` if needed, +/// pushes it to the capture list, and returns the ident to use in the closure body. +fn register_handoff_ref( + ir_node: &RefCell, + is_mut: bool, + kind: HandoffRefKind, +) -> syn::Ident { + CAPTURED_REFS.with(|cell| { + let mut guard = cell.borrow_mut(); + let refs = guard.as_mut().expect( + "HandoffRef used inside q!() but no reference capture scope is active. \ + This is a bug — reference capture should be set up by the operator that uses q!().", + ); + + let index = refs.len(); + let ident = handoff_ref_ident(index); + + let metadata = ir_node.borrow().metadata().clone(); + + // Wrap in HydroNode::Reference for materialization + identity tracking. + // If already a Reference node, reuse it. + if !matches!(&*ir_node.borrow(), HydroNode::Reference { .. }) { + let orig = ir_node.replace(HydroNode::Placeholder); + *ir_node.borrow_mut() = HydroNode::Reference { + inner: SharedNode(Rc::new(RefCell::new(orig))), + kind, + metadata: metadata.clone(), + }; + } + + let borrow: std::cell::Ref<'_, HydroNode> = ir_node.borrow(); + let HydroNode::Reference { inner, .. } = &*borrow else { + unreachable!() + }; + + refs.push(( + HydroNode::Reference { + inner: SharedNode(Rc::clone(&inner.0)), + kind, + metadata, + }, + is_mut, + )); + + ident + }) +} + +/// Macro to define a handoff reference struct with all necessary trait impls. +macro_rules! define_handoff_ref { + ( + $(#[$meta:meta])* + $name:ident, $is_mut:expr, $kind:expr, $output:ty + ) => { + $(#[$meta])* + pub struct $name<'a, 'slf, T, L> { + pub(crate) ir_node: &'slf RefCell, + _phantom: PhantomData<(&'a T, L)>, + } + + impl<'slf, T, L> $name<'_, 'slf, T, L> { + /// Creates a new reference handle from an IR node cell. + pub(crate) fn new(ir_node: &'slf RefCell) -> Self { + Self { + ir_node, + _phantom: PhantomData, + } + } + } + + impl Copy for $name<'_, '_, T, L> {} + impl Clone for $name<'_, '_, T, L> { + fn clone(&self) -> Self { + *self + } + } + + impl<'a, 'slf, T: 'a, L> FreeVariableWithContextWithProps for $name<'a, 'slf, T, L> + where + L: Location<'a>, + { + type O = $output; + + fn to_tokens(self, _ctx: &L) -> (QuoteTokens, ()) { + let ident = register_handoff_ref(self.ir_node, $is_mut, $kind); + ( + QuoteTokens { + prelude: None, + expr: Some(quote!(#ident)), + }, + (), + ) + } + } + }; +} + +define_handoff_ref!( + /// A shared reference handle to a singleton, resolves to `&T` at runtime. + /// + /// Created via [`Singleton::by_ref()`](crate::live_collections::Singleton::by_ref). + SingletonRef, false, HandoffRefKind::Singleton, &'a T +); + +define_handoff_ref!( + /// A mutable reference handle to a singleton, resolves to `&mut T` at runtime. + /// + /// Created via [`Singleton::by_mut()`](crate::live_collections::Singleton::by_mut). + SingletonMut, true, HandoffRefKind::Singleton, &'a mut T +); + +define_handoff_ref!( + /// A shared reference handle to an optional, resolves to `&Option` at runtime. + /// + /// Created via [`Optional::by_ref()`](crate::live_collections::Optional::by_ref). + OptionalRef, false, HandoffRefKind::Optional, &'a Option +); + +define_handoff_ref!( + /// A mutable reference handle to an optional, resolves to `&mut Option` at runtime. + /// + /// Created via [`Optional::by_mut()`](crate::live_collections::Optional::by_mut). + OptionalMut, true, HandoffRefKind::Optional, &'a mut Option +); + +define_handoff_ref!( + /// A shared reference handle to a stream's handoff buffer, resolves to `&Vec` at runtime. + /// + /// Created via [`Stream::by_ref()`](crate::live_collections::Stream::by_ref). + StreamRef, false, HandoffRefKind::Vec, &'a Vec +); + +define_handoff_ref!( + /// A mutable reference handle to a stream's handoff buffer, resolves to `&mut Vec` at runtime. + /// + /// Created via [`Stream::by_mut()`](crate::live_collections::Stream::by_mut). + StreamMut, true, HandoffRefKind::Vec, &'a mut Vec +); + +#[cfg(test)] +#[cfg(feature = "build")] +mod tests { + use stageleft::q; + + use crate::compile::builder::FlowBuilder; + use crate::location::Location; + + struct P1 {} + + /// Compile-only test: verifies that `by_ref()` + `q!()` produces valid IR. + #[test] + fn singleton_by_ref_compiles() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let my_count = node + .source_iter(q!(0..5i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); + let count_ref = my_count.by_ref(); + + node.source_iter(q!(1..=3i32)) + .map(q!(|x| x + *count_ref)) + .for_each(q!(|_| {})); + + my_count.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Test with a non-Copy type (Vec) to ensure we're borrowing, not copying. + #[test] + fn singleton_by_ref_non_copy() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let my_vec = node.source_iter(q!(0..5i32)).fold( + q!(|| Vec::::new()), + q!(|acc: &mut Vec, x| acc.push(x)), + ); + let vec_ref = my_vec.by_ref(); + + node.source_iter(q!(1..=3i32)) + .map(q!(|x| x + vec_ref.len() as i32)) + .for_each(q!(|_| {})); + + my_vec.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only: singleton ref inside filter closure. + #[test] + fn singleton_by_ref_filter() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let threshold = node + .source_iter(q!(0..5i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); + let threshold_ref = threshold.by_ref(); + + node.source_iter(q!(1..=10i32)) + .filter(q!(|x| *x > *threshold_ref)) + .for_each(q!(|_| {})); + + threshold.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only: singleton ref inside flat_map closure. + #[test] + fn singleton_by_ref_flat_map() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let count = node + .source_iter(q!(0..3i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1)); + let count_ref = count.by_ref(); + + node.source_iter(q!(1..=2i32)) + .flat_map_ordered(q!(|x| (0..*count_ref).map(move |i| x + i))) + .for_each(q!(|_| {})); + + count.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only: singleton ref inside inspect closure. + #[test] + fn singleton_by_ref_inspect() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let count = node + .source_iter(q!(0..5i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1)); + let count_ref = count.by_ref(); + + node.source_iter(q!(1..=3i32)) + .inspect(q!(|x| println!("count={}, x={}", *count_ref, x))) + .for_each(q!(|_| {})); + + count.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only: singleton ref inside partition predicate. + #[test] + fn singleton_by_ref_partition() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let threshold = node + .source_iter(q!(0..5i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); + let threshold_ref = threshold.by_ref(); + + let (above, below) = node + .source_iter(q!(1..=10i32)) + .partition(q!(|x| *x > *threshold_ref)); + + above.for_each(q!(|_| {})); + below.for_each(q!(|_| {})); + threshold.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only: singleton ref inside partition with downstream operators on both branches. + #[test] + fn singleton_by_ref_partition_with_downstream_ops() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let threshold = node + .source_iter(q!(0..5i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); + let threshold_ref = threshold.by_ref(); + + let (above, below) = node + .source_iter(q!(1..=10i32)) + .partition(q!(|x| *x > *threshold_ref)); + + above.map(q!(|x| x * 2)).for_each(q!(|_| {})); + below.map(q!(|x| x + 100)).for_each(q!(|_| {})); + threshold.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only test: singleton by_mut. + #[test] + fn singleton_by_mut_compiles() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let my_count = node + .source_iter(q!(0..5i32)) + .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); + let count_mut = my_count.by_mut(); + + node.source_iter(q!(1..=3i32)) + .map(q!(|x| { + *count_mut += x; + x + })) + .for_each(q!(|_| {})); + + my_count.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only test: optional by_ref. + #[test] + fn optional_by_ref_compiles() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let my_opt = node.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b)); + let opt_ref = my_opt.by_ref(); + + node.source_iter(q!(1..=3i32)) + .map(q!(|x| x + opt_ref.unwrap_or(0))) + .for_each(q!(|_| {})); + + my_opt.into_stream().for_each(q!(|_| {})); + let _built = flow.finalize(); + } + + /// Compile-only test: stream by_ref. + #[test] + fn stream_by_ref_compiles() { + let mut flow = FlowBuilder::new(); + let node = flow.process::(); + + let my_stream = node.source_iter(q!(0..5i32)); + let stream_ref = my_stream.by_ref(); + + node.source_iter(q!(1..=3i32)) + .map(q!(|x| x + stream_ref.len() as i32)) + .for_each(q!(|_| {})); + + my_stream.for_each(q!(|_| {})); + let _built = flow.finalize(); + } +} diff --git a/hydro_lang/src/lib.rs b/hydro_lang/src/lib.rs index 3ca08c3c5b16..7a0fa2e5cdb4 100644 --- a/hydro_lang/src/lib.rs +++ b/hydro_lang/src/lib.rs @@ -108,7 +108,7 @@ pub mod forward_handle; pub mod compile; -pub mod singleton_ref; +pub mod handoff_ref; mod manual_expr; @@ -135,6 +135,11 @@ fn init_rewrites() { vec!["tokio_util", "codec", "lines_codec"], vec!["tokio_util", "codec"], ); + // TODO: remove once stageleft is updated with this rewrite built-in + stageleft::add_private_reexport( + vec!["core", "iter", "sources", "empty"], + vec!["std", "iter"], + ); } #[cfg(all(test, feature = "trybuild"))] diff --git a/hydro_lang/src/live_collections/optional.rs b/hydro_lang/src/live_collections/optional.rs index 7353a7826357..56cffb7eaa01 100644 --- a/hydro_lang/src/live_collections/optional.rs +++ b/hydro_lang/src/live_collections/optional.rs @@ -294,6 +294,26 @@ where &self.location } + /// Creates a shared reference handle to this optional that can be captured inside `q!()` + /// closures. The handle resolves to `&Option` at runtime. + /// + /// The optional must be bounded, otherwise reading it would be non-deterministic. + pub fn by_ref(&self) -> crate::handoff_ref::OptionalRef<'a, '_, T, L> + where + B: IsBounded, + { + crate::handoff_ref::OptionalRef::new(&self.ir_node) + } + + /// Returns a mutable reference handle to this optional that can be captured inside `q!()` + /// closures. The handle resolves to `&mut Option` at runtime. + pub fn by_mut(&self) -> crate::handoff_ref::OptionalMut<'a, '_, T, L> + where + B: IsBounded, + { + crate::handoff_ref::OptionalMut::new(&self.ir_node) + } + /// Weakens the consistency of this live collection to not guarantee any consistency across /// cluster members (if this collection is on a cluster). pub fn weaken_consistency(self) -> Optional diff --git a/hydro_lang/src/live_collections/singleton.rs b/hydro_lang/src/live_collections/singleton.rs index ee9f9b1d385d..e734624a819f 100644 --- a/hydro_lang/src/live_collections/singleton.rs +++ b/hydro_lang/src/live_collections/singleton.rs @@ -337,11 +337,11 @@ where /// # }); /// # } /// ``` - pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, '_, T, L> + pub fn by_ref(&self) -> crate::handoff_ref::SingletonRef<'a, '_, T, L> where B: IsBounded, { - crate::singleton_ref::SingletonRef::new(&self.ir_node) + crate::handoff_ref::SingletonRef::new(&self.ir_node) } /// Returns a mutable reference handle to this singleton that can be captured inside `q!()` @@ -386,11 +386,11 @@ where /// # }); /// # } /// ``` - pub fn by_mut(&self) -> crate::singleton_ref::SingletonMut<'a, '_, T, L> + pub fn by_mut(&self) -> crate::handoff_ref::SingletonMut<'a, '_, T, L> where B: IsBounded, { - crate::singleton_ref::SingletonMut::new(&self.ir_node) + crate::handoff_ref::SingletonMut::new(&self.ir_node) } /// Weakens the consistency of this live collection to not guarantee any consistency across diff --git a/hydro_lang/src/live_collections/stream/mod.rs b/hydro_lang/src/live_collections/stream/mod.rs index f5f557924955..07bc802cf292 100644 --- a/hydro_lang/src/live_collections/stream/mod.rs +++ b/hydro_lang/src/live_collections/stream/mod.rs @@ -429,6 +429,26 @@ where &self.location } + /// Creates a shared reference handle to this stream's handoff buffer that can be captured + /// inside `q!()` closures. The handle resolves to `&Vec` at runtime. + /// + /// The stream must be bounded, otherwise reading it would be non-deterministic. + pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L> + where + B: IsBounded, + { + crate::handoff_ref::StreamRef::new(&self.ir_node) + } + + /// Returns a mutable reference handle to this stream's handoff buffer that can be captured + /// inside `q!()` closures. The handle resolves to `&mut Vec` at runtime. + pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L> + where + B: IsBounded, + { + crate::handoff_ref::StreamMut::new(&self.ir_node) + } + /// Weakens the consistency of this live collection to not guarantee any consistency across /// cluster members (if this collection is on a cluster). pub fn weaken_consistency(self) -> Stream @@ -559,7 +579,7 @@ where C: ValidMutCommutativityFor, I: ValidMutIdempotenceFor, { - let f = crate::singleton_ref::with_singleton_capture(|| { + let f = crate::handoff_ref::with_ref_capture(|| { let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location); proof.register_proof(&expr); expr.into() @@ -605,9 +625,7 @@ where I: IntoIterator, F: Fn(T) -> I + 'a, { - let f = crate::singleton_ref::with_singleton_capture(|| { - f.splice_fn1_ctx(&self.location).into() - }); + let f = crate::handoff_ref::with_ref_capture(|| f.splice_fn1_ctx(&self.location).into()); Stream::new( self.location.clone(), HydroNode::FlatMap { @@ -654,9 +672,7 @@ where I: IntoIterator, F: Fn(T) -> I + 'a, { - let f = crate::singleton_ref::with_singleton_capture(|| { - f.splice_fn1_ctx(&self.location).into() - }); + let f = crate::handoff_ref::with_ref_capture(|| f.splice_fn1_ctx(&self.location).into()); Stream::new( self.location.clone(), HydroNode::FlatMap { @@ -793,9 +809,8 @@ where where F: Fn(&T) -> bool + 'a, { - let f = crate::singleton_ref::with_singleton_capture(|| { - f.splice_fn1_borrow_ctx(&self.location).into() - }); + let f = + crate::handoff_ref::with_ref_capture(|| f.splice_fn1_borrow_ctx(&self.location).into()); Stream::new( self.location.clone(), HydroNode::Filter { @@ -851,9 +866,8 @@ where where F: Fn(&T) -> bool + 'a, { - let f = crate::singleton_ref::with_singleton_capture(|| { - f.splice_fn1_borrow_ctx(&self.location).into() - }); + let f = + crate::handoff_ref::with_ref_capture(|| f.splice_fn1_borrow_ctx(&self.location).into()); let shared = SharedNode(Rc::new(RefCell::new( self.ir_node.replace(HydroNode::Placeholder), ))); @@ -904,7 +918,9 @@ where where F: Fn(T) -> Option + 'a, { - let f = f.splice_fn1_ctx(&self.location).into(); + let f = crate::singleton_ref::with_singleton_capture(|| { + f.splice_fn1_ctx(&self.location).into() + }); Stream::new( self.location.clone(), HydroNode::FilterMap { @@ -1220,7 +1236,7 @@ where where F: Fn(&T) + 'a, { - let f = crate::singleton_ref::with_singleton_capture(|| { + let f = crate::handoff_ref::with_ref_capture(|| { f.splice_fn1_borrow_ctx(&self.location.drop_consistency()) .into() }); diff --git a/hydro_lang/src/singleton_ref.rs b/hydro_lang/src/singleton_ref.rs deleted file mode 100644 index 5a01df38fd57..000000000000 --- a/hydro_lang/src/singleton_ref.rs +++ /dev/null @@ -1,328 +0,0 @@ -//! Singleton reference handle for capturing singletons in `q!()` closures. - -use std::cell::RefCell; -use std::marker::PhantomData; -use std::rc::Rc; - -use proc_macro2::Span; -use quote::quote; -use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens}; - -use crate::compile::ir::{HydroNode, SharedNode}; -use crate::location::Location; - -/// A lightweight handle to a singleton that can be captured inside `q!()` closures. -/// -/// Created via [`Singleton::by_ref()`](crate::live_collections::Singleton::by_ref). When used -/// inside a `q!()` closure, resolves to a reference to the singleton's value (`&T`) at runtime. -/// -/// This type is `Copy` (required by `q!()` macro internals). -/// TODO(mingwei): -pub struct SingletonRef<'a, 'slf, T, L, const IS_MUT: bool = false> { - /// Will be updated to `HydroNode::Singleton` when used, if not already. - pub(crate) ir_node: &'slf RefCell, - _phantom: PhantomData<(&'a T, L)>, -} -/// Alias for [`SingletonRef`] with `IS_MUT = true`. -pub type SingletonMut<'a, 'slf, T, L> = SingletonRef<'a, 'slf, T, L, true>; - -impl<'slf, T, L, const IS_MUT: bool> SingletonRef<'_, 'slf, T, L, IS_MUT> { - /// Creates a `SingletonRef` from a shared node. - pub(crate) fn new(ir_node: &'slf RefCell) -> Self { - Self { - ir_node, - _phantom: PhantomData, - } - } - - /// Converts this singleton into a shared (non-`mut`) `SingletonRef`. - pub fn as_ref(&self) -> SingletonRef<'_, 'slf, T, L, false> { - SingletonRef { - ir_node: self.ir_node, - _phantom: PhantomData, - } - } - - /// Converts this singleton into an exclusive (`mut`) `SingletonRef`. - pub fn as_mut(&self) -> SingletonRef<'_, 'slf, T, L, true> { - SingletonRef { - ir_node: self.ir_node, - _phantom: PhantomData, - } - } -} - -impl Copy for SingletonRef<'_, '_, T, L, IS_MUT> {} -impl Clone for SingletonRef<'_, '_, T, L, IS_MUT> { - fn clone(&self) -> Self { - *self - } -} - -// Thread-local storage for singleton references captured during `q!()` expansion. -// Stores the HydroNode `(SharedNode, is_mut)` for each singleton captured in the current closure. -// The index in the Vec determines the ident name via `singleton_ref_ident`. -thread_local! { - static SINGLETON_REFS: RefCell>> = const { RefCell::new(None) }; -} - -/// Returns the canonical ident for a singleton ref at the given index within a closure. -pub(crate) fn singleton_ref_ident(index: usize) -> syn::Ident { - syn::Ident::new( - &format!("__hydro_singleton_ref_{}", index), - Span::call_site(), - ) -} - -/// Activate the singleton reference capture context. Must be called before `q!()` expansion -/// that may capture singletons. Returns a `ClosureExpr` bundling the expression with any -/// captured singleton references. -pub fn with_singleton_capture( - f: impl FnOnce() -> crate::compile::ir::DebugExpr, -) -> crate::compile::ir::ClosureExpr { - SINGLETON_REFS.with(|cell| { - let prev = cell.borrow_mut().replace(Vec::new()); - assert!( - prev.is_none(), - "nested singleton capture scopes are not supported" - ); - }); - let expr = (f)(); - let singleton_refs = SINGLETON_REFS.with(|cell| cell.borrow_mut().take().unwrap()); - crate::compile::ir::ClosureExpr::new(expr, singleton_refs) -} - -impl<'a, 'slf, T: 'a, L, const IS_MUT: bool> SingletonRef<'a, 'slf, T, L, IS_MUT> -where - L: Location<'a>, -{ - fn to_tokens_helper(self, _ctx: &L) -> (QuoteTokens, ()) { - let ident = SINGLETON_REFS.with(|cell| { - let mut guard = cell.borrow_mut(); - let refs = guard.as_mut().expect( - "SingletonRef used inside q!() but no singleton capture scope is active. \ - This is a bug — singleton capture should be set up by the operator that uses q!().", - ); - - let index = refs.len(); - let ident = singleton_ref_ident(index); - - let metadata = self.ir_node.borrow().metadata().clone(); - - // Wrap in HydroNode::Singleton for materialization + identity tracking. If already a Singleton node, - // reuse it. - if !matches!(&*self.ir_node.borrow(), HydroNode::Singleton { .. }) { - let orig = self.ir_node.replace(HydroNode::Placeholder); - *self.ir_node.borrow_mut() = HydroNode::Singleton { - inner: SharedNode(Rc::new(RefCell::new(orig))), - metadata: metadata.clone(), - }; - } - - let borrow: std::cell::Ref<'_, HydroNode> = self.ir_node.borrow(); - let HydroNode::Singleton { inner, .. } = &*borrow else { - unreachable!() - }; - - refs.push(( - HydroNode::Singleton { - inner: SharedNode(Rc::clone(&inner.0)), - metadata, - }, - IS_MUT, - )); - - ident - }); - - ( - QuoteTokens { - prelude: None, - expr: Some(quote!(#ident)), - }, - (), - ) - } -} - -impl<'a, 'slf, T: 'a, L> FreeVariableWithContextWithProps for SingletonRef<'a, 'slf, T, L> -where - L: Location<'a>, -{ - type O = &'a T; - - fn to_tokens(self, ctx: &L) -> (QuoteTokens, ()) { - self.to_tokens_helper(ctx) - } -} - -impl<'a, 'slf, T: 'a, L> FreeVariableWithContextWithProps for SingletonMut<'a, 'slf, T, L> -where - L: Location<'a>, -{ - type O = &'a mut T; - - fn to_tokens(self, ctx: &L) -> (QuoteTokens, ()) { - self.to_tokens_helper(ctx) - } -} - -#[cfg(test)] -#[cfg(feature = "build")] -mod tests { - use stageleft::q; - - use crate::compile::builder::FlowBuilder; - use crate::location::Location; - - struct P1 {} - - /// Compile-only test: verifies that `by_ref()` + `q!()` produces valid IR - /// that can be finalized without panicking. - #[test] - fn singleton_by_ref_compiles() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let my_count = node - .source_iter(q!(0..5i32)) - .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); - let count_ref = my_count.by_ref(); - - node.source_iter(q!(1..=3i32)) - .map(q!(|x| x + *count_ref)) - .for_each(q!(|_| {})); - - // Also consume the singleton via pipe (tests Tee works correctly). - my_count.into_stream().for_each(q!(|_| {})); - - // If this doesn't panic, the IR was built successfully with singleton refs. - let _built = flow.finalize(); - } - - /// Test with a non-Copy type (Vec) to ensure we're borrowing, not copying. - #[test] - fn singleton_by_ref_non_copy() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let my_vec = node.source_iter(q!(0..5i32)).fold( - q!(|| Vec::::new()), - q!(|acc: &mut Vec, x| acc.push(x)), - ); - let vec_ref = my_vec.by_ref(); - - node.source_iter(q!(1..=3i32)) - .map(q!(|x| x + vec_ref.len() as i32)) - .for_each(q!(|_| {})); - - // Also consume the singleton via pipe. - my_vec.into_stream().for_each(q!(|_| {})); - - let _built = flow.finalize(); - } - - /// Compile-only: singleton ref inside filter closure. - #[test] - fn singleton_by_ref_filter() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let threshold = node - .source_iter(q!(0..5i32)) - .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); - let threshold_ref = threshold.by_ref(); - - node.source_iter(q!(1..=10i32)) - .filter(q!(|x| *x > *threshold_ref)) - .for_each(q!(|_| {})); - - threshold.into_stream().for_each(q!(|_| {})); - let _built = flow.finalize(); - } - - /// Compile-only: singleton ref inside flat_map closure. - #[test] - fn singleton_by_ref_flat_map() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let count = node - .source_iter(q!(0..3i32)) - .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1)); - let count_ref = count.by_ref(); - - node.source_iter(q!(1..=2i32)) - .flat_map_ordered(q!(|x| (0..*count_ref).map(move |i| x + i))) - .for_each(q!(|_| {})); - - count.into_stream().for_each(q!(|_| {})); - let _built = flow.finalize(); - } - - /// Compile-only: singleton ref inside inspect closure. - #[test] - fn singleton_by_ref_inspect() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let count = node - .source_iter(q!(0..5i32)) - .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1)); - let count_ref = count.by_ref(); - - node.source_iter(q!(1..=3i32)) - .inspect(q!(|x| println!("count={}, x={}", *count_ref, x))) - .for_each(q!(|_| {})); - - count.into_stream().for_each(q!(|_| {})); - let _built = flow.finalize(); - } - - /// Compile-only: singleton ref inside partition predicate. - #[test] - fn singleton_by_ref_partition() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let threshold = node - .source_iter(q!(0..5i32)) - .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); - let threshold_ref = threshold.by_ref(); - - let (above, below) = node - .source_iter(q!(1..=10i32)) - .partition(q!(|x| *x > *threshold_ref)); - - above.for_each(q!(|_| {})); - below.for_each(q!(|_| {})); - threshold.into_stream().for_each(q!(|_| {})); - let _built = flow.finalize(); - } - - /// Compile-only: singleton ref inside partition with downstream operators on both branches. - /// - /// This exercises the ident_stack pop logic in the "already built" path of Partition - /// code generation. When the second branch is processed, singleton ref idents pushed by - /// transform_children must be popped to keep the stack consistent for downstream ops. - #[test] - fn singleton_by_ref_partition_with_downstream_ops() { - let mut flow = FlowBuilder::new(); - let node = flow.process::(); - - let threshold = node - .source_iter(q!(0..5i32)) - .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x)); - let threshold_ref = threshold.by_ref(); - - let (above, below) = node - .source_iter(q!(1..=10i32)) - .partition(q!(|x| *x > *threshold_ref)); - - // Downstream operators on both branches — if the pop is missing, these will fail - above.map(q!(|x| x * 2)).for_each(q!(|_| {})); - below.map(q!(|x| x + 100)).for_each(q!(|_| {})); - threshold.into_stream().for_each(q!(|_| {})); - let _built = flow.finalize(); - } -} diff --git a/hydro_lang/src/viz/render.rs b/hydro_lang/src/viz/render.rs index 8b5ad0dcdf40..34e57a7ccab3 100644 --- a/hydro_lang/src/viz/render.rs +++ b/hydro_lang/src/viz/render.rs @@ -1118,7 +1118,10 @@ impl HydroNode { cycle_id, metadata, .. } => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)), - HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => { + HydroNode::Tee { inner, metadata } + | HydroNode::Reference { + inner, metadata, .. + } => { let ptr = inner.as_ptr(); if let Some(&existing_id) = seen_tees.get(&ptr) { return existing_id; @@ -1128,7 +1131,7 @@ impl HydroNode { .0 .borrow() .build_graph_structure(structure, seen_tees, config); - let node_type = if matches!(self, HydroNode::Singleton { .. }) { + let node_type = if matches!(self, HydroNode::Reference { .. }) { HydroNodeType::Aggregation } else { HydroNodeType::Tee diff --git a/hydro_lang/tests/compile-fail/stable/stream_by_ref_unbounded.stderr b/hydro_lang/tests/compile-fail/stable/stream_by_ref_unbounded.stderr new file mode 100644 index 000000000000..4e4a40aea6f7 --- /dev/null +++ b/hydro_lang/tests/compile-fail/stable/stream_by_ref_unbounded.stderr @@ -0,0 +1,16 @@ +error[E0277]: The input collection must be bounded (`Bounded`), but has bound `hydro_lang::prelude::Unbounded`. Strengthen the boundedness upstream or consider a different API. + --> tests/compile-fail/stable/stream_by_ref_unbounded.rs:7:23 + | +7 | let _ = unbounded.by_ref(); + | ^^^^^^ required here + | + = help: the trait `IsBounded` is not implemented for `hydro_lang::prelude::Unbounded` + = note: To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary. +note: required by a bound in `hydro_lang::prelude::Stream::::by_ref` + --> src/live_collections/stream/mod.rs + | + | pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L> + | ------ required by a bound in this associated function + | where + | B: IsBounded, + | ^^^^^^^^^ required by this bound in `Stream::::by_ref` diff --git a/hydro_lang/tests/compile-fail/stream_by_ref_unbounded.rs b/hydro_lang/tests/compile-fail/stream_by_ref_unbounded.rs new file mode 100644 index 000000000000..f8aeca7cf384 --- /dev/null +++ b/hydro_lang/tests/compile-fail/stream_by_ref_unbounded.rs @@ -0,0 +1,10 @@ +use hydro_lang::prelude::*; + +struct P1 {} + +fn test<'a>(p1: &Process<'a, P1>) { + let unbounded: Stream<_, _> = p1.source_iter(q!(0..10)).into(); + let _ = unbounded.by_ref(); +} + +fn main() {} diff --git a/hydro_test/src/local/mod.rs b/hydro_test/src/local/mod.rs index 887a6e3b474e..5f88fa86d807 100644 --- a/hydro_test/src/local/mod.rs +++ b/hydro_test/src/local/mod.rs @@ -3,6 +3,8 @@ pub mod chat_app; pub mod count_elems; pub mod futures; pub mod graph_reachability; +pub mod optional_ref; pub mod singleton_input; pub mod singleton_mut; pub mod singleton_ref; +pub mod stream_ref; diff --git a/hydro_test/src/local/optional_ref.rs b/hydro_test/src/local/optional_ref.rs new file mode 100644 index 000000000000..83ebdae7428e --- /dev/null +++ b/hydro_test/src/local/optional_ref.rs @@ -0,0 +1,129 @@ +#[cfg(test)] +mod tests { + use futures::StreamExt; + use hydro_deploy::Deployment; + use hydro_lang::compile::builder::FlowBuilder; + use stageleft::q; + + #[tokio::test] + async fn test_optional_ref() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Create an optional: reduce 0..5 => Some(10) (sum via reduce) + let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b)); + + let opt_ref = my_opt.by_ref(); + + // Use the optional ref in a map: unwrap_or(0) + x + let out_port = p1 + .source_iter(q!(1..=3i32)) + .map(q!(|x| x + opt_ref.unwrap_or(0))) + .send_bincode_external(&external); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv = nodes.connect(out_port).await; + + deployment.start().await.unwrap(); + + let mut results = Vec::new(); + for _ in 0..3 { + results.push(out_recv.next().await.unwrap()); + } + results.sort(); + // reduce(0..5) = 10, so results should be 11, 12, 13 + assert_eq!(results, vec![11, 12, 13]); + } + + #[tokio::test] + async fn test_optional_ref_none() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Create an optional from an empty source => None + let my_opt = p1 + .source_iter(q!(std::iter::empty::())) + .reduce(q!(|a, b| *a += b)); + + let opt_ref = my_opt.by_ref(); + + // Use the optional ref: should be None, so unwrap_or(99) + let out_port = p1 + .source_iter(q!(1..=2i32)) + .map(q!(|x| x + opt_ref.unwrap_or(99))) + .send_bincode_external(&external); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv = nodes.connect(out_port).await; + + deployment.start().await.unwrap(); + + let mut results = Vec::new(); + for _ in 0..2 { + results.push(out_recv.next().await.unwrap()); + } + results.sort(); + // optional is None, so unwrap_or(99) => 100, 101 + assert_eq!(results, vec![100, 101]); + } + + #[tokio::test] + async fn test_optional_ref_and_consume() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Use reduce to produce an Optional + let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b)); + + let opt_ref = my_opt.by_ref(); + + // Reference path + let out_port_ref = p1 + .source_iter(q!(1..=2i32)) + .map(q!(|x| x + opt_ref.unwrap_or(0))) + .send_bincode_external(&external); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv_ref = nodes.connect(out_port_ref).await; + + deployment.start().await.unwrap(); + + let mut ref_results = Vec::new(); + for _ in 0..2 { + ref_results.push(out_recv_ref.next().await.unwrap()); + } + ref_results.sort(); + // reduce(0..5) = 10, so 1+10=11, 2+10=12 + assert_eq!(ref_results, vec![11, 12]); + } +} diff --git a/hydro_test/src/local/stream_ref.rs b/hydro_test/src/local/stream_ref.rs new file mode 100644 index 000000000000..4f709f087d7d --- /dev/null +++ b/hydro_test/src/local/stream_ref.rs @@ -0,0 +1,159 @@ +#[cfg(test)] +mod tests { + use futures::StreamExt; + use hydro_deploy::Deployment; + use hydro_lang::compile::builder::FlowBuilder; + use stageleft::q; + + #[tokio::test] + async fn test_stream_ref() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Create a bounded stream (source_iter is bounded within a tick) + let my_stream = p1.source_iter(q!(1..=5i32)); + + let stream_ref = my_stream.by_ref(); + + // Use the stream ref to get the vec's length + let out_port = p1 + .source_iter(q!([()])) + .map(q!(|_| stream_ref.len() as i32)) + .send_bincode_external(&external); + + // Also consume the stream via pipe + my_stream.for_each(q!(|_| {})); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv = nodes.connect(out_port).await; + + deployment.start().await.unwrap(); + + let result = out_recv.next().await.unwrap(); + // stream has 5 elements + assert_eq!(result, 5); + } + + #[tokio::test] + async fn test_stream_ref_contents() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Create a bounded stream + let my_stream = p1.source_iter(q!(1..=3i32)); + + let stream_ref = my_stream.by_ref(); + + // Sum the referenced vec's contents + let out_port = p1 + .source_iter(q!([()])) + .map(q!(|_| stream_ref.iter().sum::())) + .send_bincode_external(&external); + + my_stream.for_each(q!(|_| {})); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv = nodes.connect(out_port).await; + + deployment.start().await.unwrap(); + + let result = out_recv.next().await.unwrap(); + // sum of 1+2+3 = 6 + assert_eq!(result, 6); + } + + #[tokio::test] + async fn test_stream_ref_no_consumer() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Create a bounded stream — no pipe consumer, only ref + let my_stream = p1.source_iter(q!(1..=4i32)); + + let stream_ref = my_stream.by_ref(); + + let out_port = p1 + .source_iter(q!([()])) + .map(q!(|_| stream_ref.len() as i32)) + .send_bincode_external(&external); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv = nodes.connect(out_port).await; + + deployment.start().await.unwrap(); + + let result = out_recv.next().await.unwrap(); + assert_eq!(result, 4); + } + + #[tokio::test] + async fn test_stream_mut() { + let mut deployment = Deployment::new(); + + let mut builder = FlowBuilder::new(); + let external = builder.external::<()>(); + let p1 = builder.process::<()>(); + + // Create a bounded stream + let my_stream = p1.source_iter(q!(1..=5i32)); + + let stream_mut = my_stream.by_mut(); + + // Mutably reference the buffer to retain only items > 3 + let out_port = p1 + .source_iter(q!([()])) + .map(q!(|_| { + stream_mut.retain(|x| *x > 3); + stream_mut.len() as i32 + })) + .send_bincode_external(&external); + + my_stream.for_each(q!(|_| {})); + + let nodes = builder + .with_default_optimize() + .with_process(&p1, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut out_recv = nodes.connect(out_port).await; + + deployment.start().await.unwrap(); + + let result = out_recv.next().await.unwrap(); + // After retain(> 3): [4, 5] => len = 2 + assert_eq!(result, 2); + } +}