Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions hydro_lang/src/compile/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2475,6 +2475,10 @@ pub enum HydroNode {
init: ClosureExpr,
acc: ClosureExpr,
input: Box<HydroNode>,
/// Whether the accumulator was proven commutative.
is_commutative: bool,
/// Whether the accumulator was proven idempotent.
is_idempotent: bool,
metadata: HydroIrMetadata,
},

Expand All @@ -2494,23 +2498,39 @@ pub enum HydroNode {
init: ClosureExpr,
acc: ClosureExpr,
input: Box<HydroNode>,
/// Whether the accumulator was proven commutative.
is_commutative: bool,
/// Whether the accumulator was proven idempotent.
is_idempotent: bool,
metadata: HydroIrMetadata,
},

Reduce {
f: ClosureExpr,
input: Box<HydroNode>,
/// Whether the reducer was proven commutative.
is_commutative: bool,
Comment thread
jhellerstein marked this conversation as resolved.
/// Whether the reducer was proven idempotent.
is_idempotent: bool,
metadata: HydroIrMetadata,
Comment thread
jhellerstein marked this conversation as resolved.
},
ReduceKeyed {
f: ClosureExpr,
input: Box<HydroNode>,
/// Whether the reducer was proven commutative.
is_commutative: bool,
/// Whether the reducer was proven idempotent.
is_idempotent: bool,
metadata: HydroIrMetadata,
},
ReduceKeyedWatermark {
f: ClosureExpr,
input: Box<HydroNode>,
watermark: Box<HydroNode>,
/// Whether the reducer was proven commutative.
is_commutative: bool,
/// Whether the reducer was proven idempotent.
is_idempotent: bool,
metadata: HydroIrMetadata,
},

Expand Down Expand Up @@ -2982,11 +3002,15 @@ impl HydroNode {
init,
acc,
input,
is_commutative,
is_idempotent,
metadata,
} => HydroNode::Fold {
init: init.deep_clone(seen_tees),
acc: acc.deep_clone(seen_tees),
input: Box::new(input.deep_clone(seen_tees)),
is_commutative: *is_commutative,
is_idempotent: *is_idempotent,
metadata: metadata.clone(),
},
HydroNode::Scan {
Expand Down Expand Up @@ -3015,32 +3039,56 @@ impl HydroNode {
init,
acc,
input,
is_commutative,
is_idempotent,
metadata,
} => HydroNode::FoldKeyed {
init: init.deep_clone(seen_tees),
acc: acc.deep_clone(seen_tees),
input: Box::new(input.deep_clone(seen_tees)),
is_commutative: *is_commutative,
is_idempotent: *is_idempotent,
metadata: metadata.clone(),
},
HydroNode::ReduceKeyedWatermark {
f,
input,
watermark,
is_commutative,
is_idempotent,
metadata,
} => HydroNode::ReduceKeyedWatermark {
f: f.deep_clone(seen_tees),
input: Box::new(input.deep_clone(seen_tees)),
watermark: Box::new(watermark.deep_clone(seen_tees)),
is_commutative: *is_commutative,
is_idempotent: *is_idempotent,
metadata: metadata.clone(),
},
HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
HydroNode::Reduce {
f,
input,
is_commutative,
is_idempotent,
metadata,
} => HydroNode::Reduce {
f: f.deep_clone(seen_tees),
input: Box::new(input.deep_clone(seen_tees)),
is_commutative: *is_commutative,
is_idempotent: *is_idempotent,
metadata: metadata.clone(),
},
HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
HydroNode::ReduceKeyed {
f,
input,
is_commutative,
is_idempotent,
metadata,
} => HydroNode::ReduceKeyed {
f: f.deep_clone(seen_tees),
input: Box::new(input.deep_clone(seen_tees)),
is_commutative: *is_commutative,
is_idempotent: *is_idempotent,
metadata: metadata.clone(),
},
HydroNode::Network {
Expand Down
18 changes: 12 additions & 6 deletions hydro_lang/src/live_collections/keyed_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1986,8 +1986,8 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
) -> KeyedSingleton<K, A, L, B2>
where
K: Eq + Hash,
C: ValidCommutativityFor<O>,
Idemp: ValidIdempotenceFor<R>,
C: ValidCommutativityFor<O> + crate::properties::IsProved,
Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
B: ApplyMonotoneKeyedStream<M, B2>,
{
let init = init.splice_fn0_ctx(&self.location).into();
Expand All @@ -2003,6 +2003,8 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
init,
acc: comb.into(),
input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
is_commutative: C::IS_PROVED,
is_idempotent: Idemp::IS_PROVED,
metadata: retried
.location
.new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
Expand Down Expand Up @@ -2051,8 +2053,8 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
) -> KeyedSingleton<K, V, L, B>
where
K: Eq + Hash,
C: ValidCommutativityFor<O>,
Idemp: ValidIdempotenceFor<R>,
C: ValidCommutativityFor<O> + crate::properties::IsProved,
Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
{
let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
proof.register_proof(&f);
Expand All @@ -2066,6 +2068,8 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
HydroNode::ReduceKeyed {
f: f.into(),
input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
is_commutative: C::IS_PROVED,
is_idempotent: Idemp::IS_PROVED,
metadata: ordered
.location
.new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
Expand Down Expand Up @@ -2111,8 +2115,8 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
K: Eq + Hash,
O2: Clone,
F: Fn(&mut V, V) + 'a,
C: ValidCommutativityFor<O>,
Idemp: ValidIdempotenceFor<R>,
C: ValidCommutativityFor<O> + crate::properties::IsProved,
Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
{
let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
check_matching_location(&self.location.root(), other.location.outer());
Expand All @@ -2129,6 +2133,8 @@ impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
f: f.into(),
input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
is_commutative: C::IS_PROVED,
is_idempotent: Idemp::IS_PROVED,
metadata: ordered
.location
.new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
Expand Down
12 changes: 8 additions & 4 deletions hydro_lang/src/live_collections/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,8 @@ where
where
I: Fn() -> A + 'a,
F: 'a + Fn(&mut A, T),
C: ValidCommutativityFor<O>,
Idemp: ValidIdempotenceFor<R>,
C: ValidCommutativityFor<O> + crate::properties::IsProved,
Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
B: ApplyMonotoneStream<M, B2>,
{
let init = init.splice_fn0_ctx(&self.location).into();
Expand All @@ -1357,6 +1357,8 @@ where
init,
acc: comb.into(),
input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
is_commutative: C::IS_PROVED,
is_idempotent: Idemp::IS_PROVED,
metadata: retried
.location
.new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
Expand Down Expand Up @@ -1398,8 +1400,8 @@ where
) -> Optional<T, L, B>
where
F: Fn(&mut T, T) + 'a,
C: ValidCommutativityFor<O>,
Idemp: ValidIdempotenceFor<R>,
C: ValidCommutativityFor<O> + crate::properties::IsProved,
Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
{
let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
proof.register_proof(&f);
Expand All @@ -1411,6 +1413,8 @@ where
let core = HydroNode::Reduce {
f: f.into(),
input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
is_commutative: C::IS_PROVED,
is_idempotent: Idemp::IS_PROVED,
metadata: ordered_etc
.location
.new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
Expand Down
12 changes: 12 additions & 0 deletions hydro_lang/src/properties/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ pub enum NotProved {}
/// Marks that the property is proven.
pub enum Proved {}

/// Query whether a proof marker is `Proved`.
pub trait IsProved {
/// Whether the property has been proven.
const IS_PROVED: bool;
}
impl IsProved for NotProved {
const IS_PROVED: bool = false;
}
impl IsProved for Proved {
const IS_PROVED: bool = true;
}
Comment thread
jhellerstein marked this conversation as resolved.

/// Algebraic properties for an aggregation function of type (T, &mut A) -> ().
///
/// Commutativity:
Expand Down
9 changes: 7 additions & 2 deletions hydro_lang/src/viz/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,8 +1281,12 @@ impl HydroNode {
),

// Single-expression Aggregation operations - grouped by node type
HydroNode::Reduce { f, input, metadata }
| HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
HydroNode::Reduce {
f, input, metadata, ..
}
| HydroNode::ReduceKeyed {
f, input, metadata, ..
} => build_single_expr_transform(
TransformParams {
structure,
seen_tees,
Expand Down Expand Up @@ -1443,6 +1447,7 @@ impl HydroNode {
input,
watermark,
metadata,
..
} => {
let input_id = input.build_graph_structure(structure, seen_tees, config);
let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ note: required by a bound in `hydro_lang::prelude::Stream::<T, L, B, O, R>::fold
| pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
| ---- required by a bound in this associated function
...
| C: ValidCommutativityFor<O>,
| C: ValidCommutativityFor<O> + crate::properties::IsProved,
| ^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Stream::<T, L, B, O, R>::fold`
6 changes: 6 additions & 0 deletions hydro_test/src/cluster/snapshots/compute_pi_ir.snap
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ expression: built.ir()
},
},
},
is_commutative: false,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Tick(0, Cluster(loc1v1)),
collection_kind: Singleton {
Expand Down Expand Up @@ -188,6 +190,8 @@ expression: built.ir()
},
},
},
is_commutative: true,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Process(loc2v1),
collection_kind: Optional {
Expand Down Expand Up @@ -266,6 +270,8 @@ expression: built.ir()
},
},
},
is_commutative: false,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Tick(1, Process(loc2v1)),
collection_kind: Optional {
Expand Down
2 changes: 2 additions & 0 deletions hydro_test/src/cluster/snapshots/many_to_many_ir.snap
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ expression: built.ir()
},
},
},
is_commutative: false,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Cluster(loc1v1),
collection_kind: KeyedSingleton {
Expand Down
8 changes: 8 additions & 0 deletions hydro_test/src/cluster/snapshots/map_reduce_ir.snap
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ expression: built.ir()
},
},
},
is_commutative: false,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Process(loc1v1),
collection_kind: KeyedSingleton {
Expand Down Expand Up @@ -230,6 +232,8 @@ expression: built.ir()
},
},
},
is_commutative: false,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Tick(0, Process(loc1v1)),
collection_kind: Singleton {
Expand Down Expand Up @@ -329,6 +333,8 @@ expression: built.ir()
},
},
},
is_commutative: false,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Tick(1, Cluster(loc2v1)),
collection_kind: KeyedSingleton {
Expand Down Expand Up @@ -443,6 +449,8 @@ expression: built.ir()
},
},
},
is_commutative: true,
is_idempotent: false,
metadata: HydroIrMetadata {
location_id: Process(loc1v1),
collection_kind: KeyedSingleton {
Expand Down
Loading
Loading