diff --git a/dfir_lang/src/graph/ops/fold.rs b/dfir_lang/src/graph/ops/fold.rs index 41e1857c7255..a0470389ead8 100644 --- a/dfir_lang/src/graph/ops/fold.rs +++ b/dfir_lang/src/graph/ops/fold.rs @@ -1,7 +1,7 @@ use quote::quote_spanned; use super::{ - DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, Persistence, RANGE_0, + OperatorCategory, OperatorConstraints, OperatorWriteOutput, Persistence, RANGE_0, RANGE_1, WriteContextArgs, }; @@ -46,7 +46,7 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, op_span, @@ -126,8 +126,8 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { ) ); } - } else { - assert_eq!(0, outputs.len()); + } else if outputs.is_empty() { + // Terminal push: fold is a singleton reference target with no downstream. quote_spanned! {op_span=> let #ident = #root::dfir_pipes::push::for_each(|#item_ident| { #assign_accum_ident @@ -135,6 +135,35 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { #foreach_body }); } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = { + #[inline(always)] + fn __push_fold<'a, Acc, Item, CombFn, Next>( + acc_ref: &'a mut Acc, + comb_fn: CombFn, + next: Next, + ) -> #root::dfir_pipes::push::Accumulate< + #root::dfir_pipes::push::FoldState<&'a mut Acc, CombFn, Acc, Item>, + Next, + > + where + CombFn: ::std::ops::FnMut(&mut Acc, Item), + Next: #root::dfir_pipes::push::Push<&'a mut Acc, ()>, + { + #root::dfir_pipes::push::fold(acc_ref, comb_fn, next) + } + __push_fold( + &mut #singleton_output_ident, + |#accumulator_ident: &mut _, #item_ident| { #foreach_body }, + #root::dfir_pipes::push::map( + |__val: &mut _| ::std::clone::Clone::clone(&*__val), + #output, + ), + ) + }; + } }; Ok(OperatorWriteOutput { diff --git a/dfir_lang/src/graph/ops/fold_keyed.rs b/dfir_lang/src/graph/ops/fold_keyed.rs index 924dc649a3c0..f4cb43ce01b6 100644 --- a/dfir_lang/src/graph/ops/fold_keyed.rs +++ b/dfir_lang/src/graph/ops/fold_keyed.rs @@ -1,7 +1,7 @@ use quote::{ToTokens, quote_spanned}; use super::{ - DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, + OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, OperatorWriteOutput, Persistence, RANGE_1, WriteContextArgs, }; @@ -79,15 +79,15 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { op_span, work_fn_async, ident, inputs, + outputs, is_pull, root, - op_name, op_inst: OperatorInstance { generics: @@ -102,8 +102,6 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { .. }, _| { - assert!(is_pull, "TODO(mingwei): `{}` only supports pull.", op_name); - let persistence = match persistence_args[..] { [] => Persistence::Tick, [a] => a, @@ -143,7 +141,21 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { let mut #hashtable_ident = &mut #singleton_output_ident; }; - let write_iterator = if Persistence::Mutable == persistence { + let write_iterator = if !is_pull { + assert!( + Persistence::Mutable != persistence, + "fold_keyed::<'mutable> on push side is not supported ('mutable is being removed)" + ); + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::FoldKeyed::new( + &mut #singleton_output_ident, + #initfn, + #aggfn, + #output, + ); + } + } else if Persistence::Mutable == persistence { quote_spanned! {op_span=> #assign_hashtable_ident diff --git a/dfir_lang/src/graph/ops/fold_no_replay.rs b/dfir_lang/src/graph/ops/fold_no_replay.rs index 062a75cdaba2..919521a06251 100644 --- a/dfir_lang/src/graph/ops/fold_no_replay.rs +++ b/dfir_lang/src/graph/ops/fold_no_replay.rs @@ -1,7 +1,7 @@ use quote::quote_spanned; use super::{ - DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, Persistence, RANGE_0, + OperatorCategory, OperatorConstraints, OperatorWriteOutput, Persistence, RANGE_0, RANGE_1, WriteContextArgs, }; @@ -28,7 +28,7 @@ pub const FOLD_NO_REPLAY: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, context, @@ -117,8 +117,8 @@ pub const FOLD_NO_REPLAY: OperatorConstraints = OperatorConstraints { ) }; } - } else { - assert_eq!(0, outputs.len()); + } else if outputs.is_empty() { + // Terminal push: fold_no_replay is a singleton reference target with no downstream. quote_spanned! {op_span=> let #ident = #root::dfir_pipes::push::for_each(|#item_ident| { #assign_accum_ident @@ -126,6 +126,47 @@ pub const FOLD_NO_REPLAY: OperatorConstraints = OperatorConstraints { #foreach_body }); } + } else { + let output = &outputs[0]; + let was_updated_ident = wc.make_ident("was_updated"); + quote_spanned! {op_span=> + let #was_updated_ident = ::std::cell::Cell::new(false); + let #ident = { + #[inline(always)] + fn __push_fold<'a, Acc, Item, CombFn, Next>( + acc_ref: &'a mut Acc, + comb_fn: CombFn, + next: Next, + ) -> #root::dfir_pipes::push::Accumulate< + #root::dfir_pipes::push::FoldState<&'a mut Acc, CombFn, Acc, Item>, + Next, + > + where + CombFn: ::std::ops::FnMut(&mut Acc, Item), + Next: #root::dfir_pipes::push::Push<&'a mut Acc, ()>, + { + #root::dfir_pipes::push::fold(acc_ref, comb_fn, next) + } + __push_fold( + &mut #singleton_output_ident, + |#accumulator_ident: &mut _, #item_ident| { + #was_updated_ident.set(true); + #foreach_body + }, + #root::dfir_pipes::push::filter( + { + let __was_updated = &#was_updated_ident; + let __context: &_ = #context; + move |_| __was_updated.get() || __context.current_tick().0 == 0 + }, + #root::dfir_pipes::push::map( + |__val: &mut _| ::std::clone::Clone::clone(&*__val), + #output, + ), + ), + ) + }; + } }; Ok(OperatorWriteOutput { diff --git a/dfir_rs/tests/compile-fail/stable/surface_fold_keyed_generics_bad.stderr b/dfir_rs/tests/compile-fail/stable/surface_fold_keyed_generics_bad.stderr index d1867fbe8ba2..726c300d9a9b 100644 --- a/dfir_rs/tests/compile-fail/stable/surface_fold_keyed_generics_bad.stderr +++ b/dfir_rs/tests/compile-fail/stable/surface_fold_keyed_generics_bad.stderr @@ -1,4 +1,4 @@ -error[E0271]: type mismatch resolving `> as Pull>::Item == (_, _)` +error[E0271]: type mismatch resolving `> as Pull>::CanPend, CanEnd = > as Pull>::CanEnd> as Pull>::Item == (_, _)` --> tests/compile-fail/stable/surface_fold_keyed_generics_bad.rs:3:9 | 3 | source_iter(["hello", "world"]) @@ -14,7 +14,7 @@ note: required by a bound in `check_input` 4 | -> fold_keyed::<'tick, &str, usize>(String::new, |old: &mut _, val| { | ^^^^^^^^^^ required by this bound in `check_input` -error[E0271]: type mismatch resolving `> as Pull>::Item == (_, _)` +error[E0271]: type mismatch resolving `> as Pull>::CanPend, CanEnd = > as Pull>::CanEnd> as Pull>::Item == (_, _)` --> tests/compile-fail/stable/surface_fold_keyed_generics_bad.rs:4:16 | 4 | -> fold_keyed::<'tick, &str, usize>(String::new, |old: &mut _, val| { diff --git a/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_int.stderr b/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_int.stderr index 63540c20fe3c..1ea60196f076 100644 --- a/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_int.stderr +++ b/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_int.stderr @@ -1,4 +1,4 @@ -error[E0271]: type mismatch resolving `> as Pull>::Item == (_, _)` +error[E0271]: type mismatch resolving `> as Pull>::CanPend, CanEnd = > as Pull>::CanEnd> as Pull>::Item == (_, _)` --> tests/compile-fail/stable/surface_reduce_keyed_badtype_int.rs:3:9 | 3 | source_iter(0..1) @@ -14,7 +14,7 @@ note: required by a bound in `check_input` 4 | -> fold_keyed(|| 0, |old: &mut u32, val: u32| { *old += val; }) | ^^^^^^^^^^ required by this bound in `check_input` -error[E0271]: type mismatch resolving `> as Pull>::Item == (_, _)` +error[E0271]: type mismatch resolving `> as Pull>::CanPend, CanEnd = > as Pull>::CanEnd> as Pull>::Item == (_, _)` --> tests/compile-fail/stable/surface_reduce_keyed_badtype_int.rs:4:16 | 4 | -> fold_keyed(|| 0, |old: &mut u32, val: u32| { *old += val; }) diff --git a/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_option.stderr b/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_option.stderr index 44040b452874..8ec63163e6bc 100644 --- a/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_option.stderr +++ b/dfir_rs/tests/compile-fail/stable/surface_reduce_keyed_badtype_option.stderr @@ -1,4 +1,4 @@ -error[E0271]: type mismatch resolving `>> as Pull>::Item == (_, _)` +error[E0271]: type mismatch resolving `, Meta = (), CanPend = >> as Pull>::CanPend, CanEnd = >> as Pull>::CanEnd> as Pull>::Item == (_, _)` --> tests/compile-fail/stable/surface_reduce_keyed_badtype_option.rs:3:9 | 3 | source_iter([ Some(5), None, Some(12) ]) @@ -14,7 +14,7 @@ note: required by a bound in `check_input` 4 | -> fold_keyed(|| 0, |old: &mut u32, val: u32| { *old += val; }) | ^^^^^^^^^^ required by this bound in `check_input` -error[E0271]: type mismatch resolving `>> as Pull>::Item == (_, _)` +error[E0271]: type mismatch resolving `, Meta = (), CanPend = >> as Pull>::CanPend, CanEnd = >> as Pull>::CanEnd> as Pull>::Item == (_, _)` --> tests/compile-fail/stable/surface_reduce_keyed_badtype_option.rs:4:16 | 4 | -> fold_keyed(|| 0, |old: &mut u32, val: u32| { *old += val; }) diff --git a/dfir_rs/tests/metrics.rs b/dfir_rs/tests/metrics.rs index d67b461ae781..dc162ff30500 100644 --- a/dfir_rs/tests/metrics.rs +++ b/dfir_rs/tests/metrics.rs @@ -83,7 +83,7 @@ async fn test_handoff_metrics() { let mut flow = dfir_rs::dfir_syntax! { source_iter(0..5) -> map(|x| x * 2) - -> fold(|| 0, |acc: &mut _, x| { *acc += x; }) + -> defer_tick() -> for_each(|x| { output_send.send(x).unwrap(); }); }; @@ -99,7 +99,7 @@ async fn test_handoff_metrics() { // Verify output let output: Vec<_> = collect_ready_async(&mut output_recv).await; - assert_eq!(output, vec![20]); + assert_eq!(output, vec![0, 2, 4, 6, 8]); } #[multiplatform_test(dfir)] diff --git a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap index accb6cfec895..640cc67121ce 100644 --- a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap @@ -1,6 +1,5 @@ --- source: dfir_rs/tests/surface_cross_singleton.rs -assertion_line: 58 expression: df.meta_graph().unwrap().to_dot(cfg) --- digraph { @@ -16,7 +15,7 @@ digraph { n8v1 [label="(n8v1) cross_singleton()", shape=invhouse, fillcolor="#88aaff"] n9v1 [label="(n9v1) tee()", shape=house, fillcolor="#ffff88"] n10v1 [label="(n10v1) for_each(|x| egress_tx.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n11v1 [label="(n11v1) fold(|| 0, |_, _| {})", shape=invhouse, fillcolor="#88aaff"] + n11v1 [label="(n11v1) fold(|| 0, |_, _| {})", shape=house, fillcolor="#ffff88"] n12v1 [label="(n12v1) cross_singleton()", shape=invhouse, fillcolor="#88aaff"] n13v1 [label="(n13v1) fold(|| 0, |_, _| {})", shape=invhouse, fillcolor="#88aaff"] n14v1 [label="(n14v1) flat_map(|_| [])", shape=invhouse, fillcolor="#88aaff"] @@ -26,8 +25,6 @@ digraph { n18v1 [label="(n18v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n19v1 [label="(n19v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n20v1 [label="(n20v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n21v1 [label="(n21v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n22v1 [label="(n22v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 n1v1 -> n15v1 n3v1 -> n16v1 @@ -38,19 +35,17 @@ digraph { n7v1 -> n18v1 n8v1 -> n9v1 n9v1 -> n10v1 - n9v1 -> n19v1 - n3v1 -> n20v1 - n11v1 -> n21v1 + n9v1 -> n11v1 + n3v1 -> n19v1 + n11v1 -> n20v1 n13v1 -> n14v1 - n12v1 -> n22v1 + n12v1 -> n13v1 n15v1 -> n2v1 [color=red] n16v1 -> n8v1 [label="input"] n17v1 -> n4v1 [color=red] n18v1 -> n8v1 [label="single", color=red] - n19v1 -> n11v1 [color=red] - n20v1 -> n12v1 [label="input"] - n21v1 -> n12v1 [label="single", color=red] - n22v1 -> n13v1 [color=red] + n19v1 -> n12v1 [label="input"] + n20v1 -> n12v1 [label="single", color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -98,6 +93,11 @@ digraph { style=filled label = "sg_4v1" n10v1 + subgraph sg_4v1_var_folded_thing { + cluster=true + label="var folded_thing" + n11v1 + } subgraph sg_4v1_var_join { cluster=true label="var join" @@ -110,33 +110,16 @@ digraph { fillcolor="#dddddd" style=filled label = "sg_5v1" - subgraph sg_5v1_var_folded_thing { + subgraph sg_5v1_var_deferred_stream { cluster=true - label="var folded_thing" - n11v1 + label="var deferred_stream" + n13v1 + n14v1 } - } - subgraph sg_6v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_6v1" - subgraph sg_6v1_var_joined_folded { + subgraph sg_5v1_var_joined_folded { cluster=true label="var joined_folded" n12v1 } } - subgraph sg_7v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_7v1" - subgraph sg_7v1_var_deferred_stream { - cluster=true - label="var deferred_stream" - n13v1 - n14v1 - } - } } diff --git a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap index 27cc16fb4709..8beea61b49ce 100644 --- a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap @@ -1,6 +1,5 @@ --- source: dfir_rs/tests/surface_cross_singleton.rs -assertion_line: 58 expression: df.meta_graph().unwrap().to_mermaid(cfg) --- %%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% @@ -19,7 +18,7 @@ linkStyle default stroke:#aaa 8v1[\"(8v1) cross_singleton()"/]:::pullClass 9v1[/"(9v1) tee()"\]:::pushClass 10v1[/"(10v1) for_each(|x| egress_tx.send(x).unwrap())"\]:::pushClass -11v1[\"(11v1) fold(|| 0, |_, _| {})"/]:::pullClass +11v1[/"(11v1) fold(|| 0, |_, _| {})"\]:::pushClass 12v1[\"(12v1) cross_singleton()"/]:::pullClass 13v1[\"(13v1) fold(|| 0, |_, _| {})"/]:::pullClass 14v1[\"(14v1) flat_map(|_| [])"/]:::pullClass @@ -29,8 +28,6 @@ linkStyle default stroke:#aaa 18v1["(18v1) handoff"]:::otherClass 19v1["(19v1) handoff"]:::otherClass 20v1["(20v1) handoff"]:::otherClass -21v1["(21v1) handoff"]:::otherClass -22v1["(22v1) handoff"]:::otherClass 2v1-->3v1 1v1-->15v1 3v1-->16v1 @@ -41,19 +38,17 @@ linkStyle default stroke:#aaa 7v1-->18v1 8v1-->9v1 9v1-->10v1 -9v1-->19v1 -3v1-->20v1 -11v1-->21v1 +9v1-->11v1 +3v1-->19v1 +11v1-->20v1 13v1-->14v1 -12v1-->22v1 +12v1-->13v1 15v1--x2v1; linkStyle 15 stroke:red 16v1-->|input|8v1 17v1--o4v1; linkStyle 17 stroke:red 18v1--x|single|8v1; linkStyle 18 stroke:red -19v1--x11v1; linkStyle 19 stroke:red -20v1-->|input|12v1 -21v1--x|single|12v1; linkStyle 21 stroke:red -22v1--x13v1; linkStyle 22 stroke:red +19v1-->|input|12v1 +20v1--x|single|12v1; linkStyle 20 stroke:red subgraph sg_1v1 ["sg_1v1"] subgraph sg_1v1_var_teed_in ["var teed_in"] 1v1 @@ -77,24 +72,20 @@ subgraph sg_3v1 ["sg_3v1"] end subgraph sg_4v1 ["sg_4v1"] 10v1 + subgraph sg_4v1_var_folded_thing ["var folded_thing"] + 11v1 + end subgraph sg_4v1_var_join ["var join"] 8v1 9v1 end end subgraph sg_5v1 ["sg_5v1"] - subgraph sg_5v1_var_folded_thing ["var folded_thing"] - 11v1 - end -end -subgraph sg_6v1 ["sg_6v1"] - subgraph sg_6v1_var_joined_folded ["var joined_folded"] - 12v1 - end -end -subgraph sg_7v1 ["sg_7v1"] - subgraph sg_7v1_var_deferred_stream ["var deferred_stream"] + subgraph sg_5v1_var_deferred_stream ["var deferred_stream"] 13v1 14v1 end + subgraph sg_5v1_var_joined_folded ["var joined_folded"] + 12v1 + end end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap index d72e195d1270..00b222047709 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap @@ -9,23 +9,15 @@ digraph { n2v1 [label="(n2v1) fold::<'tick>(Vec::new, Vec::push)", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) flat_map(|mut vec| {\l vec.sort();\l vec\l})\l", shape=invhouse, fillcolor="#88aaff"] n4v1 [label="(n4v1) for_each(|v| print!(\"{:?}, \", v))", shape=house, fillcolor="#ffff88"] - n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n3v1 -> n4v1 n2v1 -> n3v1 - n1v1 -> n5v1 - n5v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 n4v1 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap index 23eeb5a328d2..2bc91938c224 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap @@ -12,15 +12,11 @@ linkStyle default stroke:#aaa 2v1[\"(2v1) fold::<'tick>(Vec::new, Vec::push)"/]:::pullClass 3v1[\"
(3v1)
flat_map(|mut vec| {
vec.sort();
vec
})
"/]:::pullClass 4v1[/"(4v1) for_each(|v| print!("{:?}, ", v))"\]:::pushClass -5v1["(5v1) handoff"]:::otherClass 3v1-->4v1 2v1-->3v1 -1v1-->5v1 -5v1--x2v1; linkStyle 3 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 4v1 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap index 41feff8748af..d3b8ff5a45f0 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) fold::<\l 'static,\l>(\l Vec::new,\l |old: &mut Vec, mut x: Vec| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap index 6cc830469e8c..05d146be034c 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"
(2v1)
fold::<
'static,
>(
Vec::new,
|old: &mut Vec<u32>, mut x: Vec<u32>| {
old.append(&mut x);
},
)
"/]:::pullClass 3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap index 95a707ceb2f5..59ebaf6f3383 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap @@ -13,47 +13,35 @@ digraph { n6v1 [label="(n6v1) cross_join_multiset()", shape=invhouse, fillcolor="#88aaff"] n7v1 [label="(n7v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n3v1 -> n4v1 - n3v1 -> n9v1 + n3v1 -> n8v1 n5v1 -> n6v1 [label="0"] n6v1 -> n7v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n6v1 [label="1"] + n8v1 -> n6v1 [label="1"] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" + n4v1 subgraph sg_1v1_var_teed_fold { cluster=true label="var teed_fold" n1v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - n4v1 - subgraph sg_2v1_var_teed_fold { - cluster=true - label="var teed_fold" n2v1 n3v1 } } - subgraph sg_3v1 { + subgraph sg_2v1 { cluster=true fillcolor="#dddddd" style=filled - label = "sg_3v1" + label = "sg_2v1" n5v1 n7v1 - subgraph sg_3v1_var_join_node { + subgraph sg_2v1_var_join_node { cluster=true label="var join_node" n6v1 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap index cf1332118c0a..0b8fa1234d35 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap @@ -16,31 +16,25 @@ linkStyle default stroke:#aaa 6v1[\"(6v1) cross_join_multiset()"/]:::pullClass 7v1[/"(7v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass 8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 3v1-->4v1 -3v1-->9v1 +3v1-->8v1 5v1-->|0|6v1 6v1-->7v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1-->|1|6v1 +8v1-->|1|6v1 subgraph sg_1v1 ["sg_1v1"] + 4v1 subgraph sg_1v1_var_teed_fold ["var teed_fold"] 1v1 - end -end -subgraph sg_2v1 ["sg_2v1"] - 4v1 - subgraph sg_2v1_var_teed_fold ["var teed_fold"] 2v1 3v1 end end -subgraph sg_3v1 ["sg_3v1"] +subgraph sg_2v1 ["sg_2v1"] 5v1 7v1 - subgraph sg_3v1_var_join_node ["var join_node"] + subgraph sg_2v1_var_join_node ["var join_node"] 6v1 end end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap index 4b8afed77715..e99a8fda216b 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) fold::<\l 'tick,\l>(\l Vec::new,\l |old: &mut Vec, mut x: Vec| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap index 5b16dca98905..c270ae75724f 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"
(2v1)
fold::<
'tick,
>(
Vec::new,
|old: &mut Vec<u32>, mut x: Vec<u32>| {
old.append(&mut x);
},
)
"/]:::pullClass 3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_dot.snap index 8007905e5da7..ee84ae6985a5 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_dot.snap @@ -9,11 +9,9 @@ digraph { n2v1 [label="(n2v1) map(|m: SubordResponse| (m.xid, m.mtype))", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) fold_keyed::<'static>(|| 0, |old: &mut u32, val: u32| *old += val)", shape=invhouse, fillcolor="#88aaff"] n4v1 [label="(n4v1) for_each(|kv| result_send.send(kv).unwrap())", shape=house, fillcolor="#ffff88"] - n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n3v1 -> n4v1 - n2v1 -> n5v1 + n2v1 -> n3v1 n1v1 -> n2v1 - n5v1 -> n3v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -21,12 +19,6 @@ digraph { label = "sg_1v1" n1v1 n2v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n3v1 n4v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_mermaid.snap index 7912a4da6579..575b372b38d9 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_infer_basic@graphvis_mermaid.snap @@ -12,16 +12,12 @@ linkStyle default stroke:#aaa 2v1[\"(2v1) map(|m: SubordResponse| (m.xid, m.mtype))"/]:::pullClass 3v1[\"(3v1) fold_keyed::<'static>(|| 0, |old: &mut u32, val: u32| *old += val)"/]:::pullClass 4v1[/"(4v1) for_each(|kv| result_send.send(kv).unwrap())"\]:::pushClass -5v1["(5v1) handoff"]:::otherClass 3v1-->4v1 -2v1-->5v1 +2v1-->3v1 1v1-->2v1 -5v1--x3v1; linkStyle 3 stroke:red subgraph sg_1v1 ["sg_1v1"] 1v1 2v1 -end -subgraph sg_2v1 ["sg_2v1"] 3v1 4v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_dot.snap index 06b48926a7b5..3ac21ebdb07f 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) fold_keyed::<\l 'static,\l>(Vec::new, |old: &mut Vec, mut x: Vec| old.append(&mut x))\l", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_mermaid.snap index 005c15e2e5ec..5017bf425bf9 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_static@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"
(2v1)
fold_keyed::<
'static,
>(Vec::new, |old: &mut Vec<u32>, mut x: Vec<u32>| old.append(&mut x))
"/]:::pullClass 3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_dot.snap index c1198c4f1490..cbc7d207fa31 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) fold_keyed::<\l 'tick,\l>(Vec::new, |old: &mut Vec, mut x: Vec| old.append(&mut x))\l", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_mermaid.snap index 46ee3897d610..c122bc4af0a1 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_tick@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"
(2v1)
fold_keyed::<
'tick,
>(Vec::new, |old: &mut Vec<u32>, mut x: Vec<u32>| old.append(&mut x))
"/]:::pullClass 3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_dot.snap index baaab6db78eb..996c6fe3cc72 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_dot.snap @@ -9,11 +9,9 @@ digraph { n2v1 [label="(n2v1) map(|m: SubordResponse| (m.xid, m.mtype))", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) fold_keyed::<'static, &'static str, u32>(|| 0, |old: &mut u32, val: u32| *old += val)", shape=invhouse, fillcolor="#88aaff"] n4v1 [label="(n4v1) for_each(|kv| result_send.send(kv).unwrap())", shape=house, fillcolor="#ffff88"] - n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n3v1 -> n4v1 - n2v1 -> n5v1 + n2v1 -> n3v1 n1v1 -> n2v1 - n5v1 -> n3v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -21,12 +19,6 @@ digraph { label = "sg_1v1" n1v1 n2v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n3v1 n4v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_mermaid.snap index caedf15e5542..fe6a9768c70c 100644 --- a/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold_keyed__fold_keyed_typed_basic@graphvis_mermaid.snap @@ -12,16 +12,12 @@ linkStyle default stroke:#aaa 2v1[\"(2v1) map(|m: SubordResponse| (m.xid, m.mtype))"/]:::pullClass 3v1[\"(3v1) fold_keyed::<'static, &'static str, u32>(|| 0, |old: &mut u32, val: u32| *old += val)"/]:::pullClass 4v1[/"(4v1) for_each(|kv| result_send.send(kv).unwrap())"\]:::pushClass -5v1["(5v1) handoff"]:::otherClass 3v1-->4v1 -2v1-->5v1 +2v1-->3v1 1v1-->2v1 -5v1--x3v1; linkStyle 3 stroke:red subgraph sg_1v1 ["sg_1v1"] 1v1 2v1 -end -subgraph sg_2v1 ["sg_2v1"] 3v1 4v1 end diff --git a/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_dot.snap index 740faf8ade3b..effbe5dd2076 100644 --- a/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_dot.snap @@ -10,31 +10,23 @@ digraph { n3v1 [label="(n3v1) singleton", shape=parallelogram, fillcolor="#ddddff"] n4v1 [label="(n4v1) map(|x: i32| x * 10)", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) for_each(|v: i32| out.push(v))", shape=house, fillcolor="#ffff88"] - n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n4v1 -> n5v1 n3v1 -> n4v1 n2v1 -> n3v1 - n1v1 -> n6v1 - n6v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 + n2v1 } subgraph sg_2v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_2v1" - n2v1 - } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" n4v1 n5v1 } diff --git a/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_mermaid.snap index 04ac8e134fdf..997d5dcb919a 100644 --- a/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_handoff__singleton_with_fold@graphvis_mermaid.snap @@ -13,19 +13,15 @@ linkStyle default stroke:#aaa 3v1["(3v1) singleton"]:::otherClass 4v1[\"(4v1) map(|x: i32| x * 10)"/]:::pullClass 5v1[/"(5v1) for_each(|v: i32| out.push(v))"\]:::pushClass -6v1["(6v1) handoff"]:::otherClass 4v1-->5v1 3v1-->4v1 2v1-->3v1 -1v1-->6v1 -6v1--x2v1; linkStyle 4 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 end -subgraph sg_3v1 ["sg_3v1"] +subgraph sg_2v1 ["sg_2v1"] 4v1 5v1 end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap index f047ef59287d..895898f50d8e 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap @@ -10,12 +10,10 @@ digraph { n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"] n4v1 [label="(n4v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n4v1 -> n5v1 - n3v1 -> n6v1 + n3v1 -> n4v1 n2v1 -> n3v1 n1v1 -> n2v1 - n6v1 -> n4v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -24,12 +22,6 @@ digraph { n1v1 n2v1 n3v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n4v1 n5v1 } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap index b229f7c7dd10..aa347615011a 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap @@ -13,18 +13,14 @@ linkStyle default stroke:#aaa 3v1[\"(3v1) persist::<'static>()"/]:::pullClass 4v1[\"(4v1) fold(|| 0, |a: &mut _, b| *a += b)"/]:::pullClass 5v1[/"(5v1) for_each(|x| result_send.send(x).unwrap())"\]:::pushClass -6v1["(6v1) handoff"]:::otherClass 4v1-->5v1 -3v1-->6v1 +3v1-->4v1 2v1-->3v1 1v1-->2v1 -6v1--x4v1; linkStyle 4 stroke:red subgraph sg_1v1 ["sg_1v1"] 1v1 2v1 3v1 -end -subgraph sg_2v1 ["sg_2v1"] 4v1 5v1 end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap index f8020b92b7bd..72d5d6d0efb9 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap @@ -14,7 +14,6 @@ digraph { n7v1 [label="(n7v1) union()", shape=invhouse, fillcolor="#88aaff"] n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"] n9v1 [label="(n9v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n4v1 n1v1 -> n2v1 n3v1 -> n4v1 @@ -22,8 +21,7 @@ digraph { n4v1 -> n5v1 n6v1 -> n7v1 n8v1 -> n9v1 - n7v1 -> n10v1 - n10v1 -> n8v1 [color=red] + n7v1 -> n8v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -43,16 +41,6 @@ digraph { cluster=true label="var m1" n7v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - subgraph sg_2v1_var_m1 { - cluster=true - label="var m1" n8v1 n9v1 } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap index d49ad4e5762f..41be2c9ff5e6 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap @@ -17,7 +17,6 @@ linkStyle default stroke:#aaa 7v1[\"(7v1) union()"/]:::pullClass 8v1[\"(8v1) fold(|| 0, |a: &mut _, b| *a += b)"/]:::pullClass 9v1[/"(9v1) for_each(|x| result_send.send(x).unwrap())"\]:::pushClass -10v1["(10v1) handoff"]:::otherClass 2v1-->4v1 1v1-->2v1 3v1-->4v1 @@ -25,8 +24,7 @@ linkStyle default stroke:#aaa 4v1-->5v1 6v1-->7v1 8v1-->9v1 -7v1-->10v1 -10v1--x8v1; linkStyle 8 stroke:red +7v1-->8v1 subgraph sg_1v1 ["sg_1v1"] 6v1 3v1 @@ -38,10 +36,6 @@ subgraph sg_1v1 ["sg_1v1"] end subgraph sg_1v1_var_m1 ["var m1"] 7v1 - end -end -subgraph sg_2v1 ["sg_2v1"] - subgraph sg_2v1_var_m1 ["var m1"] 8v1 9v1 end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap index d8ac1098af23..0e8aa63f7de4 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap @@ -12,9 +12,8 @@ digraph { n5v1 [label="(n5v1) persist::<'static>()", shape=house, fillcolor="#ffff88"] n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) null()", shape=house, fillcolor="#ffff88"] - n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"] + n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=house, fillcolor="#ffff88"] n9v1 [label="(n9v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 n1v1 -> n2v1 n3v1 -> n4v1 @@ -22,8 +21,7 @@ digraph { n3v1 -> n5v1 n6v1 -> n7v1 n8v1 -> n9v1 - n6v1 -> n10v1 - n10v1 -> n8v1 [color=red] + n6v1 -> n8v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -31,6 +29,8 @@ digraph { label = "sg_1v1" n4v1 n7v1 + n8v1 + n9v1 subgraph sg_1v1_var_t0 { cluster=true label="var t0" @@ -45,12 +45,4 @@ digraph { n6v1 } } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - n8v1 - n9v1 - } } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap index 7759f7b1ce47..288a0a129418 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap @@ -15,9 +15,8 @@ linkStyle default stroke:#aaa 5v1[/"(5v1) persist::<'static>()"\]:::pushClass 6v1[/"(6v1) tee()"\]:::pushClass 7v1[/"(7v1) null()"\]:::pushClass -8v1[\"(8v1) fold(|| 0, |a: &mut _, b| *a += b)"/]:::pullClass +8v1[/"(8v1) fold(|| 0, |a: &mut _, b| *a += b)"\]:::pushClass 9v1[/"(9v1) for_each(|x| result_send.send(x).unwrap())"\]:::pushClass -10v1["(10v1) handoff"]:::otherClass 2v1-->3v1 1v1-->2v1 3v1-->4v1 @@ -25,11 +24,12 @@ linkStyle default stroke:#aaa 3v1-->5v1 6v1-->7v1 8v1-->9v1 -6v1-->10v1 -10v1--x8v1; linkStyle 8 stroke:red +6v1-->8v1 subgraph sg_1v1 ["sg_1v1"] 4v1 7v1 + 8v1 + 9v1 subgraph sg_1v1_var_t0 ["var t0"] 1v1 2v1 @@ -40,7 +40,3 @@ subgraph sg_1v1 ["sg_1v1"] 6v1 end end -subgraph sg_2v1 ["sg_2v1"] - 8v1 - 9v1 -end diff --git a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_dot.snap index af8220570d7e..0c0c2057d2e0 100644 --- a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_dot.snap @@ -14,21 +14,24 @@ digraph { n7v1 [label="(n7v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] n8v1 [label="(n8v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"] n9v1 [label="(n9v1) for_each(|x| max_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n3v1 -> n4v1 - n2v1 -> n10v1 + n2v1 -> n3v1 n6v1 -> n7v1 n5v1 -> n6v1 n1v1 -> n5v1 n8v1 -> n9v1 n4v1 -> n8v1 - n10v1 -> n3v1 [color=red] n4v1 -> n5v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" + subgraph sg_1v1_var_max_of_stream2 { + cluster=true + label="var max_of_stream2" + n3v1 + } subgraph sg_1v1_var_stream2 { cluster=true label="var stream2" @@ -40,35 +43,24 @@ digraph { fillcolor="#dddddd" style=filled label = "sg_2v1" - subgraph sg_2v1_var_max_of_stream2 { - cluster=true - label="var max_of_stream2" - n3v1 - } - } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - subgraph sg_3v1_var_filtered_stream1 { + subgraph sg_2v1_var_filtered_stream1 { cluster=true label="var filtered_stream1" n5v1 n6v1 n7v1 } - subgraph sg_3v1_var_stream1 { + subgraph sg_2v1_var_stream1 { cluster=true label="var stream1" n1v1 } } - subgraph sg_4v1 { + subgraph sg_3v1 { cluster=true fillcolor="#dddddd" style=filled - label = "sg_4v1" + label = "sg_3v1" n8v1 n9v1 } diff --git a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_mermaid.snap index e3f56d1cae7f..b72d58d0f4b3 100644 --- a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton@graphvis_mermaid.snap @@ -17,37 +17,33 @@ linkStyle default stroke:#aaa 7v1[/"(7v1) for_each(|x| filter_send.send(x).unwrap())"\]:::pushClass 8v1[\"(8v1) map(|x| (context.current_tick(), x))"/]:::pullClass 9v1[/"(9v1) for_each(|x| max_send.send(x).unwrap())"\]:::pushClass -10v1["(10v1) handoff"]:::otherClass 3v1-->4v1 -2v1-->10v1 +2v1-->3v1 6v1-->7v1 5v1-->6v1 1v1-->5v1 8v1-->9v1 4v1-->8v1 -10v1--x3v1; linkStyle 7 stroke:red -4v1--x5v1; linkStyle 8 stroke:red +4v1--x5v1; linkStyle 7 stroke:red subgraph sg_1v1 ["sg_1v1"] + subgraph sg_1v1_var_max_of_stream2 ["var max_of_stream2"] + 3v1 + end subgraph sg_1v1_var_stream2 ["var stream2"] 2v1 end end subgraph sg_2v1 ["sg_2v1"] - subgraph sg_2v1_var_max_of_stream2 ["var max_of_stream2"] - 3v1 - end -end -subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_filtered_stream1 ["var filtered_stream1"] + subgraph sg_2v1_var_filtered_stream1 ["var filtered_stream1"] 5v1 6v1 7v1 end - subgraph sg_3v1_var_stream1 ["var stream1"] + subgraph sg_2v1_var_stream1 ["var stream1"] 1v1 end end -subgraph sg_4v1 ["sg_4v1"] +subgraph sg_3v1 ["sg_3v1"] 8v1 9v1 end diff --git a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_dot.snap index 9d8b52241757..2062b8960419 100644 --- a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_dot.snap @@ -13,20 +13,23 @@ digraph { n6v1 [label="(n6v1) filter(|&value| { value <= *max_of_stream2 })", shape=invhouse, fillcolor="#88aaff"] n7v1 [label="(n7v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"] n8v1 [label="(n8v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n3v1 -> n4v1 - n2v1 -> n9v1 + n2v1 -> n3v1 n7v1 -> n8v1 n6v1 -> n7v1 n5v1 -> n6v1 n1v1 -> n5v1 - n9v1 -> n3v1 [color=red] n4v1 -> n6v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" + subgraph sg_1v1_var_max_of_stream2 { + cluster=true + label="var max_of_stream2" + n3v1 + } subgraph sg_1v1_var_stream2 { cluster=true label="var stream2" @@ -38,18 +41,7 @@ digraph { fillcolor="#dddddd" style=filled label = "sg_2v1" - subgraph sg_2v1_var_max_of_stream2 { - cluster=true - label="var max_of_stream2" - n3v1 - } - } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - subgraph sg_3v1_var_filtered_stream1 { + subgraph sg_2v1_var_filtered_stream1 { cluster=true label="var filtered_stream1" n5v1 @@ -57,7 +49,7 @@ digraph { n7v1 n8v1 } - subgraph sg_3v1_var_stream1 { + subgraph sg_2v1_var_stream1 { cluster=true label="var stream1" n1v1 diff --git a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_mermaid.snap index e466477df8bc..563778f055dc 100644 --- a/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_singleton__fold_singleton_push@graphvis_mermaid.snap @@ -16,33 +16,29 @@ linkStyle default stroke:#aaa 6v1[\"(6v1) filter(|&value| { value <= *max_of_stream2 })"/]:::pullClass 7v1[\"(7v1) map(|x| (context.current_tick(), x))"/]:::pullClass 8v1[/"(8v1) for_each(|x| filter_send.send(x).unwrap())"\]:::pushClass -9v1["(9v1) handoff"]:::otherClass 3v1-->4v1 -2v1-->9v1 +2v1-->3v1 7v1-->8v1 6v1-->7v1 5v1-->6v1 1v1-->5v1 -9v1--x3v1; linkStyle 6 stroke:red -4v1--x6v1; linkStyle 7 stroke:red +4v1--x6v1; linkStyle 6 stroke:red subgraph sg_1v1 ["sg_1v1"] + subgraph sg_1v1_var_max_of_stream2 ["var max_of_stream2"] + 3v1 + end subgraph sg_1v1_var_stream2 ["var stream2"] 2v1 end end subgraph sg_2v1 ["sg_2v1"] - subgraph sg_2v1_var_max_of_stream2 ["var max_of_stream2"] - 3v1 - end -end -subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_filtered_stream1 ["var filtered_stream1"] + subgraph sg_2v1_var_filtered_stream1 ["var filtered_stream1"] 5v1 6v1 7v1 8v1 end - subgraph sg_3v1_var_stream1 ["var stream1"] + subgraph sg_2v1_var_stream1 ["var stream1"] 1v1 end end diff --git a/dfir_rs/tests/surface_push_blocking.rs b/dfir_rs/tests/surface_push_blocking.rs new file mode 100644 index 000000000000..181dc4c5005a --- /dev/null +++ b/dfir_rs/tests/surface_push_blocking.rs @@ -0,0 +1,60 @@ +//! Tests that blocking operators work on the push side of a subgraph. +//! These operators end up push-side when a tee (multi-output) is upstream in the same subgraph. + +use dfir_rs::dfir_syntax; +use dfir_rs::util::collect_ready; +use multiplatform_test::multiplatform_test; + +/// fold_keyed on push side +#[multiplatform_test] +pub fn test_fold_keyed_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<(i32, i32)>(); + let mut df = dfir_syntax! { + my_tee = source_iter([(1, 10), (1, 20), (2, 30)]) -> tee(); + my_tee -> fold_keyed(|| 0, |a: &mut _, b| *a += b) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + let mut out = collect_ready::, _>(&mut out_recv); + out.sort(); + assert_eq!(&[(1, 30), (2, 30)], &*out); +} + +/// fold on push side: source -> tee -> fold -> for_each +#[multiplatform_test] +pub fn test_fold_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::(); + let mut df = dfir_syntax! { + my_tee = source_iter([1, 2, 3]) -> tee(); + my_tee -> fold(|| 0, |a: &mut _, b| *a += b) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + assert_eq!(&[6], &*collect_ready::, _>(&mut out_recv)); +} + +/// fold_no_replay on push side: source -> tee -> fold_no_replay -> for_each +#[multiplatform_test] +pub fn test_fold_no_replay_push() { + let (items_send, items_recv) = dfir_rs::util::unbounded_channel::(); + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::(); + let mut df = dfir_syntax! { + my_tee = source_stream(items_recv) -> tee(); + my_tee -> fold_no_replay::<'static>(|| 0, |a: &mut _, b| *a += b) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + + items_send.send(1).unwrap(); + items_send.send(2).unwrap(); + df.run_tick_sync(); + assert_eq!(&[3], &*collect_ready::, _>(&mut out_recv)); + + // No new input: fold_no_replay should NOT emit. + df.run_tick_sync(); + assert_eq!(&[] as &[i32], &*collect_ready::, _>(&mut out_recv)); + + // New input arrives: should emit updated accumulator. + items_send.send(10).unwrap(); + df.run_tick_sync(); + assert_eq!(&[13], &*collect_ready::, _>(&mut out_recv)); +} diff --git a/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc1v1.snap b/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc1v1.snap index 251c805fbe1a..d9caaa61e3bd 100644 --- a/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc1v1.snap +++ b/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc1v1.snap @@ -10,13 +10,11 @@ _5v1 = map (stageleft :: runtime_support :: fnmut1_type_hint :: < (f64 , f64) , _6v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < (u64 , u64) > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_22_15 ! ([] [| | (0u64 , 0u64)]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , bool , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_23_15 ! ([] [| (inside , total) , sample_inside | { if sample_inside { * inside += 1 ; } * total += 1 ; }]) })); _7v1 = map (hydro_lang :: runtime_support :: stageleft :: runtime_support :: fn1_type_hint :: < (u64 , u64) , _ > (| data | { hydro_lang :: runtime_support :: bincode :: serialize (& data) . unwrap () . into () })); _8v1 = dest_sink (DUMMY_SINK); -_9v1 = handoff(); _1v1 -> _2v1; _2v1 -> _3v1; _3v1 -> _4v1; _4v1 -> _5v1; -_5v1 -> _9v1; +_5v1 -> _6v1; _7v1 -> _8v1; _6v1 -> _7v1; -_9v1 -> _6v1; diff --git a/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap b/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap index 23507f5e2cdc..c6548ab078d0 100644 --- a/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap +++ b/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc1v1.snap @@ -22,26 +22,22 @@ _17v1 = reduce_keyed :: < 'static > (stageleft :: runtime_support :: fn2_borrow_ _18v1 = for_each (stageleft :: runtime_support :: fnmut1_type_hint :: < (std :: string :: String , i32) , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: map_reduce :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_map_reduce_rs_45_21 ! ([] [| (string , count) | println ! ("{}: {}" , string , count)]) })); _19v1 = handoff(); _20v1 = handoff(); -_21v1 = handoff(); -_22v1 = handoff(); _1v1 -> _2v1; _2v1 -> _3v1; _4v1 -> _5v1; -_5v1 -> _19v1; +_5v1 -> _6v1; _6v1 -> _7v1; _7v1 -> _8v1; -_8v1 -> _20v1; +_8v1 -> _9v1; _3v1 -> [input]_10v1; -_9v1 -> _21v1; +_9v1 -> _19v1; _10v1 -> _11v1; _12v1 -> _13v1; _11v1 -> _12v1; _14v1 -> _15v1; _15v1 -> _16v1; -_16v1 -> _22v1; +_16v1 -> _20v1; _17v1 -> _18v1; -_19v1 -> _6v1; -_20v1 -> _9v1; -_21v1 -> [single]_10v1; -_22v1 -> _17v1; +_19v1 -> [single]_10v1; +_20v1 -> _17v1; diff --git a/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc2v1.snap b/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc2v1.snap index 0f42a2a7e7a6..fc098a7dccbd 100644 --- a/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc2v1.snap +++ b/hydro_test/src/cluster/snapshots/map_reduce_ir@surface_graph_loc2v1.snap @@ -9,12 +9,10 @@ _4v1 = fold_keyed :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: _5v1 = inspect (stageleft :: runtime_support :: fnmut1_borrow_type_hint :: < (std :: string :: String , i32) , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: map_reduce :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_map_reduce_rs_27_20 ! ([] [| (string , count) | println ! ("partition count: {} - {}" , string , count)]) })); _6v1 = map (hydro_lang :: runtime_support :: stageleft :: runtime_support :: fn1_type_hint :: < (std :: string :: String , i32) , _ > (| data | { hydro_lang :: runtime_support :: bincode :: serialize (& data) . unwrap () . into () })); _7v1 = dest_sink (DUMMY_SINK); -_8v1 = handoff(); _1v1 -> _2v1; _2v1 -> _3v1; -_3v1 -> _8v1; +_3v1 -> _4v1; _4v1 -> _5v1; _6v1 -> _7v1; _5v1 -> _6v1; -_8v1 -> _4v1; diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap index 960bb6505cfd..9379f8e501a1 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap @@ -98,15 +98,15 @@ linkStyle default stroke:#aaa 35v1-->|1|33v1 26v1-->35v1 36v1-->37v1 -33v1--x36v1; linkStyle 38 stroke:red -37v1--x38v1; linkStyle 39 stroke:red +33v1-->36v1 +37v1-->38v1 30v1-->|input|39v1 38v1--x|single|39v1; linkStyle 41 stroke:red 39v1-->40v1 41v1-->42v1 42v1--x43v1; linkStyle 44 stroke:red 43v1-->44v1 -44v1--x45v1; linkStyle 46 stroke:red +44v1-->45v1 45v1-->46v1 46v1-->47v1 44v1-->|input|48v1 diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap index 4a69dd70ae4f..a93c16bd1205 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap @@ -293,7 +293,7 @@ linkStyle default stroke:#aaa 15v1-->16v1 16v1-->17v1 18v1-->19v1 -19v1--x20v1; linkStyle 20 stroke:red +19v1-->20v1 20v1-->21v1 21v1-->22v1 16v1-->23v1 @@ -327,10 +327,10 @@ linkStyle default stroke:#aaa 49v1-->50v1 101v1--o51v1; linkStyle 52 stroke:red 52v1-->53v1 -53v1--x54v1; linkStyle 54 stroke:red +53v1-->54v1 54v1-->55v1 55v1-->56v1 -49v1--x57v1; linkStyle 57 stroke:red +49v1-->57v1 57v1-->58v1 26v1-->59v1 59v1-->60v1 @@ -371,7 +371,7 @@ linkStyle default stroke:#aaa 51v1--x|0|93v1; linkStyle 95 stroke:red 92v1-->|1|93v1 93v1-->94v1 -94v1--x95v1; linkStyle 98 stroke:red +94v1-->95v1 95v1-->96v1 96v1-->97v1 97v1-->98v1 @@ -417,7 +417,7 @@ linkStyle default stroke:#aaa 130v1-->131v1 131v1-->132v1 133v1-->134v1 -134v1--x135v1; linkStyle 144 stroke:red +134v1-->135v1 135v1-->136v1 136v1-->137v1 128v1-->138v1 @@ -452,7 +452,7 @@ linkStyle default stroke:#aaa 163v1-->164v1 164v1-->165v1 165v1-->166v1 -166v1--x167v1; linkStyle 179 stroke:red +166v1-->167v1 167v1-->168v1 168v1-->169v1 169v1-->170v1 @@ -470,14 +470,14 @@ linkStyle default stroke:#aaa 179v1--x|single|180v1; linkStyle 194 stroke:red 180v1-->181v1 181v1-->182v1 -182v1--x183v1; linkStyle 197 stroke:red +182v1-->183v1 183v1-->|input|184v1 179v1--x|single|184v1; linkStyle 199 stroke:red 184v1-->185v1 185v1-->186v1 233v1--o187v1; linkStyle 202 stroke:red 188v1-->189v1 -189v1--x190v1; linkStyle 204 stroke:red +189v1-->190v1 190v1-->191v1 191v1-->192v1 182v1-->|input|193v1 @@ -524,7 +524,7 @@ linkStyle default stroke:#aaa 187v1--x|0|225v1; linkStyle 248 stroke:red 224v1-->|1|225v1 225v1-->226v1 -226v1--x227v1; linkStyle 251 stroke:red +226v1-->227v1 227v1-->228v1 228v1-->229v1 229v1-->230v1 @@ -559,7 +559,7 @@ linkStyle default stroke:#aaa 252v1-->253v1 253v1-->254v1 255v1-->256v1 -256v1--x257v1; linkStyle 286 stroke:red +256v1-->257v1 257v1-->258v1 258v1-->259v1 240v1-->|probe|260v1 diff --git a/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap b/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap index 147571bef33b..3ab6c336d93f 100644 --- a/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap @@ -64,7 +64,7 @@ linkStyle default stroke:#aaa 56v1["
(56v1)

dest_sink(DUMMY_SINK)
"]:::otherClass 24v1--o1v1; linkStyle 0 stroke:red 2v1-->3v1 -3v1--x4v1; linkStyle 2 stroke:red +3v1-->4v1 4v1-->5v1 5v1-->6v1 7v1-->8v1 @@ -79,7 +79,7 @@ linkStyle default stroke:#aaa 1v1--x|0|17v1; linkStyle 14 stroke:red 16v1-->|1|17v1 17v1-->18v1 -18v1--x19v1; linkStyle 17 stroke:red +18v1-->19v1 19v1-->21v1 21v1-->22v1 18v1-->|pos|23v1 @@ -91,7 +91,7 @@ linkStyle default stroke:#aaa 27v1-->28v1 50v1--o29v1; linkStyle 27 stroke:red 30v1-->31v1 -31v1--x32v1; linkStyle 29 stroke:red +31v1-->32v1 32v1-->33v1 33v1-->34v1 34v1-->|0|35v1 @@ -105,7 +105,7 @@ linkStyle default stroke:#aaa 29v1--x|0|43v1; linkStyle 40 stroke:red 42v1-->|1|43v1 43v1-->44v1 -44v1--x45v1; linkStyle 43 stroke:red +44v1-->45v1 45v1-->47v1 47v1-->48v1 44v1-->|pos|49v1 diff --git a/hydro_test/src/local/snapshots/chat_app_no_replay.snap b/hydro_test/src/local/snapshots/chat_app_no_replay.snap index 55a69bde6c4a..30191b88dd78 100644 --- a/hydro_test/src/local/snapshots/chat_app_no_replay.snap +++ b/hydro_test/src/local/snapshots/chat_app_no_replay.snap @@ -30,7 +30,6 @@ linkStyle default stroke:#aaa 20v1[\"
(20v1)
map(|data| {
hydro_lang::runtime_support::bincode::serialize(&data).unwrap().into()
})
"/]:::pullClass 21v1[/"(21v1) dest_sink(DUMMY_SINK)"\]:::pushClass 22v1["(22v1) handoff"]:::otherClass -23v1["(23v1) handoff"]:::otherClass 1v1-->2v1 4v1-->1v1 3v1-->4v1 @@ -38,19 +37,18 @@ linkStyle default stroke:#aaa 8v1-->5v1 7v1-->8v1 9v1-->10v1 -10v1-->22v1 +10v1-->11v1 11v1-->12v1 12v1-->13v1 14v1-->15v1 15v1-->16v1 16v1-->17v1 13v1-->|probe|18v1 -17v1-->23v1 +17v1-->22v1 18v1-->19v1 20v1-->21v1 19v1-->20v1 -22v1--x11v1; linkStyle 18 stroke:red -23v1--x|build|18v1; linkStyle 19 stroke:red +22v1--x|build|18v1; linkStyle 18 stroke:red subgraph sg_1v1 ["sg_1v1"] 1v1 2v1 @@ -72,39 +70,37 @@ subgraph sg_2v1 ["sg_2v1"] end end subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_stream_8 ["var stream_8"] - 9v1 - 10v1 - end -end -subgraph sg_4v1 ["sg_4v1"] - subgraph sg_4v1_var_stream_14 ["var stream_14"] + subgraph sg_3v1_var_stream_14 ["var stream_14"] 14v1 15v1 end - subgraph sg_4v1_var_stream_15 ["var stream_15"] + subgraph sg_3v1_var_stream_15 ["var stream_15"] 16v1 end - subgraph sg_4v1_var_stream_17 ["var stream_17"] + subgraph sg_3v1_var_stream_17 ["var stream_17"] 17v1 end end -subgraph sg_5v1 ["sg_5v1"] +subgraph sg_4v1 ["sg_4v1"] 20v1 21v1 - subgraph sg_5v1_var_stream_12 ["var stream_12"] + subgraph sg_4v1_var_stream_12 ["var stream_12"] 12v1 end - subgraph sg_5v1_var_stream_13 ["var stream_13"] + subgraph sg_4v1_var_stream_13 ["var stream_13"] 13v1 end - subgraph sg_5v1_var_stream_18 ["var stream_18"] + subgraph sg_4v1_var_stream_18 ["var stream_18"] 18v1 end - subgraph sg_5v1_var_stream_19 ["var stream_19"] + subgraph sg_4v1_var_stream_19 ["var stream_19"] 19v1 end - subgraph sg_5v1_var_stream_9 ["var stream_9"] + subgraph sg_4v1_var_stream_8 ["var stream_8"] + 9v1 + 10v1 + end + subgraph sg_4v1_var_stream_9 ["var stream_9"] 11v1 end end