diff --git a/hydro_lang/src/compile/ir/mod.rs b/hydro_lang/src/compile/ir/mod.rs index c2de373de8a6..7bc03cd92518 100644 --- a/hydro_lang/src/compile/ir/mod.rs +++ b/hydro_lang/src/compile/ir/mod.rs @@ -1491,6 +1491,16 @@ impl HydroRoot { fold_hooked_idents, ); + let input_ident = maybe_observe_for_mut( + f, + input_ident, + &input.metadata().location_id, + &input.metadata().collection_kind, + &input.metadata().op, + builders_or_callback, + next_stmt_id, + ); + let stmt_id = next_stmt_id.get_and_increment(); match builders_or_callback { diff --git a/hydro_lang/src/live_collections/stream/mod.rs b/hydro_lang/src/live_collections/stream/mod.rs index 89bf21a6534a..63491039c047 100644 --- a/hydro_lang/src/live_collections/stream/mod.rs +++ b/hydro_lang/src/live_collections/stream/mod.rs @@ -1294,20 +1294,31 @@ where /// Executes the provided closure for every element in this stream. /// - /// Because the closure may have side effects, the stream must have deterministic order - /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate - /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and - /// [`Stream::assume_retries`] with an explanation for why this is the case. + /// If the stream is unordered or has retries, the closure must demonstrate commutativity + /// and/or idempotence via annotations: + /// ```rust,ignore + /// stream.for_each(q!( + /// |x| *flag_mut |= x, + /// commutative = manual_proof!(/** boolean OR is commutative */), + /// idempotent = manual_proof!(/** boolean OR is idempotent */) + /// )); + /// ``` /// - /// The closure may capture singletons via `by_ref()` or `by_mut()`. No commutativity - /// or idempotence proofs are needed because the `TotalOrder + ExactlyOnce` requirements - /// already guarantee deterministic execution. - pub fn for_each(self, f: impl IntoQuotedMut<'a, F, L>) - where - O: IsOrdered, - R: IsExactlyOnce, + /// On a `TotalOrder + ExactlyOnce` stream, no annotations are needed. + /// + /// The closure may capture singletons via `by_ref()` or `by_mut()`. + pub fn for_each( + self, + f: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra>, + ) where + C: ValidCommutativityFor, + I: ValidIdempotenceFor, { - let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into()); + let f = crate::handoff_ref::with_ref_capture(|| { + let (f, proof) = f.splice_fnmut1_ctx_props(&self.location); + proof.register_proof(&f); + f.into() + }); self.location .flow_state() .borrow_mut() diff --git a/hydro_lang/tests/compile-fail/nightly/non_commutative_for_each.stderr b/hydro_lang/tests/compile-fail/nightly/non_commutative_for_each.stderr new file mode 100644 index 000000000000..116ab01b9bea --- /dev/null +++ b/hydro_lang/tests/compile-fail/nightly/non_commutative_for_each.stderr @@ -0,0 +1,22 @@ +error[E0277]: Because the input stream has ordering `hydro_lang::live_collections::stream::NoOrder`, the closure must demonstrate commutativity with a `commutative = ...` annotation. + --> tests/compile-fail/nightly/non_commutative_for_each.rs:11:10 + | +11 | .for_each(q!(|x| println!("{}", x))); + | ^^^^^^^^ required for this call + | + = note: To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary. +help: the trait `ValidCommutativityFor` is not implemented for `NotProved` + but trait `ValidCommutativityFor` is implemented for it + --> src/properties/mod.rs + | + | impl ValidCommutativityFor for NotProved {} + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + = help: for that trait implementation, expected `hydro_lang::live_collections::stream::TotalOrder`, found `hydro_lang::live_collections::stream::NoOrder` +note: required by a bound in `hydro_lang::prelude::Stream::::for_each` + --> src/live_collections/stream/mod.rs + | + | pub fn for_each( + | -------- required by a bound in this associated function +... + | C: ValidCommutativityFor, + | ^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Stream::::for_each` diff --git a/hydro_lang/tests/compile-fail/non_commutative_for_each.rs b/hydro_lang/tests/compile-fail/non_commutative_for_each.rs new file mode 100644 index 000000000000..be54051da2f7 --- /dev/null +++ b/hydro_lang/tests/compile-fail/non_commutative_for_each.rs @@ -0,0 +1,14 @@ +#![allow(unexpected_cfgs)] + +use hydro_lang::live_collections::stream::NoOrder; +use hydro_lang::prelude::*; + +struct P1 {} + +fn test<'a>(p1: &Process<'a, P1>) { + p1.source_iter(q!(0..10)) + .weaken_ordering::() + .for_each(q!(|x| println!("{}", x))); +} + +fn main() {} diff --git a/hydro_lang/tests/compile-fail/stable/non_commutative_for_each.stderr b/hydro_lang/tests/compile-fail/stable/non_commutative_for_each.stderr new file mode 100644 index 000000000000..2e234680d4de --- /dev/null +++ b/hydro_lang/tests/compile-fail/stable/non_commutative_for_each.stderr @@ -0,0 +1,22 @@ +error[E0277]: Because the input stream has ordering `hydro_lang::live_collections::stream::NoOrder`, the closure must demonstrate commutativity with a `commutative = ...` annotation. + --> tests/compile-fail/stable/non_commutative_for_each.rs:11:10 + | +11 | .for_each(q!(|x| println!("{}", x))); + | ^^^^^^^^ required for this call + | + = note: To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary. +help: the trait `ValidCommutativityFor` is not implemented for `NotProved` + but trait `ValidCommutativityFor` is implemented for it + --> src/properties/mod.rs + | + | impl ValidCommutativityFor for NotProved {} + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + = help: for that trait implementation, expected `hydro_lang::live_collections::stream::TotalOrder`, found `hydro_lang::live_collections::stream::NoOrder` +note: required by a bound in `hydro_lang::prelude::Stream::::for_each` + --> src/live_collections/stream/mod.rs + | + | pub fn for_each( + | -------- required by a bound in this associated function +... + | C: ValidCommutativityFor, + | ^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Stream::::for_each` diff --git a/hydro_test/src/cluster/snapshots/compute_pi_ir.snap b/hydro_test/src/cluster/snapshots/compute_pi_ir.snap index a319cb325b65..01c33e7919c8 100644 --- a/hydro_test/src/cluster/snapshots/compute_pi_ir.snap +++ b/hydro_test/src/cluster/snapshots/compute_pi_ir.snap @@ -215,12 +215,12 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_906_20 ! ([] [| _ | ()]) }), input: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( diff --git a/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap b/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap index 7c7135facb40..c7230368db79 100644 --- a/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap +++ b/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap @@ -7,9 +7,9 @@ _2v1 = map (| res | { let (id , b) = res . unwrap () ; (hydro_lang :: __staged : _3v1 = map (stageleft :: runtime_support :: fnmut1_type_hint :: < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: compute_pi :: Worker > , (u64 , u64)) , (u64 , u64) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_keyed_stream_mod_rs_747_30 ! ([] [| (_ , v) | v]) })); _4v1 = reduce :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_37_12 ! ([] [| (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; }]) })); _5v1 = source_stream ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: location :: * ; hydro_lang :: __stageleft_quote_src_location_mod_rs_1415_30 ! ([interval__free = { use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_46_15 ! ([] [Duration :: from_secs (1)]) } ,] [tokio_stream :: StreamExt :: map (tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval__free)) , | _ | ())]) }); -_6v1 = scan :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) })); -_7v1 = flat_map (stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) })); -_8v1 = reduce :: < 'tick > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) })); +_6v1 = scan :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) })); +_7v1 = flat_map (stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) })); +_8v1 = reduce :: < 'tick > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) })); _9v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_906_20 ! ([] [| _ | ()]) })); _10v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_879_20 ! ([] [| v | Some (v)]) })); _11v1 = source_iter ([:: std :: option :: Option :: None]); diff --git a/hydro_test/src/cluster/snapshots/map_reduce_ir.snap b/hydro_test/src/cluster/snapshots/map_reduce_ir.snap index 8cf15332b625..b199ef315409 100644 --- a/hydro_test/src/cluster/snapshots/map_reduce_ir.snap +++ b/hydro_test/src/cluster/snapshots/map_reduce_ir.snap @@ -109,8 +109,8 @@ expression: built.ir() }, right: Cast { inner: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1701_15 ! ([] [| | vec ! []]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1702_15 ! ([] [| acc , v | { acc . push (v) ; }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1712_15 ! ([] [| | vec ! []]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1713_15 ! ([] [| acc , v | { acc . push (v) ; }]) }), input: ObserveNonDet { inner: Map { f: stageleft :: runtime_support :: fnmut1_type_hint :: < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , bool) , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; hydro_lang :: __stageleft_quote_src_live_collections_keyed_singleton_rs_1369_30 ! ([] [| (k , _) | k]) }), diff --git a/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap b/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap index e618e6af1f57..52985b2248da 100644 --- a/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap +++ b/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap @@ -10,7 +10,7 @@ _5v1 = map (stageleft :: runtime_support :: fnmut1_type_hint :: < (hydro_test :: _6v1 = fold_keyed :: < 'static > (stageleft :: runtime_support :: fn0_type_hint :: < bool > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: networking :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_networking_rs_36_11 ! ([] [| | false]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < bool , hydro_test :: __staged :: __deps :: hydro_lang :: location :: MembershipEvent , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: networking :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_networking_rs_37_11 ! ([] [| present , event | { match event { MembershipEvent :: Joined => * present = true , MembershipEvent :: Left => * present = false , } }]) })); _7v1 = filter (stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , bool) , bool > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; hydro_lang :: __stageleft_quote_src_live_collections_keyed_singleton_rs_1807_26 ! ([f__free = stageleft :: runtime_support :: fn1_borrow_type_hint :: < bool , bool > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: networking :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_networking_rs_713_27 ! ([] [| b | * b]) }) ,] [{ let orig = f__free ; move | t : & (_ , _) | orig (& t . 1) }]) })); _8v1 = map (stageleft :: runtime_support :: fnmut1_type_hint :: < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , bool) , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; hydro_lang :: __stageleft_quote_src_live_collections_keyed_singleton_rs_1369_30 ! ([] [| (k , _) | k]) })); -_9v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1701_15 ! ([] [| | vec ! []]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1702_15 ! ([] [| acc , v | { acc . push (v) ; }]) })); +_9v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1712_15 ! ([] [| | vec ! []]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > > , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1713_15 ! ([] [| acc , v | { acc . push (v) ; }]) })); _10v1 = cross_singleton (); _11v1 = filter_map (stageleft :: runtime_support :: fnmut1_type_hint :: < ((usize , std :: string :: String) , std :: vec :: Vec < hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > >) , core :: option :: Option < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: map_reduce :: Worker > , std :: string :: String) > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: networking :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_networking_rs_720_31 ! ([] [| (data , members) | { if members . is_empty () { None } else { Some ((members [data . 0 % members . len ()] . clone () , data . 1)) } }]) })); _12v1 = map (hydro_lang :: runtime_support :: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: __staged :: location :: MemberId < _ > , std :: string :: String) , _ > (| (id , data) | { (id . into_tagless () , hydro_lang :: runtime_support :: bincode :: serialize (& data) . unwrap () . into ()) })); diff --git a/hydro_test/src/cluster/snapshots/paxos_ir.snap b/hydro_test/src/cluster/snapshots/paxos_ir.snap index dc8300320291..d44434eae89d 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir.snap @@ -278,7 +278,7 @@ expression: built.ir() inner: ChainFirst { first: Batch { inner: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1519_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1530_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), input: ObserveNonDet { inner: ObserveNonDet { inner: Chain { @@ -854,12 +854,12 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_906_20 ! ([] [| _ | ()]) }), input: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( @@ -1346,11 +1346,11 @@ expression: built.ir() left: Batch { inner: YieldConcat { inner: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < hydro_test :: __staged :: __deps :: tokio :: time :: Instant > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2027_27 ! ([duration__free = { use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_rs_438_15 ! ([i_am_leader_check_timeout__free = 10u64 ,] [Duration :: from_secs (i_am_leader_check_timeout__free)]) } ,] [move | latest_received | { if let Some (latest_received) = latest_received { if Instant :: now () . duration_since (latest_received) > duration__free { Some (()) } else { None } } else { Some (()) } }]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < hydro_test :: __staged :: __deps :: tokio :: time :: Instant > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2038_27 ! ([duration__free = { use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_rs_438_15 ! ([i_am_leader_check_timeout__free = 10u64 ,] [Duration :: from_secs (i_am_leader_check_timeout__free)]) } ,] [move | latest_received | { if let Some (latest_received) = latest_received { if Instant :: now () . duration_since (latest_received) > duration__free { Some (()) } else { None } } else { Some (()) } }]) }), input: Batch { inner: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < hydro_test :: __staged :: __deps :: tokio :: time :: Instant > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2016_15 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < hydro_test :: __staged :: __deps :: tokio :: time :: Instant > , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2018_16 ! ([] [| latest , _ | { * latest = Some (Instant :: now ()) ; }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < hydro_test :: __staged :: __deps :: tokio :: time :: Instant > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2027_15 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < hydro_test :: __staged :: __deps :: tokio :: time :: Instant > , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2029_16 ! ([] [| latest , _ | { * latest = Some (Instant :: now ()) ; }]) }), input: ObserveNonDet { inner: Tee { inner: , @@ -1530,12 +1530,12 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_906_20 ! ([] [| _ | ()]) }), input: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( @@ -1826,7 +1826,7 @@ expression: built.ir() inner: ChainFirst { first: Batch { inner: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1519_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1530_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), input: ObserveNonDet { inner: YieldConcat { inner: Inspect { @@ -3041,7 +3041,7 @@ expression: built.ir() inner: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_rs_94_22 ! ([] [| ballot | ballot . proposer_id]) }), input: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1519_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1530_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), input: ObserveNonDet { inner: Inspect { f: stageleft :: runtime_support :: fnmut1_borrow_type_hint :: < hydro_test :: __staged :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos_with_client :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_with_client_rs_62_36 ! ([] [| ballot | println ! ("Client notified that leader was elected: {:?}" , ballot)]) }), @@ -3623,8 +3623,8 @@ expression: built.ir() input: Cast { inner: CrossSingleton { left: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2427_15 ! ([] [| | 0usize]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (usize , (u32 , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos_bench :: Client > , i32))) , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2429_16 ! ([] [| count , _ | * count += 1]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2438_15 ! ([] [| | 0usize]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (usize , (u32 , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos_bench :: Client > , i32))) , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2440_16 ! ([] [| count , _ | * count += 1]) }), input: Tee { inner: : Map { f: stageleft :: runtime_support :: fnmut1_type_hint :: < ((usize , (u32 , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos_bench :: Client > , i32))) , usize) , (usize , (u32 , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos_bench :: Client > , i32))) > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_rs_786_20 ! ([] [| ((index , payload) , base_slot) | (base_slot + index , payload)]) }), @@ -3852,7 +3852,7 @@ expression: built.ir() inner: YieldConcat { inner: Tee { inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1519_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1530_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), input: ObserveNonDet { inner: Map { f: stageleft :: runtime_support :: fnmut1_type_hint :: < (usize , (usize , hydro_test :: __staged :: cluster :: paxos :: LogValue < (u32 , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos_bench :: Client > , i32)) >)) , usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; hydro_lang :: __stageleft_quote_src_live_collections_keyed_singleton_rs_1369_30 ! ([] [| (k , _) | k]) }), @@ -4581,7 +4581,7 @@ expression: built.ir() first: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_879_20 ! ([] [| v | Some (v)]) }), input: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1519_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1530_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), input: ObserveNonDet { inner: FilterMap { f: stageleft :: runtime_support :: fnmut1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: __staged :: cluster :: paxos :: LogValue < (u32 , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: paxos_bench :: Client > , i32)) > >) , core :: option :: Option < usize > > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_rs_600_23 ! ([] [| (checkpoint , _log) | checkpoint]) }), @@ -6679,7 +6679,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_879_20 ! ([] [| v | Some (v)]) }), input: Batch { inner: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1519_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1530_23 ! ([] [| curr , new | { if new > * curr { * curr = new ; } }]) }), input: YieldConcat { inner: Cast { inner: DeferTick { @@ -6889,7 +6889,7 @@ expression: built.ir() ), input: YieldConcat { inner: Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1553_23 ! ([] [| curr , new | { if new < * curr { * curr = new ; } }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1564_23 ! ([] [| curr , new | { if new < * curr { * curr = new ; } }]) }), input: ObserveNonDet { inner: Map { f: stageleft :: runtime_support :: fnmut1_type_hint :: < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: kv_replica :: Replica > , usize) , usize > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos_bench :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_bench_rs_96_32 ! ([] [| (_sender , seq) | seq]) }), @@ -7175,8 +7175,8 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , bool > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: paxos_bench :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_paxos_bench_rs_90_32 ! ([f__free = 1usize ,] [move | num_received | num_received == f__free + 1]) }), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2427_15 ! ([] [| | 0usize]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: kv_replica :: Replica > , usize) , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2429_16 ! ([] [| count , _ | * count += 1]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2438_15 ! ([] [| | 0usize]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: kv_replica :: Replica > , usize) , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2440_16 ! ([] [| count , _ | * count += 1]) }), input: ObserveNonDet { inner: Cast { inner: Cast { @@ -8089,12 +8089,12 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_935_20 ! ([] [| _ | ()]) }), input: Tee { inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( @@ -8322,8 +8322,8 @@ expression: built.ir() input: CrossSingleton { left: Tee { inner: : Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2427_15 ! ([] [| | 0usize]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , core :: time :: Duration , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2429_16 ! ([] [| count , _ | * count += 1]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2438_15 ! ([] [| | 0usize]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , core :: time :: Duration , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2440_16 ! ([] [| count , _ | * count += 1]) }), input: ObserveNonDet { inner: Tee { inner: , @@ -9003,12 +9003,12 @@ expression: built.ir() input: DeferTick { input: Tee { inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap index 89e32e4dcbcf..848f195478c6 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap @@ -15,7 +15,7 @@ linkStyle default stroke:#aaa 5v1["
(5v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_keyed_stream_mod_rs_747_30!(
[] [| (_, v) | v]
)
})
"]:::otherClass 6v1["
(6v1)

tee()
"]:::otherClass 7v1["
(7v1)

inspect({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_486_20!(
[] [| p1a | println!("Acceptor received P1a: {:?}", p1a)]
)
})
"]:::otherClass -8v1["
(8v1)

reduce::<
'static,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1519_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass +8v1["
(8v1)

reduce::<
'static,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1530_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass 9v1["
(9v1)

source_iter([
{
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_488_46!(
[] [Ballot { num : 0, proposer_id : MemberId::from_raw_id(0) }]
)
},
])
"]:::otherClass 10v1["
(10v1)

persist::<'static>()
"]:::otherClass 11v1["
(11v1)

chain_first_n(1)
"]:::otherClass @@ -52,13 +52,13 @@ linkStyle default stroke:#aaa 42v1["
(42v1)

map(|res| {
let (id, b) = res.unwrap();
(
hydro_lang::__staged::location::MemberId::<
hydro_test::__staged::cluster::kv_replica::Replica,
>::from_tagless(id as hydro_lang::__staged::location::TaglessMemberId),
hydro_lang::runtime_support::bincode::deserialize::<usize>(&b).unwrap(),
)
})
"]:::otherClass 43v1["
(43v1)

reduce_keyed::<
'static,
>({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_bench_rs_73_24!(
[] [| curr_seq, seq | { if seq > * curr_seq { * curr_seq = seq; } }]
)
})
"]:::otherClass 44v1["
(44v1)

tee()
"]:::otherClass -45v1["
(45v1)

fold::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2427_15!(
[] [| | 0usize]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2429_16!(
[] [| count, _ | * count += 1]
)
},
)
"]:::otherClass +45v1["
(45v1)

fold::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2438_15!(
[] [| | 0usize]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2440_16!(
[] [| count, _ | * count += 1]
)
},
)
"]:::otherClass 46v1["
(46v1)

map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_bench_rs_90_32!(
[f__free = 1usize,] [move | num_received | num_received == f__free + 1]
)
})
"]:::otherClass 47v1["
(47v1)

filter({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1053_46!(
[] [| b | * b]
)
})
"]:::otherClass 48v1["
(48v1)

cross_singleton()
"]:::otherClass 49v1["
(49v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1054_20!(
[] [| (d, _) | d]
)
})
"]:::otherClass 50v1["
(50v1)

map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_bench_rs_96_32!([] [| (_sender, seq) | seq])
})
"]:::otherClass -51v1["
(51v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1553_23!(
[] [| curr, new | { if new < * curr { * curr = new; } }]
)
})
"]:::otherClass +51v1["
(51v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1564_23!(
[] [| curr, new | { if new < * curr { * curr = new; } }]
)
})
"]:::otherClass 52v1["
(52v1)

identity::<usize>()
"]:::otherClass 1v1-->2v1 3v1-->4v1 diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap index af7eb0915496..a8c63fbc8226 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap @@ -12,7 +12,7 @@ linkStyle default stroke:#aaa 2v1["
(2v1)

for_each({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_153_21!([] [| s | println!("{}", s)])
})
"]:::otherClass 3v1["
(3v1)

chain()
"]:::otherClass 4v1["
(4v1)

chain()
"]:::otherClass -5v1["
(5v1)

reduce::<
'static,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1519_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass +5v1["
(5v1)

reduce::<
'static,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1530_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass 6v1["
(6v1)

source_iter([
{
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_282_30!(
[] [Ballot { num : 0, proposer_id : MemberId::from_raw_id(0) }]
)
},
])
"]:::otherClass 7v1["
(7v1)

persist::<'static>()
"]:::otherClass 8v1["
(8v1)

chain_first_n(1)
"]:::otherClass @@ -38,9 +38,9 @@ linkStyle default stroke:#aaa 28v1["
(28v1)

cross_singleton()
"]:::otherClass 29v1["
(29v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_singleton_rs_875_51!(
[] [| (d, _) | d]
)
})
"]:::otherClass 30v1["
(30v1)

source_stream({
hydro_lang::__stageleft_quote_src_location_mod_rs_1415_30!(
[interval__free = { use crate ::__staged::__deps:: *; use crate
::__staged::cluster::paxos:: *; #[allow(unused_imports)] use crate :: *;
__stageleft_quote_src_cluster_paxos_rs_424_15!([i_am_leader_send_timeout__free
= 5u64,] [Duration::from_secs(i_am_leader_send_timeout__free)]) },]
[tokio_stream::StreamExt::map(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval__free)),
| _ | ())]
)
})
"]:::otherClass -31v1["
(31v1)

scan::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1915_27!(
[] [| | None]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1918_24!(
[f__free = stageleft::runtime_support::fn2_borrow_mut_type_hint:: < (),
(),
hydro_test::__staged::__deps::hydro_lang::live_collections::keyed_stream::Generate
< () > > ({ use hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1588_37!([]
[| _, item | Generate::Return(item)]) }), init__free =
stageleft::runtime_support::fn0_type_hint:: < () > ({ use
hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1588_26!([]
[| | ()]) }),] [move | state : & mut Option < Option < _ > >, v | { if
state.is_none() { * state = Some(Some(init__free())); } match state {
Some(Some(state_value)) => match f__free(state_value, v) {
Generate::Yield(out) => Some(Some(out)), Generate::Return(out) => { *
state = Some(None); Some(Some(out)) } Generate::Break => None,
Generate::Continue => Some(None), }, _ => None, } }]
)
},
)
"]:::otherClass -32v1["
(32v1)

flat_map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1955_27!(
[] [| d | d]
)
})
"]:::otherClass -33v1["
(33v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1589_23!(
[] [| _, _ | {}]
)
})
"]:::otherClass +31v1["
(31v1)

scan::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1926_27!(
[] [| | None]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1929_24!(
[f__free = stageleft::runtime_support::fn2_borrow_mut_type_hint:: < (),
(),
hydro_test::__staged::__deps::hydro_lang::live_collections::keyed_stream::Generate
< () > > ({ use hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1599_37!([]
[| _, item | Generate::Return(item)]) }), init__free =
stageleft::runtime_support::fn0_type_hint:: < () > ({ use
hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1599_26!([]
[| | ()]) }),] [move | state : & mut Option < Option < _ > >, v | { if
state.is_none() { * state = Some(Some(init__free())); } match state {
Some(Some(state_value)) => match f__free(state_value, v) {
Generate::Yield(out) => Some(Some(out)), Generate::Return(out) => { *
state = Some(None); Some(Some(out)) } Generate::Break => None,
Generate::Continue => Some(None), }, _ => None, } }]
)
},
)
"]:::otherClass +32v1["
(32v1)

flat_map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1966_27!(
[] [| d | d]
)
})
"]:::otherClass +33v1["
(33v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1600_23!(
[] [| _, _ | {}]
)
})
"]:::otherClass 34v1["
(34v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_906_20!(
[] [| _ | ()]
)
})
"]:::otherClass 35v1["
(35v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_879_20!(
[] [| v | Some(v)]
)
})
"]:::otherClass 36v1["
(36v1)

source_iter([::std::option::Option::None])
"]:::otherClass @@ -64,8 +64,8 @@ linkStyle default stroke:#aaa 54v1["
(54v1)

fold_keyed::<
'static,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_networking_rs_36_11!(
[] [| | false]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_networking_rs_37_11!(
[] [| present, event | { match event { MembershipEvent::Joined => *
present = true, MembershipEvent::Left => * present = false, } }]
)
},
)
"]:::otherClass 55v1["
(55v1)

filter({
hydro_lang::__stageleft_quote_src_live_collections_keyed_singleton_rs_1807_26!(
[f__free = stageleft::runtime_support::fn1_borrow_type_hint:: < bool, bool >
({ use hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream::networking:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_networking_rs_1187_61!([]
[| b | * b]) }),] [{ let orig = f__free; move | t : & (_, _) | orig(& t.1) }]
)
})
"]:::otherClass 56v1["
(56v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_keyed_singleton_rs_1369_30!(
[] [| (k, _) | k]
)
})
"]:::otherClass -57v1["
(57v1)

fold::<
'static,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2016_15!(
[] [| | None]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2018_16!(
[] [| latest, _ | { * latest = Some(Instant::now()); }]
)
},
)
"]:::otherClass -58v1["
(58v1)

filter_map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2027_27!(
[duration__free = { use crate ::__staged::__deps:: *; use crate
::__staged::cluster::paxos:: *; #[allow(unused_imports)] use crate :: *;
__stageleft_quote_src_cluster_paxos_rs_438_15!([i_am_leader_check_timeout__free
= 10u64,] [Duration::from_secs(i_am_leader_check_timeout__free)]) },] [move |
latest_received | { if let Some(latest_received) = latest_received { if
Instant::now().duration_since(latest_received) > duration__free { Some(()) }
else { None } } else { Some(()) } }]
)
})
"]:::otherClass +57v1["
(57v1)

fold::<
'static,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2027_15!(
[] [| | None]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2029_16!(
[] [| latest, _ | { * latest = Some(Instant::now()); }]
)
},
)
"]:::otherClass +58v1["
(58v1)

filter_map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2038_27!(
[duration__free = { use crate ::__staged::__deps:: *; use crate
::__staged::cluster::paxos:: *; #[allow(unused_imports)] use crate :: *;
__stageleft_quote_src_cluster_paxos_rs_438_15!([i_am_leader_check_timeout__free
= 10u64,] [Duration::from_secs(i_am_leader_check_timeout__free)]) },] [move |
latest_received | { if let Some(latest_received) = latest_received { if
Instant::now().duration_since(latest_received) > duration__free { Some(()) }
else { None } } else { Some(()) } }]
)
})
"]:::otherClass 59v1["
(59v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_singleton_rs_1078_20!(
[] [| b | ! b]
)
})
"]:::otherClass 60v1["
(60v1)

filter({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_1105_34!(
[] [| b | * b]
)
})
"]:::otherClass 61v1["
(61v1)

cross_singleton()
"]:::otherClass @@ -77,9 +77,9 @@ linkStyle default stroke:#aaa 67v1["
(67v1)

chain_first_n(1)
"]:::otherClass 68v1["
(68v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_908_20!(
[] [| o | o.is_some()]
)
})
"]:::otherClass 69v1["
(69v1)

source_stream({
hydro_lang::__stageleft_quote_src_location_mod_rs_1439_30!(
[delay__free = { use crate ::__staged::__deps:: *; use crate
::__staged::cluster::paxos:: *; #[allow(unused_imports)] use crate :: *;
__stageleft_quote_src_cluster_paxos_rs_453_19!([CLUSTER_SELF_ID__free =
hydro_lang::__staged::location::MemberId:: <
hydro_test::__staged::cluster::paxos::Proposer >
::from_tagless((__hydro_lang_cluster_self_id_loc1v1).clone()),
i_am_leader_check_timeout_delay_multiplier__free = 15usize,]
[Duration::from_secs((CLUSTER_SELF_ID__free.get_raw_id() *
i_am_leader_check_timeout_delay_multiplier__free as u32).into())]) },
interval__free = { use crate ::__staged::__deps:: *; use crate
::__staged::cluster::paxos:: *; #[allow(unused_imports)] use crate :: *;
__stageleft_quote_src_cluster_paxos_rs_458_19!([i_am_leader_check_timeout__free
= 10u64,] [Duration::from_secs(i_am_leader_check_timeout__free)]) },]
[tokio_stream::StreamExt::map(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(tokio::time::Instant::now()
+ delay__free, interval__free,)), | _ | ())]
)
})
"]:::otherClass -70v1["
(70v1)

scan::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1915_27!(
[] [| | None]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1918_24!(
[f__free = stageleft::runtime_support::fn2_borrow_mut_type_hint:: < (),
(),
hydro_test::__staged::__deps::hydro_lang::live_collections::keyed_stream::Generate
< () > > ({ use hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1588_37!([]
[| _, item | Generate::Return(item)]) }), init__free =
stageleft::runtime_support::fn0_type_hint:: < () > ({ use
hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1588_26!([]
[| | ()]) }),] [move | state : & mut Option < Option < _ > >, v | { if
state.is_none() { * state = Some(Some(init__free())); } match state {
Some(Some(state_value)) => match f__free(state_value, v) {
Generate::Yield(out) => Some(Some(out)), Generate::Return(out) => { *
state = Some(None); Some(Some(out)) } Generate::Break => None,
Generate::Continue => Some(None), }, _ => None, } }]
)
},
)
"]:::otherClass -71v1["
(71v1)

flat_map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1955_27!(
[] [| d | d]
)
})
"]:::otherClass -72v1["
(72v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1589_23!(
[] [| _, _ | {}]
)
})
"]:::otherClass +70v1["
(70v1)

scan::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1926_27!(
[] [| | None]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1929_24!(
[f__free = stageleft::runtime_support::fn2_borrow_mut_type_hint:: < (),
(),
hydro_test::__staged::__deps::hydro_lang::live_collections::keyed_stream::Generate
< () > > ({ use hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1599_37!([]
[| _, item | Generate::Return(item)]) }), init__free =
stageleft::runtime_support::fn0_type_hint:: < () > ({ use
hydro_lang::__staged::__deps:: *; use
hydro_lang::__staged::live_collections::stream:: *;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1599_26!([]
[| | ()]) }),] [move | state : & mut Option < Option < _ > >, v | { if
state.is_none() { * state = Some(Some(init__free())); } match state {
Some(Some(state_value)) => match f__free(state_value, v) {
Generate::Yield(out) => Some(Some(out)), Generate::Return(out) => { *
state = Some(None); Some(Some(out)) } Generate::Break => None,
Generate::Continue => Some(None), }, _ => None, } }]
)
},
)
"]:::otherClass +71v1["
(71v1)

flat_map({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1966_27!(
[] [| d | d]
)
})
"]:::otherClass +72v1["
(72v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1600_23!(
[] [| _, _ | {}]
)
})
"]:::otherClass 73v1["
(73v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_906_20!(
[] [| _ | ()]
)
})
"]:::otherClass 74v1["
(74v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_879_20!(
[] [| v | Some(v)]
)
})
"]:::otherClass 75v1["
(75v1)

source_iter([::std::option::Option::None])
"]:::otherClass @@ -178,7 +178,7 @@ linkStyle default stroke:#aaa 168v1["
(168v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_keyed_singleton_rs_484_23!(
[f__free = stageleft::runtime_support::fn1_type_hint:: < (usize,
core::option::Option < hydro_test::__staged::cluster::paxos::LogValue < (u32,
(hydro_test::__staged::__deps::hydro_lang::location::member_id::MemberId <
hydro_test::__staged::cluster::paxos_bench::Client >, i32)) > >), (usize,
hydro_test::__staged::cluster::paxos::LogValue < (u32,
(hydro_test::__staged::__deps::hydro_lang::location::member_id::MemberId <
hydro_test::__staged::cluster::paxos_bench::Client >, i32)) >) > ({ use crate
::__staged::__deps:: *; use crate ::__staged::cluster::paxos:: *;
#[allow(unused_imports)] use crate :: *;
__stageleft_quote_src_cluster_paxos_rs_628_16!([] [| (count, entry) | (count,
entry.unwrap())]) }),] [{ let orig = f__free; move | (k, v) | (k, orig(v)) }]
)
})
"]:::otherClass 169v1["
(169v1)

tee()
"]:::otherClass 170v1["
(170v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_keyed_singleton_rs_1369_30!(
[] [| (k, _) | k]
)
})
"]:::otherClass -171v1["
(171v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1519_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass +171v1["
(171v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1530_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass 172v1["
(172v1)

tee()
"]:::otherClass 173v1["
(173v1)

map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_780_71!([] [| s | s + 1])
})
"]:::otherClass 174v1["
(174v1)

defer_tick_lazy()
"]:::otherClass @@ -190,7 +190,7 @@ linkStyle default stroke:#aaa 180v1["
(180v1)

cross_singleton()
"]:::otherClass 181v1["
(181v1)

map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_786_20!(
[] [| ((index, payload), base_slot) | (base_slot + index, payload)]
)
})
"]:::otherClass 182v1["
(182v1)

tee()
"]:::otherClass -183v1["
(183v1)

fold::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2427_15!(
[] [| | 0usize]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2429_16!(
[] [| count, _ | * count += 1]
)
},
)
"]:::otherClass +183v1["
(183v1)

fold::<
'tick,
>(
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2438_15!(
[] [| | 0usize]
)
},
{
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_2440_16!(
[] [| count, _ | * count += 1]
)
},
)
"]:::otherClass 184v1["
(184v1)

cross_singleton()
"]:::otherClass 185v1["
(185v1)

map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_794_20!(
[] [| (num_payloads, base_slot) | base_slot + num_payloads]
)
})
"]:::otherClass 186v1["
(186v1)

identity::<usize>()
"]:::otherClass @@ -204,7 +204,7 @@ linkStyle default stroke:#aaa 194v1["
(194v1)

map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_721_16!(
[] [| ((slot, payload), ballot) | ((slot, ballot), Some(payload))]
)
})
"]:::otherClass 195v1["
(195v1)

cross_singleton()
"]:::otherClass 196v1["
(196v1)

filter_map({
#[allow(unused_imports)]
__stageleft_quote_src_cluster_paxos_rs_600_23!(
[] [| (checkpoint, _log) | checkpoint]
)
})
"]:::otherClass -197v1["
(197v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1519_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass +197v1["
(197v1)

reduce::<
'tick,
>({
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1530_23!(
[] [| curr, new | { if new > * curr { * curr = new; } }]
)
})
"]:::otherClass 198v1["
(198v1)

map({
hydro_lang::__stageleft_quote_src_live_collections_optional_rs_879_20!(
[] [| v | Some(v)]
)
})
"]:::otherClass 199v1["
(199v1)

source_iter([::std::option::Option::None])
"]:::otherClass 200v1["
(200v1)

persist::<'static>()
"]:::otherClass diff --git a/hydro_test/src/cluster/snapshots/two_pc_ir.snap b/hydro_test/src/cluster/snapshots/two_pc_ir.snap index 1b754caaf0c8..cca0a5ceeb42 100644 --- a/hydro_test/src/cluster/snapshots/two_pc_ir.snap +++ b/hydro_test/src/cluster/snapshots/two_pc_ir.snap @@ -1873,12 +1873,12 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_935_20 ! ([] [| _ | ()]) }), input: Tee { inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( @@ -2106,8 +2106,8 @@ expression: built.ir() input: CrossSingleton { left: Tee { inner: : Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2427_15 ! ([] [| | 0usize]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , core :: time :: Duration , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2429_16 ! ([] [| count , _ | * count += 1]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2438_15 ! ([] [| | 0usize]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , core :: time :: Duration , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_2440_16 ! ([] [| count , _ | * count += 1]) }), input: ObserveNonDet { inner: Tee { inner: , @@ -2787,12 +2787,12 @@ expression: built.ir() input: DeferTick { input: Tee { inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }), input: Scan { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }), input: Batch { inner: Source { source: Stream( diff --git a/hydro_test/src/local/snapshots/chat_app_no_replay.snap b/hydro_test/src/local/snapshots/chat_app_no_replay.snap index 0b5535a1f72d..0d244679aa78 100644 --- a/hydro_test/src/local/snapshots/chat_app_no_replay.snap +++ b/hydro_test/src/local/snapshots/chat_app_no_replay.snap @@ -18,7 +18,7 @@ linkStyle default stroke:#aaa 8v1[\"(8v1) identity::<()>()"/]:::pullClass 9v1[\"(9v1) source_stream(DUMMY_SOURCE)"/]:::pullClass 10v1[\"
(10v1)
map(|res| {
let b = res.unwrap();
hydro_lang::runtime_support::bincode::deserialize::<u32>(&b).unwrap()
})
"/]:::pullClass -11v1[\"
(11v1)
fold::<
'static,
>(
{
use hydro_lang::__staged::__deps::*;
use hydro_lang::__staged::live_collections::stream::*;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1701_15!(
[] [| | vec![]]
)
},
{
use hydro_lang::__staged::__deps::*;
use hydro_lang::__staged::live_collections::stream::*;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1702_15!(
[] [| acc, v | { acc.push(v); }]
)
},
)
"/]:::pullClass +11v1[\"
(11v1)
fold::<
'static,
>(
{
use hydro_lang::__staged::__deps::*;
use hydro_lang::__staged::live_collections::stream::*;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1712_15!(
[] [| | vec![]]
)
},
{
use hydro_lang::__staged::__deps::*;
use hydro_lang::__staged::live_collections::stream::*;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1713_15!(
[] [| acc, v | { acc.push(v); }]
)
},
)
"/]:::pullClass 12v1[\"
(12v1)
flat_map({
use hydro_lang::__staged::__deps::*;
use hydro_lang::__staged::live_collections::singleton::*;
hydro_lang::__stageleft_quote_src_live_collections_singleton_rs_637_33!(
[] [| x | x]
)
})
"/]:::pullClass 13v1[\"
(13v1)
map({
use hydro_lang::__staged::__deps::*;
use hydro_lang::__staged::live_collections::stream::*;
hydro_lang::__stageleft_quote_src_live_collections_stream_mod_rs_1168_20!(
[] [| v | ((), v)]
)
})
"/]:::pullClass 14v1[\"(14v1) source_stream(DUMMY_SOURCE)"/]:::pullClass