Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 54 additions & 34 deletions hydro_lang/src/compile/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ use crate::compile::builder::StmtId;
use crate::compile::builder::{CycleId, ExternalPortId};
#[cfg(feature = "build")]
use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
#[cfg(feature = "build")]
use crate::handoff_ref::handoff_ref_ident;
use crate::location::dynamic::{ClusterConsistency, LocationId};
use crate::location::{LocationKey, NetworkHint};
#[cfg(feature = "build")]
use crate::singleton_ref::singleton_ref_ident;

pub mod backtrace;
use backtrace::Backtrace;
Expand All @@ -45,8 +45,8 @@ use backtrace::Backtrace;
/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
pub struct ClosureExpr {
pub(crate) expr: DebugExpr,
/// Each entry is `(singleton_node, is_mut)`.
/// The index in the Vec determines the ident name via [`singleton_ref_ident`].
/// Each entry is `(reference_node, is_mut)`.
/// The index in the Vec determines the ident name via [`handoff_ref_ident`].
pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}

Expand All @@ -58,12 +58,18 @@ impl Clone for ClosureExpr {
.singleton_refs
.iter()
.map(|(node, is_mut)| {
let HydroNode::Singleton { inner, metadata } = node else {
panic!("singleton_refs should only contain HydroNode::Singleton");
let HydroNode::Reference {
inner,
kind,
metadata,
} = node
else {
panic!("singleton_refs should only contain HydroNode::Reference");
};
(
HydroNode::Singleton {
HydroNode::Reference {
inner: SharedNode(Rc::clone(&inner.0)),
kind: *kind,
metadata: metadata.clone(),
},
*is_mut,
Expand Down Expand Up @@ -191,8 +197,8 @@ impl ClosureExpr {
for ((i, (node, is_mut)), ref_ident) in
self.singleton_refs.iter().enumerate().zip(ref_idents)
{
let HydroNode::Singleton { inner, .. } = node else {
panic!("singleton_refs should only contain HydroNode::Singleton");
let HydroNode::Reference { inner, .. } = node else {
panic!("singleton_refs should only contain HydroNode::Reference");
};
let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
let counter = access_counters.entry(ptr).or_insert(0);
Expand All @@ -206,7 +212,7 @@ impl ClosureExpr {
};

// TODO(mingwei): proper spanning?
let local_ident = singleton_ref_ident(i);
let local_ident = handoff_ref_ident(i);
let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
let mut_token = is_mut.then(|| quote!(mut));
Expand Down Expand Up @@ -2347,15 +2353,17 @@ pub enum HydroNode {
metadata: HydroIrMetadata,
},

/// A singleton materialization point. Wraps a SharedNode so that:
/// - The pipe output delivers the single item to one consumer
/// - `#var` references can borrow the value from the singleton slot
/// A reference materialization point. Wraps a SharedNode so that:
/// - The pipe output delivers data to one consumer
/// - `#var` references can borrow the value from the slot
///
/// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
/// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
/// `-> handoff()` depending on `kind`.
///
/// Uses the same `built_tees` dedup pattern as `Tee`.
Singleton {
Reference {
inner: SharedNode,
kind: crate::handoff_ref::HandoffRefKind,
metadata: HydroIrMetadata,
},

Expand Down Expand Up @@ -2645,7 +2653,7 @@ impl HydroNode {
| HydroNode::CycleSource { .. }
| HydroNode::ExternalInput { .. } => {}

HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
*inner = SharedNode(transformed.clone());
} else {
Expand Down Expand Up @@ -2811,7 +2819,10 @@ impl HydroNode {
cycle_id: *cycle_id,
metadata: metadata.clone(),
},
HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
HydroNode::Tee { inner, metadata }
| HydroNode::Reference {
inner, metadata, ..
} => {
let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
SharedNode(transformed.clone())
} else {
Expand All @@ -2821,9 +2832,10 @@ impl HydroNode {
*new_rc.borrow_mut() = cloned;
SharedNode(new_rc)
};
if matches!(self, HydroNode::Singleton { .. }) {
HydroNode::Singleton {
if let HydroNode::Reference { kind, .. } = self {
HydroNode::Reference {
inner: cloned_inner,
kind: *kind,
metadata: metadata.clone(),
}
} else {
Expand Down Expand Up @@ -3595,28 +3607,36 @@ impl HydroNode {
ident_stack.push(ret_ident);
}

HydroNode::Singleton { inner, .. } => {
HydroNode::Reference { inner, kind, .. } => {
let ret_ident = if let Some(built_idents) =
built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
{
built_idents[0].clone()
} else {
let inner_ident = ident_stack.pop().unwrap();

let singleton_ident =
let ref_ident =
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());

built_tees.insert(
inner.0.as_ref() as *const RefCell<HydroNode>,
vec![singleton_ident.clone()],
vec![ref_ident.clone()],
);

match builders_or_callback {
BuildersOrCallback::Builders(graph_builders) => {
let builder = graph_builders.get_dfir_mut(&out_location);
let op_ident = syn::Ident::new(
match kind {
crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
crate::handoff_ref::HandoffRefKind::Optional => "optional",
crate::handoff_ref::HandoffRefKind::Vec => "handoff",
},
Span::call_site(),
);
builder.add_dfir(
parse_quote! {
#singleton_ident = #inner_ident -> singleton();
#ref_ident = #inner_ident -> #op_ident();
},
None,
Some(&next_stmt_id.to_string()),
Expand All @@ -3627,11 +3647,11 @@ impl HydroNode {
}
}

singleton_ident
ref_ident
};

// we consume a stmt id regardless of if we emit the singleton() operator,
// so that during rewrites we touch all recipients of the singleton()
// we consume a stmt id regardless of if we emit the operator,
// so that during rewrites we touch all recipients
let _ = next_stmt_id.get_and_increment();
ident_stack.push(ret_ident);
}
Expand Down Expand Up @@ -4918,7 +4938,7 @@ impl HydroNode {
}
HydroNode::CycleSource { .. }
| HydroNode::Tee { .. }
| HydroNode::Singleton { .. }
| HydroNode::Reference { .. }
| HydroNode::YieldConcat { .. }
| HydroNode::BeginAtomic { .. }
| HydroNode::EndAtomic { .. }
Expand Down Expand Up @@ -4998,7 +5018,7 @@ impl HydroNode {
| HydroNode::SingletonSource { metadata, .. }
| HydroNode::CycleSource { metadata, .. }
| HydroNode::Tee { metadata, .. }
| HydroNode::Singleton { metadata, .. }
| HydroNode::Reference { metadata, .. }
| HydroNode::Partition { metadata, .. }
| HydroNode::YieldConcat { metadata, .. }
| HydroNode::BeginAtomic { metadata, .. }
Expand Down Expand Up @@ -5056,7 +5076,7 @@ impl HydroNode {
| HydroNode::SingletonSource { metadata, .. }
| HydroNode::CycleSource { metadata, .. }
| HydroNode::Tee { metadata, .. }
| HydroNode::Singleton { metadata, .. }
| HydroNode::Reference { metadata, .. }
| HydroNode::Partition { metadata, .. }
| HydroNode::YieldConcat { metadata, .. }
| HydroNode::BeginAtomic { metadata, .. }
Expand Down Expand Up @@ -5107,7 +5127,7 @@ impl HydroNode {
| HydroNode::ExternalInput { .. }
| HydroNode::CycleSource { .. }
| HydroNode::Tee { .. }
| HydroNode::Singleton { .. }
| HydroNode::Reference { .. }
| HydroNode::Partition { .. } => {
// Tee/Partition should find their input in separate special ways
vec![]
Expand Down Expand Up @@ -5186,9 +5206,9 @@ impl HydroNode {
HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
Rc::strong_count(&inner.0) > 1
}
// A zero-output singleton() is valid in DFIR (it drains itself at
// A zero-output reference node is valid in DFIR (it drains itself at
// end of tick), so it doesn't need to be driven by another consumer.
HydroNode::Singleton { .. } => false,
HydroNode::Reference { .. } => false,
_ => false,
}
}
Expand All @@ -5215,8 +5235,8 @@ impl HydroNode {
HydroNode::Tee { inner, .. } => {
format!("Tee({})", inner.0.borrow().print_root())
}
HydroNode::Singleton { inner, .. } => {
format!("Singleton({})", inner.0.borrow().print_root())
HydroNode::Reference { inner, kind, .. } => {
format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
}
HydroNode::Partition { f, is_true, .. } => {
format!("Partition({:?}, is_true={})", f, is_true)
Expand Down
Loading
Loading