diff --git a/dfir_lang/src/graph/ops/chain.rs b/dfir_lang/src/graph/ops/chain.rs index d0e80caa2b3b..3767f3838d8f 100644 --- a/dfir_lang/src/graph/ops/chain.rs +++ b/dfir_lang/src/graph/ops/chain.rs @@ -1,7 +1,5 @@ -use crate::graph::PortIndexValue; - use super::{ - DelayType, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1 + OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1 }; /// > 2 input streams of the same type, 1 output stream of the same type @@ -32,11 +30,6 @@ pub const CHAIN: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |idx| match idx { - PortIndexValue::Int(idx) if idx.value == 0 => { - Some(DelayType::Stratum) - } - _else => None, - }, + input_delaytype_fn: |_| None, write_fn: super::union::UNION.write_fn, }; diff --git a/dfir_lang/src/graph/ops/chain_first_n.rs b/dfir_lang/src/graph/ops/chain_first_n.rs index 949826d3089e..c542e2a6c8da 100644 --- a/dfir_lang/src/graph/ops/chain_first_n.rs +++ b/dfir_lang/src/graph/ops/chain_first_n.rs @@ -1,11 +1,10 @@ use quote::quote_spanned; use crate::graph::{ - PortIndexValue, ops::{OperatorWriteOutput, WriteContextArgs}, }; -use super::{DelayType, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1}; +use super::{OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1}; /// > 2 input streams of the same type, 1 output stream of the same type /// @@ -36,13 +35,7 @@ pub const CHAIN_FIRST_N: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |idx| match idx { - PortIndexValue::Int(idx) if idx.value == 0 => { - // will no longer be needed once subgraphs are always DAGs (only run once per tick) - Some(DelayType::Stratum) - } - _else => None, - }, + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, op_span, diff --git a/dfir_lang/src/graph/ops/sort.rs b/dfir_lang/src/graph/ops/sort.rs index 1989d26707ba..195595f8fc42 100644 --- a/dfir_lang/src/graph/ops/sort.rs +++ b/dfir_lang/src/graph/ops/sort.rs @@ -1,7 +1,7 @@ use quote::quote_spanned; use super::{ - DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, + OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs, }; @@ -12,9 +12,7 @@ use super::{ /// -> sort() /// -> assert_eq([1, 2, 3]); /// ``` -/// -/// `sort` is blocking. Only the values collected within a single tick will be sorted and -/// emitted. +/// Within a tick, only the values received within that tick will be sorted and emitted. pub const SORT: OperatorConstraints = OperatorConstraints { name: "sort", categories: &[OperatorCategory::Persistence], @@ -29,27 +27,32 @@ pub const SORT: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |&WriteContextArgs { root, op_span, work_fn_async, ident, inputs, + outputs, is_pull, .. }, _| { - assert!(is_pull); - - let input = &inputs[0]; - let write_iterator = quote_spanned! {op_span=> - // TODO(mingwei): unnecessary extra handoff into_iter() then collect(). - let #ident = { - let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await; - <[_]>::sort_unstable(&mut tmp); - #root::dfir_pipes::pull::iter(tmp) - }; + let write_iterator = if is_pull { + let input = &inputs[0]; + quote_spanned! {op_span=> + let #ident = { + let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await; + <[_]>::sort_unstable(&mut tmp); + #root::dfir_pipes::pull::iter(tmp) + }; + } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::Sort::new(#output); + } }; Ok(OperatorWriteOutput { write_iterator, diff --git a/dfir_lang/src/graph/ops/sort_by_key.rs b/dfir_lang/src/graph/ops/sort_by_key.rs index f45fc73678e6..22b59968d1df 100644 --- a/dfir_lang/src/graph/ops/sort_by_key.rs +++ b/dfir_lang/src/graph/ops/sort_by_key.rs @@ -1,7 +1,7 @@ use quote::quote_spanned; use super::{ - DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, + OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs, }; @@ -29,27 +29,46 @@ pub const SORT_BY_KEY: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |&WriteContextArgs { root, op_span, work_fn_async, ident, inputs, + outputs, is_pull, arguments, .. }, _| { - assert!(is_pull); - let input = &inputs[0]; - let write_iterator = quote_spanned! {op_span=> - // TODO(mingwei): unnecessary extra handoff into_iter() then collect(). - let #ident = { - let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await; - #root::util::sort_unstable_by_key_hrtb(&mut tmp, #arguments); - #root::dfir_pipes::pull::iter(tmp) - }; + let write_iterator = if is_pull { + let input = &inputs[0]; + quote_spanned! {op_span=> + let #ident = { + let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await; + #root::util::sort_unstable_by_key_hrtb(&mut tmp, #arguments); + #root::dfir_pipes::pull::iter(tmp) + }; + } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::fold( + ::std::vec::Vec::new(), + |__buf: &mut ::std::vec::Vec<_>, __item| { + __buf.push(__item); + }, + #root::dfir_pipes::push::flat_map( + |__buf: ::std::vec::Vec<_>| { + let mut __buf = __buf; + #root::util::sort_unstable_by_key_hrtb(&mut __buf, #arguments); + __buf + }, + #output, + ), + ); + } }; Ok(OperatorWriteOutput { write_iterator, diff --git a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap index 640cc67121ce..dfb2d30db149 100644 --- a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_dot.snap @@ -24,28 +24,26 @@ digraph { n17v1 [label="(n17v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n18v1 [label="(n18v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n19v1 [label="(n19v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n20v1 [label="(n20v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n15v1 - n3v1 -> n16v1 + n1v1 -> n2v1 + n3v1 -> n15v1 n4v1 -> n7v1 [label="0"] - n14v1 -> n17v1 + n14v1 -> n16v1 n5v1 -> n6v1 n6v1 -> n7v1 [label="1"] - n7v1 -> n18v1 + n7v1 -> n17v1 n8v1 -> n9v1 n9v1 -> n10v1 n9v1 -> n11v1 - n3v1 -> n19v1 - n11v1 -> n20v1 + n3v1 -> n18v1 + n11v1 -> n19v1 n13v1 -> n14v1 n12v1 -> n13v1 - n15v1 -> n2v1 [color=red] - n16v1 -> n8v1 [label="input"] - n17v1 -> n4v1 [color=red] - n18v1 -> n8v1 [label="single", color=red] - n19v1 -> n12v1 [label="input"] - n20v1 -> n12v1 [label="single", color=red] + n15v1 -> n8v1 [label="input"] + n16v1 -> n4v1 [color=red] + n17v1 -> n8v1 [label="single", color=red] + n18v1 -> n12v1 [label="input"] + n19v1 -> n12v1 [label="single", color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -55,68 +53,58 @@ digraph { cluster=true label="var teed_in" n1v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - subgraph sg_2v1_var_teed_in { - cluster=true - label="var teed_in" n2v1 n3v1 } } - subgraph sg_3v1 { + subgraph sg_2v1 { cluster=true fillcolor="#dddddd" style=filled - label = "sg_3v1" + label = "sg_2v1" n4v1 - subgraph sg_3v1_var_persisted_stream { + subgraph sg_2v1_var_persisted_stream { cluster=true label="var persisted_stream" n5v1 n6v1 } - subgraph sg_3v1_var_unioned_stream { + subgraph sg_2v1_var_unioned_stream { cluster=true label="var unioned_stream" n7v1 } } - subgraph sg_4v1 { + subgraph sg_3v1 { cluster=true fillcolor="#dddddd" style=filled - label = "sg_4v1" + label = "sg_3v1" n10v1 - subgraph sg_4v1_var_folded_thing { + subgraph sg_3v1_var_folded_thing { cluster=true label="var folded_thing" n11v1 } - subgraph sg_4v1_var_join { + subgraph sg_3v1_var_join { cluster=true label="var join" n8v1 n9v1 } } - subgraph sg_5v1 { + subgraph sg_4v1 { cluster=true fillcolor="#dddddd" style=filled - label = "sg_5v1" - subgraph sg_5v1_var_deferred_stream { + label = "sg_4v1" + subgraph sg_4v1_var_deferred_stream { cluster=true label="var deferred_stream" n13v1 n14v1 } - subgraph sg_5v1_var_joined_folded { + subgraph sg_4v1_var_joined_folded { cluster=true label="var joined_folded" n12v1 diff --git a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap index 8beea61b49ce..04e73d835153 100644 --- a/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_cross_singleton__union_defer_tick@graphvis_mermaid.snap @@ -27,65 +27,59 @@ linkStyle default stroke:#aaa 17v1["(17v1) handoff"]:::otherClass 18v1["(18v1) handoff"]:::otherClass 19v1["(19v1) handoff"]:::otherClass -20v1["(20v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->15v1 -3v1-->16v1 +1v1-->2v1 +3v1-->15v1 4v1-->|0|7v1 -14v1-->17v1 +14v1-->16v1 5v1-->6v1 6v1-->|1|7v1 -7v1-->18v1 +7v1-->17v1 8v1-->9v1 9v1-->10v1 9v1-->11v1 -3v1-->19v1 -11v1-->20v1 +3v1-->18v1 +11v1-->19v1 13v1-->14v1 12v1-->13v1 -15v1--x2v1; linkStyle 15 stroke:red -16v1-->|input|8v1 -17v1--o4v1; linkStyle 17 stroke:red -18v1--x|single|8v1; linkStyle 18 stroke:red -19v1-->|input|12v1 -20v1--x|single|12v1; linkStyle 20 stroke:red +15v1-->|input|8v1 +16v1--o4v1; linkStyle 16 stroke:red +17v1--x|single|8v1; linkStyle 17 stroke:red +18v1-->|input|12v1 +19v1--x|single|12v1; linkStyle 19 stroke:red subgraph sg_1v1 ["sg_1v1"] subgraph sg_1v1_var_teed_in ["var teed_in"] 1v1 - end -end -subgraph sg_2v1 ["sg_2v1"] - subgraph sg_2v1_var_teed_in ["var teed_in"] 2v1 3v1 end end -subgraph sg_3v1 ["sg_3v1"] +subgraph sg_2v1 ["sg_2v1"] 4v1 - subgraph sg_3v1_var_persisted_stream ["var persisted_stream"] + subgraph sg_2v1_var_persisted_stream ["var persisted_stream"] 5v1 6v1 end - subgraph sg_3v1_var_unioned_stream ["var unioned_stream"] + subgraph sg_2v1_var_unioned_stream ["var unioned_stream"] 7v1 end end -subgraph sg_4v1 ["sg_4v1"] +subgraph sg_3v1 ["sg_3v1"] 10v1 - subgraph sg_4v1_var_folded_thing ["var folded_thing"] + subgraph sg_3v1_var_folded_thing ["var folded_thing"] 11v1 end - subgraph sg_4v1_var_join ["var join"] + subgraph sg_3v1_var_join ["var join"] 8v1 9v1 end end -subgraph sg_5v1 ["sg_5v1"] - subgraph sg_5v1_var_deferred_stream ["var deferred_stream"] +subgraph sg_4v1 ["sg_4v1"] + subgraph sg_4v1_var_deferred_stream ["var deferred_stream"] 13v1 14v1 end - subgraph sg_5v1_var_joined_folded ["var joined_folded"] + subgraph sg_4v1_var_joined_folded ["var joined_folded"] 12v1 end end diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_dot.snap index ddc5e9ef001b..a7961095ccf6 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_dot.snap @@ -13,15 +13,13 @@ digraph { n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) for_each(|x| println!(\"neg: {:?}\", x))", shape=house, fillcolor="#ffff88"] n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n4v1 -> n1v1 [label="pos"] n5v1 -> n6v1 - n6v1 -> n9v1 + n6v1 -> n8v1 n6v1 -> n7v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n1v1 [label="neg", color=red] + n8v1 -> n1v1 [label="neg", color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -44,6 +42,8 @@ digraph { cluster=true label="var diff" n1v1 + n2v1 + n3v1 } subgraph sg_2v1_var_poss { cluster=true @@ -51,16 +51,4 @@ digraph { n4v1 } } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - subgraph sg_3v1_var_diff { - cluster=true - label="var diff" - n2v1 - n3v1 - } - } } diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_mermaid.snap index fe83832cacf5..e4e3f8bea441 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static@graphvis_mermaid.snap @@ -16,15 +16,13 @@ linkStyle default stroke:#aaa 6v1[/"(6v1) tee()"\]:::pushClass 7v1[/"(7v1) for_each(|x| println!("neg: {:?}", x))"\]:::pushClass 8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 4v1-->|pos|1v1 5v1-->6v1 -6v1-->9v1 +6v1-->8v1 6v1-->7v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1--x|neg|1v1; linkStyle 7 stroke:red +8v1--x|neg|1v1; linkStyle 6 stroke:red subgraph sg_1v1 ["sg_1v1"] 7v1 subgraph sg_1v1_var_negs ["var negs"] @@ -35,14 +33,10 @@ end subgraph sg_2v1 ["sg_2v1"] subgraph sg_2v1_var_diff ["var diff"] 1v1 + 2v1 + 3v1 end subgraph sg_2v1_var_poss ["var poss"] 4v1 end end -subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_diff ["var diff"] - 2v1 - 3v1 - end -end diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_dot.snap index 262ff123ca67..cff331993f59 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_dot.snap @@ -13,15 +13,13 @@ digraph { n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) for_each(|x| println!(\"neg: {:?}\", x))", shape=house, fillcolor="#ffff88"] n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n4v1 -> n1v1 [label="pos"] n5v1 -> n6v1 - n6v1 -> n9v1 + n6v1 -> n8v1 n6v1 -> n7v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n1v1 [label="neg", color=red] + n8v1 -> n1v1 [label="neg", color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -44,6 +42,8 @@ digraph { cluster=true label="var diff" n1v1 + n2v1 + n3v1 } subgraph sg_2v1_var_poss { cluster=true @@ -51,16 +51,4 @@ digraph { n4v1 } } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - subgraph sg_3v1_var_diff { - cluster=true - label="var diff" - n2v1 - n3v1 - } - } } diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_mermaid.snap index 34718caf167b..22c9c99b798c 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_static_tick@graphvis_mermaid.snap @@ -16,15 +16,13 @@ linkStyle default stroke:#aaa 6v1[/"(6v1) tee()"\]:::pushClass 7v1[/"(7v1) for_each(|x| println!("neg: {:?}", x))"\]:::pushClass 8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 4v1-->|pos|1v1 5v1-->6v1 -6v1-->9v1 +6v1-->8v1 6v1-->7v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1--x|neg|1v1; linkStyle 7 stroke:red +8v1--x|neg|1v1; linkStyle 6 stroke:red subgraph sg_1v1 ["sg_1v1"] 7v1 subgraph sg_1v1_var_negs ["var negs"] @@ -35,14 +33,10 @@ end subgraph sg_2v1 ["sg_2v1"] subgraph sg_2v1_var_diff ["var diff"] 1v1 + 2v1 + 3v1 end subgraph sg_2v1_var_poss ["var poss"] 4v1 end end -subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_diff ["var diff"] - 2v1 - 3v1 - end -end diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_dot.snap index 4c026e653774..ac13d2e0b990 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_dot.snap @@ -13,15 +13,13 @@ digraph { n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) for_each(|x| println!(\"neg: {:?}\", x))", shape=house, fillcolor="#ffff88"] n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n4v1 -> n1v1 [label="pos"] n5v1 -> n6v1 - n6v1 -> n9v1 + n6v1 -> n8v1 n6v1 -> n7v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n1v1 [label="neg", color=red] + n8v1 -> n1v1 [label="neg", color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -44,6 +42,8 @@ digraph { cluster=true label="var diff" n1v1 + n2v1 + n3v1 } subgraph sg_2v1_var_poss { cluster=true @@ -51,16 +51,4 @@ digraph { n4v1 } } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - subgraph sg_3v1_var_diff { - cluster=true - label="var diff" - n2v1 - n3v1 - } - } } diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_mermaid.snap index 9a6927f5b50d..3172019a1ee0 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_multiset_tick_static@graphvis_mermaid.snap @@ -16,15 +16,13 @@ linkStyle default stroke:#aaa 6v1[/"(6v1) tee()"\]:::pushClass 7v1[/"(7v1) for_each(|x| println!("neg: {:?}", x))"\]:::pushClass 8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 4v1-->|pos|1v1 5v1-->6v1 -6v1-->9v1 +6v1-->8v1 6v1-->7v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1--x|neg|1v1; linkStyle 7 stroke:red +8v1--x|neg|1v1; linkStyle 6 stroke:red subgraph sg_1v1 ["sg_1v1"] 7v1 subgraph sg_1v1_var_negs ["var negs"] @@ -35,14 +33,10 @@ end subgraph sg_2v1 ["sg_2v1"] subgraph sg_2v1_var_diff ["var diff"] 1v1 + 2v1 + 3v1 end subgraph sg_2v1_var_poss ["var poss"] 4v1 end end -subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_diff ["var diff"] - 2v1 - 3v1 - end -end diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_dot.snap index 61d8cb090e52..02f2550d1d01 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_dot.snap @@ -11,13 +11,11 @@ digraph { n4v1 [label="(n4v1) sort()", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) for_each(|v| output_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n1v1 -> n3v1 [label="pos"] n2v1 -> n6v1 n4v1 -> n5v1 - n3v1 -> n7v1 + n3v1 -> n4v1 n6v1 -> n3v1 [label="neg", color=red] - n7v1 -> n4v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -35,16 +33,6 @@ digraph { cluster=true label="var diff" n3v1 - } - } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - subgraph sg_3v1_var_diff { - cluster=true - label="var diff" n4v1 n5v1 } diff --git a/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_mermaid.snap index 4d0420378a15..6985dfdbaac5 100644 --- a/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_difference__diff_static@graphvis_mermaid.snap @@ -14,13 +14,11 @@ linkStyle default stroke:#aaa 4v1[\"(4v1) sort()"/]:::pullClass 5v1[/"(5v1) for_each(|v| output_send.send(v).unwrap())"\]:::pushClass 6v1["(6v1) handoff"]:::otherClass -7v1["(7v1) handoff"]:::otherClass 1v1-->|pos|3v1 2v1-->6v1 4v1-->5v1 -3v1-->7v1 +3v1-->4v1 6v1--x|neg|3v1; linkStyle 4 stroke:red -7v1--x4v1; linkStyle 5 stroke:red subgraph sg_1v1 ["sg_1v1"] 2v1 end @@ -28,10 +26,6 @@ subgraph sg_2v1 ["sg_2v1"] 1v1 subgraph sg_2v1_var_diff ["var diff"] 3v1 - end -end -subgraph sg_3v1 ["sg_3v1"] - subgraph sg_3v1_var_diff ["var diff"] 4v1 5v1 end diff --git a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap index f82b6587decc..14503bc05e5f 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) sort()", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| print!(\"{:?}, \", v))", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap index 6875ef8d17f4..5dad7825e9ca 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"(2v1) sort()"/]:::pullClass 3v1[/"(3v1) for_each(|v| print!("{:?}, ", v))"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap index 7e7a24ab721f..871be3dcd90c 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')])", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) sort_by_key(|(k, _v)| k)", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| println!(\"{:?}\", v))", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap index 11b297fc9687..9fc81dd1e3f6 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')])"/]:::pullClass 2v1[\"(2v1) sort_by_key(|(k, _v)| k)"/]:::pullClass 3v1[/"(3v1) for_each(|v| println!("{:?}", v))"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/surface_push_blocking.rs b/dfir_rs/tests/surface_push_blocking.rs index 1524f42e0780..4511f162f6ce 100644 --- a/dfir_rs/tests/surface_push_blocking.rs +++ b/dfir_rs/tests/surface_push_blocking.rs @@ -5,6 +5,35 @@ use dfir_rs::dfir_syntax; use dfir_rs::util::collect_ready; use multiplatform_test::multiplatform_test; +/// sort on push side: source -> tee -> sort -> for_each +#[multiplatform_test] +pub fn test_sort_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::(); + let mut df = dfir_syntax! { + my_tee = source_iter([3, 1, 2]) -> tee(); + my_tee -> sort() -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + assert_eq!(&[1, 2, 3], &*collect_ready::, _>(&mut out_recv)); +} + +/// sort_by_key on push side: source -> tee -> sort_by_key -> for_each +#[multiplatform_test] +pub fn test_sort_by_key_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<(i32, char)>(); + let mut df = dfir_syntax! { + my_tee = source_iter([(2, 'y'), (3, 'x'), (1, 'z')]) -> tee(); + my_tee -> sort_by_key(|(k, _v)| k) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + assert_eq!( + &[(1, 'z'), (2, 'y'), (3, 'x')], + &*collect_ready::, _>(&mut out_recv) + ); +} + /// reduce on push side: source -> tee -> reduce -> for_each #[multiplatform_test] pub fn test_reduce_push() { diff --git a/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap b/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap index 0b1baaf51144..4529cd637fad 100644 --- a/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap +++ b/hydro_test/src/cluster/snapshots/compute_pi_ir@surface_graph_loc2v1.snap @@ -21,7 +21,6 @@ _16v1 = cross_singleton (); _17v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < ((u64 , u64) , bool) , (u64 , u64) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_1105_51 ! ([] [| (d , _) | d]) })); _18v1 = for_each (stageleft :: runtime_support :: fnmut1_type_hint :: < (u64 , u64) , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_50_21 ! ([] [| (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; }]) })); _19v1 = handoff(); -_20v1 = handoff(); _1v1 -> _2v1; _2v1 -> _3v1; @@ -32,13 +31,12 @@ _7v1 -> _8v1; _8v1 -> _9v1; _9v1 -> _10v1; _11v1 -> _12v1; -_10v1 -> _19v1; +_10v1 -> [0]_13v1; _12v1 -> [1]_13v1; _13v1 -> _14v1; _14v1 -> _15v1; _4v1 -> [input]_16v1; -_15v1 -> _20v1; +_15v1 -> _19v1; _16v1 -> _17v1; _17v1 -> _18v1; -_19v1 -> [0]_13v1; -_20v1 -> [single]_16v1; +_19v1 -> [single]_16v1; diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap index 88c3a23a51b5..76a1ccc09628 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap @@ -67,7 +67,7 @@ linkStyle default stroke:#aaa 6v1-->7v1 7v1-->8v1 9v1-->10v1 -8v1--x|0|11v1; linkStyle 7 stroke:red +8v1-->|0|11v1 10v1-->|1|11v1 11v1-->12v1 6v1-->|input|13v1 @@ -88,12 +88,12 @@ linkStyle default stroke:#aaa 52v1-->26v1 26v1-->27v1 28v1-->29v1 -27v1--x|0|30v1; linkStyle 28 stroke:red +27v1-->|0|30v1 29v1-->|1|30v1 21v1-->|input|31v1 12v1--x|single|31v1; linkStyle 31 stroke:red 31v1-->32v1 -34v1--x|0|33v1; linkStyle 33 stroke:red +34v1-->|0|33v1 32v1-->34v1 35v1-->|1|33v1 26v1-->35v1 @@ -116,8 +116,8 @@ linkStyle default stroke:#aaa 50v1-->51v1 51v1-->52v1 2v1 -34v1 35v1 +34v1 16v1 17v1 24v1 diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap index 725934538dc7..336098cccddc 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@proposer_mermaid.snap @@ -274,18 +274,18 @@ linkStyle default stroke:#aaa 264v1["
(264v1)

map(|(id, data)| {
(
id.into_tagless(),
hydro_lang::runtime_support::bincode::serialize(&data).unwrap().into(),
)
})
"]:::otherClass 265v1["
(265v1)

dest_sink(DUMMY_SINK)
"]:::otherClass 1v1-->2v1 -132v1--x|0|3v1; linkStyle 1 stroke:red +132v1-->|0|3v1 250v1-->|1|3v1 -3v1--x|0|4v1; linkStyle 3 stroke:red +3v1-->|0|4v1 50v1-->|1|4v1 4v1-->5v1 6v1-->7v1 -5v1--x|0|8v1; linkStyle 7 stroke:red +5v1-->|0|8v1 7v1-->|1|8v1 8v1-->9v1 17v1--o10v1; linkStyle 10 stroke:red 11v1-->12v1 -10v1--x|0|13v1; linkStyle 12 stroke:red +10v1-->|0|13v1 12v1-->|1|13v1 9v1-->|input|14v1 13v1--x|single|14v1; linkStyle 15 stroke:red @@ -310,7 +310,7 @@ linkStyle default stroke:#aaa 33v1-->34v1 34v1-->35v1 36v1-->37v1 -35v1--x|0|38v1; linkStyle 37 stroke:red +35v1-->|0|38v1 37v1-->|1|38v1 38v1-->39v1 39v1-->40v1 @@ -340,7 +340,7 @@ linkStyle default stroke:#aaa 62v1-->63v1 63v1-->64v1 65v1-->66v1 -64v1--x|0|67v1; linkStyle 67 stroke:red +64v1-->|0|67v1 66v1-->|1|67v1 67v1-->68v1 69v1-->70v1 @@ -349,7 +349,7 @@ linkStyle default stroke:#aaa 72v1-->73v1 73v1-->74v1 75v1-->76v1 -74v1--x|0|77v1; linkStyle 76 stroke:red +74v1-->|0|77v1 76v1-->|1|77v1 77v1-->78v1 68v1-->|input|79v1 @@ -368,7 +368,7 @@ linkStyle default stroke:#aaa 89v1-->90v1 90v1-->91v1 91v1-->92v1 -51v1--x|0|93v1; linkStyle 95 stroke:red +51v1-->|0|93v1 92v1-->|1|93v1 93v1-->94v1 94v1-->95v1 @@ -402,7 +402,7 @@ linkStyle default stroke:#aaa 117v1-->118v1 118v1-->119v1 120v1-->121v1 -119v1--x|0|122v1; linkStyle 129 stroke:red +119v1-->|0|122v1 121v1-->|1|122v1 122v1-->123v1 9v1-->|input|124v1 @@ -426,7 +426,7 @@ linkStyle default stroke:#aaa 140v1-->141v1 141v1-->142v1 143v1-->144v1 -142v1--x|0|145v1; linkStyle 153 stroke:red +142v1-->|0|145v1 144v1-->|1|145v1 145v1-->146v1 128v1-->|input|147v1 @@ -461,9 +461,9 @@ linkStyle default stroke:#aaa 172v1-->173v1 186v1--o174v1; linkStyle 186 stroke:red 175v1-->176v1 -174v1--x|0|177v1; linkStyle 188 stroke:red +174v1-->|0|177v1 176v1-->|1|177v1 -173v1--x|0|178v1; linkStyle 190 stroke:red +173v1-->|0|178v1 177v1-->|1|178v1 178v1-->179v1 162v1-->|input|180v1 @@ -489,7 +489,7 @@ linkStyle default stroke:#aaa 196v1-->197v1 197v1-->198v1 199v1-->200v1 -198v1--x|0|201v1; linkStyle 216 stroke:red +198v1-->|0|201v1 200v1-->|1|201v1 201v1-->202v1 195v1-->|input|203v1 @@ -504,9 +504,9 @@ linkStyle default stroke:#aaa 208v1-->|input|209v1 25v1--x|single|209v1; linkStyle 229 stroke:red 209v1-->210v1 -204v1--x|0|211v1; linkStyle 231 stroke:red +204v1-->|0|211v1 210v1-->|1|211v1 -194v1--x|0|212v1; linkStyle 233 stroke:red +194v1-->|0|212v1 211v1-->|1|212v1 128v1-->213v1 212v1-->|input|214v1 @@ -521,7 +521,7 @@ linkStyle default stroke:#aaa 221v1-->222v1 222v1-->223v1 223v1-->224v1 -187v1--x|0|225v1; linkStyle 248 stroke:red +187v1-->|0|225v1 224v1-->|1|225v1 225v1-->226v1 226v1-->227v1 @@ -538,7 +538,7 @@ linkStyle default stroke:#aaa 231v1--x|neg|236v1; linkStyle 262 stroke:red 236v1-->237v1 247v1--o238v1; linkStyle 264 stroke:red -238v1--x|0|239v1; linkStyle 265 stroke:red +238v1-->|0|239v1 216v1-->|1|239v1 239v1-->240v1 237v1--o241v1; linkStyle 268 stroke:red diff --git a/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap b/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap index 3ab6c336d93f..0fc34cf3e401 100644 --- a/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/two_pc_ir@coordinator_mermaid.snap @@ -76,7 +76,7 @@ linkStyle default stroke:#aaa 13v1-->14v1 14v1-->15v1 15v1-->16v1 -1v1--x|0|17v1; linkStyle 14 stroke:red +1v1-->|0|17v1 16v1-->|1|17v1 17v1-->18v1 18v1-->19v1 @@ -102,7 +102,7 @@ linkStyle default stroke:#aaa 39v1-->40v1 40v1-->41v1 41v1-->42v1 -29v1--x|0|43v1; linkStyle 40 stroke:red +29v1-->|0|43v1 42v1-->|1|43v1 43v1-->44v1 44v1-->45v1 @@ -117,14 +117,14 @@ linkStyle default stroke:#aaa 53v1-->54v1 55v1-->56v1 48v1-->55v1 +28v1 10v1 11v1 -28v1 -36v1 -37v1 54v1 55v1 56v1 +36v1 +37v1 subgraph var_cycle_2 ["var cycle_2"] style var_cycle_2 fill:transparent 24v1