Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions dfir_lang/src/graph/ops/chain.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::graph::PortIndexValue;

use super::{
DelayType, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1
OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1
};

/// > 2 input streams of the same type, 1 output stream of the same type
Expand Down Expand Up @@ -32,11 +30,6 @@ pub const CHAIN: OperatorConstraints = OperatorConstraints {
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |idx| match idx {
PortIndexValue::Int(idx) if idx.value == 0 => {
Some(DelayType::Stratum)
}
_else => None,
},
input_delaytype_fn: |_| None,
write_fn: super::union::UNION.write_fn,
};
11 changes: 2 additions & 9 deletions dfir_lang/src/graph/ops/chain_first_n.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use quote::quote_spanned;

use crate::graph::{
PortIndexValue,
ops::{OperatorWriteOutput, WriteContextArgs},
};

use super::{DelayType, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1};
use super::{OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1};

/// > 2 input streams of the same type, 1 output stream of the same type
///
Expand Down Expand Up @@ -36,13 +35,7 @@ pub const CHAIN_FIRST_N: OperatorConstraints = OperatorConstraints {
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |idx| match idx {
PortIndexValue::Int(idx) if idx.value == 0 => {
// will no longer be needed once subgraphs are always DAGs (only run once per tick)
Some(DelayType::Stratum)
}
_else => None,
},
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
op_span,
Expand Down
33 changes: 18 additions & 15 deletions dfir_lang/src/graph/ops/sort.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use quote::quote_spanned;

use super::{
DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
WriteContextArgs,
};

Expand All @@ -12,9 +12,7 @@ use super::{
/// -> sort()
/// -> assert_eq([1, 2, 3]);
/// ```
///
/// `sort` is blocking. Only the values collected within a single tick will be sorted and
/// emitted.
/// Within a tick, only the values received within that tick will be sorted and emitted.
Comment thread
MingweiSamuel marked this conversation as resolved.
pub const SORT: OperatorConstraints = OperatorConstraints {
name: "sort",
categories: &[OperatorCategory::Persistence],
Expand All @@ -29,27 +27,32 @@ pub const SORT: OperatorConstraints = OperatorConstraints {
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
input_delaytype_fn: |_| None,
write_fn: |&WriteContextArgs {
root,
op_span,
work_fn_async,
ident,
inputs,
outputs,
is_pull,
..
},
_| {
assert!(is_pull);

let input = &inputs[0];
let write_iterator = quote_spanned! {op_span=>
// TODO(mingwei): unnecessary extra handoff into_iter() then collect().
let #ident = {
let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await;
<[_]>::sort_unstable(&mut tmp);
#root::dfir_pipes::pull::iter(tmp)
};
let write_iterator = if is_pull {
let input = &inputs[0];
quote_spanned! {op_span=>
let #ident = {
let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await;
<[_]>::sort_unstable(&mut tmp);
#root::dfir_pipes::pull::iter(tmp)
};
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::Sort::new(#output);
}
};
Ok(OperatorWriteOutput {
write_iterator,
Expand Down
41 changes: 30 additions & 11 deletions dfir_lang/src/graph/ops/sort_by_key.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use quote::quote_spanned;

use super::{
DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
WriteContextArgs,
};

Expand Down Expand Up @@ -29,27 +29,46 @@ pub const SORT_BY_KEY: OperatorConstraints = OperatorConstraints {
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
input_delaytype_fn: |_| None,
write_fn: |&WriteContextArgs {
root,
op_span,
work_fn_async,
ident,
inputs,
outputs,
is_pull,
arguments,
..
},
_| {
assert!(is_pull);
let input = &inputs[0];
let write_iterator = quote_spanned! {op_span=>
// TODO(mingwei): unnecessary extra handoff into_iter() then collect().
let #ident = {
let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await;
#root::util::sort_unstable_by_key_hrtb(&mut tmp, #arguments);
#root::dfir_pipes::pull::iter(tmp)
};
let write_iterator = if is_pull {
let input = &inputs[0];
quote_spanned! {op_span=>
let #ident = {
let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await;
#root::util::sort_unstable_by_key_hrtb(&mut tmp, #arguments);
#root::dfir_pipes::pull::iter(tmp)
};
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::fold(
::std::vec::Vec::new(),
|__buf: &mut ::std::vec::Vec<_>, __item| {
__buf.push(__item);
},
#root::dfir_pipes::push::flat_map(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this one just uses regular flat_map, whereas the other uses a specialized pipes implementation. Why the difference, could we use flat_map for both? Is there a performance penalty?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, I think they should perform similarly. Creating an issue

|__buf: ::std::vec::Vec<_>| {
let mut __buf = __buf;
#root::util::sort_unstable_by_key_hrtb(&mut __buf, #arguments);
__buf
},
#output,
),
);
}
};
Ok(OperatorWriteOutput {
write_iterator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,26 @@ digraph {
n17v1 [label="(n17v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n18v1 [label="(n18v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n19v1 [label="(n19v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n20v1 [label="(n20v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1
n1v1 -> n15v1
n3v1 -> n16v1
n1v1 -> n2v1
n3v1 -> n15v1
n4v1 -> n7v1 [label="0"]
n14v1 -> n17v1
n14v1 -> n16v1
n5v1 -> n6v1
n6v1 -> n7v1 [label="1"]
n7v1 -> n18v1
n7v1 -> n17v1
n8v1 -> n9v1
n9v1 -> n10v1
n9v1 -> n11v1
n3v1 -> n19v1
n11v1 -> n20v1
n3v1 -> n18v1
n11v1 -> n19v1
n13v1 -> n14v1
n12v1 -> n13v1
n15v1 -> n2v1 [color=red]
n16v1 -> n8v1 [label="input"]
n17v1 -> n4v1 [color=red]
n18v1 -> n8v1 [label="single", color=red]
n19v1 -> n12v1 [label="input"]
n20v1 -> n12v1 [label="single", color=red]
n15v1 -> n8v1 [label="input"]
n16v1 -> n4v1 [color=red]
n17v1 -> n8v1 [label="single", color=red]
n18v1 -> n12v1 [label="input"]
n19v1 -> n12v1 [label="single", color=red]
subgraph sg_1v1 {
cluster=true
fillcolor="#dddddd"
Expand All @@ -55,68 +53,58 @@ digraph {
cluster=true
label="var teed_in"
n1v1
}
}
subgraph sg_2v1 {
cluster=true
fillcolor="#dddddd"
style=filled
label = "sg_2v1"
subgraph sg_2v1_var_teed_in {
cluster=true
label="var teed_in"
n2v1
n3v1
}
}
subgraph sg_3v1 {
subgraph sg_2v1 {
cluster=true
fillcolor="#dddddd"
style=filled
label = "sg_3v1"
label = "sg_2v1"
n4v1
subgraph sg_3v1_var_persisted_stream {
subgraph sg_2v1_var_persisted_stream {
cluster=true
label="var persisted_stream"
n5v1
n6v1
}
subgraph sg_3v1_var_unioned_stream {
subgraph sg_2v1_var_unioned_stream {
cluster=true
label="var unioned_stream"
n7v1
}
}
subgraph sg_4v1 {
subgraph sg_3v1 {
cluster=true
fillcolor="#dddddd"
style=filled
label = "sg_4v1"
label = "sg_3v1"
n10v1
subgraph sg_4v1_var_folded_thing {
subgraph sg_3v1_var_folded_thing {
cluster=true
label="var folded_thing"
n11v1
}
subgraph sg_4v1_var_join {
subgraph sg_3v1_var_join {
cluster=true
label="var join"
n8v1
n9v1
}
}
subgraph sg_5v1 {
subgraph sg_4v1 {
cluster=true
fillcolor="#dddddd"
style=filled
label = "sg_5v1"
subgraph sg_5v1_var_deferred_stream {
label = "sg_4v1"
subgraph sg_4v1_var_deferred_stream {
cluster=true
label="var deferred_stream"
n13v1
n14v1
}
subgraph sg_5v1_var_joined_folded {
subgraph sg_4v1_var_joined_folded {
cluster=true
label="var joined_folded"
n12v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,65 +27,59 @@ linkStyle default stroke:#aaa
17v1["(17v1) <code>handoff</code>"]:::otherClass
18v1["(18v1) <code>handoff</code>"]:::otherClass
19v1["(19v1) <code>handoff</code>"]:::otherClass
20v1["(20v1) <code>handoff</code>"]:::otherClass
2v1-->3v1
1v1-->15v1
3v1-->16v1
1v1-->2v1
3v1-->15v1
4v1-->|0|7v1
14v1-->17v1
14v1-->16v1
5v1-->6v1
6v1-->|1|7v1
7v1-->18v1
7v1-->17v1
8v1-->9v1
9v1-->10v1
9v1-->11v1
3v1-->19v1
11v1-->20v1
3v1-->18v1
11v1-->19v1
13v1-->14v1
12v1-->13v1
15v1--x2v1; linkStyle 15 stroke:red
16v1-->|input|8v1
17v1--o4v1; linkStyle 17 stroke:red
18v1--x|single|8v1; linkStyle 18 stroke:red
19v1-->|input|12v1
20v1--x|single|12v1; linkStyle 20 stroke:red
15v1-->|input|8v1
16v1--o4v1; linkStyle 16 stroke:red
17v1--x|single|8v1; linkStyle 17 stroke:red
18v1-->|input|12v1
19v1--x|single|12v1; linkStyle 19 stroke:red
subgraph sg_1v1 ["sg_1v1"]
subgraph sg_1v1_var_teed_in ["var <tt>teed_in</tt>"]
1v1
end
end
subgraph sg_2v1 ["sg_2v1"]
subgraph sg_2v1_var_teed_in ["var <tt>teed_in</tt>"]
2v1
3v1
end
end
subgraph sg_3v1 ["sg_3v1"]
subgraph sg_2v1 ["sg_2v1"]
4v1
subgraph sg_3v1_var_persisted_stream ["var <tt>persisted_stream</tt>"]
subgraph sg_2v1_var_persisted_stream ["var <tt>persisted_stream</tt>"]
5v1
6v1
end
subgraph sg_3v1_var_unioned_stream ["var <tt>unioned_stream</tt>"]
subgraph sg_2v1_var_unioned_stream ["var <tt>unioned_stream</tt>"]
7v1
end
end
subgraph sg_4v1 ["sg_4v1"]
subgraph sg_3v1 ["sg_3v1"]
10v1
subgraph sg_4v1_var_folded_thing ["var <tt>folded_thing</tt>"]
subgraph sg_3v1_var_folded_thing ["var <tt>folded_thing</tt>"]
11v1
end
subgraph sg_4v1_var_join ["var <tt>join</tt>"]
subgraph sg_3v1_var_join ["var <tt>join</tt>"]
8v1
9v1
end
end
subgraph sg_5v1 ["sg_5v1"]
subgraph sg_5v1_var_deferred_stream ["var <tt>deferred_stream</tt>"]
subgraph sg_4v1 ["sg_4v1"]
subgraph sg_4v1_var_deferred_stream ["var <tt>deferred_stream</tt>"]
13v1
14v1
end
subgraph sg_5v1_var_joined_folded ["var <tt>joined_folded</tt>"]
subgraph sg_4v1_var_joined_folded ["var <tt>joined_folded</tt>"]
12v1
end
end
Loading
Loading