Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions hydro_lang/src/compile/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,16 @@ impl HydroRoot {
fold_hooked_idents,
);

let input_ident = maybe_observe_for_mut(
f,
input_ident,
&input.metadata().location_id,
&input.metadata().collection_kind,
&input.metadata().op,
builders_or_callback,
next_stmt_id,
);

let stmt_id = next_stmt_id.get_and_increment();

match builders_or_callback {
Expand Down
35 changes: 23 additions & 12 deletions hydro_lang/src/live_collections/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,20 +1294,31 @@ where

/// Executes the provided closure for every element in this stream.
///
/// Because the closure may have side effects, the stream must have deterministic order
/// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
/// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
/// [`Stream::assume_retries`] with an explanation for why this is the case.
/// If the stream is unordered or has retries, the closure must demonstrate commutativity
/// and/or idempotence via annotations:
/// ```rust,ignore
/// stream.for_each(q!(
/// |x| *flag_mut |= x,
/// commutative = manual_proof!(/** boolean OR is commutative */),
/// idempotent = manual_proof!(/** boolean OR is idempotent */)
/// ));
/// ```
///
/// The closure may capture singletons via `by_ref()` or `by_mut()`. No commutativity
/// or idempotence proofs are needed because the `TotalOrder + ExactlyOnce` requirements
/// already guarantee deterministic execution.
pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
where
O: IsOrdered,
R: IsExactlyOnce,
/// On a `TotalOrder + ExactlyOnce` stream, no annotations are needed.
///
/// The closure may capture singletons via `by_ref()` or `by_mut()`.
pub fn for_each<F: FnMut(T) + 'a, C, I>(
self,
f: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, I>>,
) where
C: ValidCommutativityFor<O>,
I: ValidIdempotenceFor<R>,
{
let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
let f = crate::handoff_ref::with_ref_capture(|| {
let (f, proof) = f.splice_fnmut1_ctx_props(&self.location);
proof.register_proof(&f);
f.into()
});
self.location
.flow_state()
.borrow_mut()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
error[E0277]: Because the input stream has ordering `hydro_lang::live_collections::stream::NoOrder`, the closure must demonstrate commutativity with a `commutative = ...` annotation.
--> tests/compile-fail/nightly/non_commutative_for_each.rs:11:10
|
11 | .for_each(q!(|x| println!("{}", x)));
| ^^^^^^^^ required for this call
|
= note: To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary.
help: the trait `ValidCommutativityFor<hydro_lang::live_collections::stream::NoOrder>` is not implemented for `NotProved`
but trait `ValidCommutativityFor<hydro_lang::live_collections::stream::TotalOrder>` is implemented for it
--> src/properties/mod.rs
|
| impl ValidCommutativityFor<TotalOrder> for NotProved {}
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
= help: for that trait implementation, expected `hydro_lang::live_collections::stream::TotalOrder`, found `hydro_lang::live_collections::stream::NoOrder`
note: required by a bound in `hydro_lang::prelude::Stream::<T, L, B, O, R>::for_each`
--> src/live_collections/stream/mod.rs
|
| pub fn for_each<F: FnMut(T) + 'a, C, I>(
| -------- required by a bound in this associated function
...
| C: ValidCommutativityFor<O>,
| ^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Stream::<T, L, B, O, R>::for_each`
14 changes: 14 additions & 0 deletions hydro_lang/tests/compile-fail/non_commutative_for_each.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#![allow(unexpected_cfgs)]

use hydro_lang::live_collections::stream::NoOrder;
use hydro_lang::prelude::*;

struct P1 {}

fn test<'a>(p1: &Process<'a, P1>) {
p1.source_iter(q!(0..10))
.weaken_ordering::<NoOrder>()
.for_each(q!(|x| println!("{}", x)));
}

fn main() {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
error[E0277]: Because the input stream has ordering `hydro_lang::live_collections::stream::NoOrder`, the closure must demonstrate commutativity with a `commutative = ...` annotation.
--> tests/compile-fail/stable/non_commutative_for_each.rs:11:10
|
11 | .for_each(q!(|x| println!("{}", x)));
| ^^^^^^^^ required for this call
|
= note: To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary.
help: the trait `ValidCommutativityFor<hydro_lang::live_collections::stream::NoOrder>` is not implemented for `NotProved`
but trait `ValidCommutativityFor<hydro_lang::live_collections::stream::TotalOrder>` is implemented for it
--> src/properties/mod.rs
|
| impl ValidCommutativityFor<TotalOrder> for NotProved {}
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
= help: for that trait implementation, expected `hydro_lang::live_collections::stream::TotalOrder`, found `hydro_lang::live_collections::stream::NoOrder`
note: required by a bound in `hydro_lang::prelude::Stream::<T, L, B, O, R>::for_each`
--> src/live_collections/stream/mod.rs
|
| pub fn for_each<F: FnMut(T) + 'a, C, I>(
| -------- required by a bound in this associated function
...
| C: ValidCommutativityFor<O>,
| ^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Stream::<T, L, B, O, R>::for_each`
8 changes: 4 additions & 4 deletions hydro_test/src/cluster/snapshots/compute_pi_ir.snap
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ expression: built.ir()
input: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_906_20 ! ([] [| _ | ()]) }),
input: Reduce {
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }),
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }),
input: FlatMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }),
f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }),
input: Scan {
init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }),
init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }),
input: Batch {
inner: Source {
source: Stream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ _2v1 = map (| res | { let (id , b) = res . unwrap () ; (hydro_lang :: __staged :
_3v1 = map (stageleft :: runtime_support :: fnmut1_type_hint :: < (hydro_test :: __staged :: __deps :: hydro_lang :: location :: member_id :: MemberId < hydro_test :: __staged :: cluster :: compute_pi :: Worker > , (u64 , u64)) , (u64 , u64) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_keyed_stream_mod_rs_747_30 ! ([] [| (_ , v) | v]) }));
_4v1 = reduce :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_37_12 ! ([] [| (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; }]) }));
_5v1 = source_stream ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: location :: * ; hydro_lang :: __stageleft_quote_src_location_mod_rs_1415_30 ! ([interval__free = { use crate :: __staged :: __deps :: * ; use crate :: __staged :: cluster :: compute_pi :: * ; # [allow (unused_imports)] use crate :: * ; __stageleft_quote_src_cluster_compute_pi_rs_46_15 ! ([] [Duration :: from_secs (1)]) } ,] [tokio_stream :: StreamExt :: map (tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval__free)) , | _ | ())]) });
_6v1 = scan :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1915_27 ! ([] [| | None]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1918_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1588_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }));
_7v1 = flat_map (stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1955_27 ! ([] [| d | d]) }));
_8v1 = reduce :: < 'tick > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1589_23 ! ([] [| _ , _ | { }]) }));
_6v1 = scan :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1926_27 ! ([] [| | None]) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < core :: option :: Option < () > > , () , core :: option :: Option < core :: option :: Option < () > > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1929_24 ! ([f__free = stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , hydro_test :: __staged :: __deps :: hydro_lang :: live_collections :: keyed_stream :: Generate < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_37 ! ([] [| _ , item | Generate :: Return (item)]) }) , init__free = stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1599_26 ! ([] [| | ()]) }) ,] [move | state : & mut Option < Option < _ > > , v | { if state . is_none () { * state = Some (Some (init__free ())) ; } match state { Some (Some (state_value)) => match f__free (state_value , v) { Generate :: Yield (out) => Some (Some (out)) , Generate :: Return (out) => { * state = Some (None) ; Some (Some (out)) } Generate :: Break => None , Generate :: Continue => Some (None) , } , _ => None , } }]) }));
_7v1 = flat_map (stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < () > , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1966_27 ! ([] [| d | d]) }));
_8v1 = reduce :: < 'tick > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: stream :: * ; hydro_lang :: __stageleft_quote_src_live_collections_stream_mod_rs_1600_23 ! ([] [| _ , _ | { }]) }));
_9v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_906_20 ! ([] [| _ | ()]) }));
_10v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < () > > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: optional :: * ; hydro_lang :: __stageleft_quote_src_live_collections_optional_rs_879_20 ! ([] [| v | Some (v)]) }));
_11v1 = source_iter ([:: std :: option :: Option :: None]);
Expand Down
Loading
Loading