From f713343f8fdf4a6cacb4f11f20c6464e4f5e1513 Mon Sep 17 00:00:00 2001 From: Joe Hellerstein Date: Fri, 5 Jun 2026 06:26:51 +0000 Subject: [PATCH] feat(hydro_lang): add Stream::drop_consistency() and FlowBuilder::cluster_with_consistency() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Stream::drop_consistency(): safely forgets a consistency guarantee (e.g. EventualConsistency → NoConsistency). Purely type-level, no IR node inserted. Useful when replication strategies need to return streams on NoConsistency clusters after using broadcast_closed internally. - FlowBuilder::cluster_with_consistency(): creates a cluster with an explicit consistency guarantee (e.g. EventualConsistency) rather than the default NoConsistency. --- hydro_lang/src/compile/builder.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hydro_lang/src/compile/builder.rs b/hydro_lang/src/compile/builder.rs index 501531f39bfe..721dc6bc6522 100644 --- a/hydro_lang/src/compile/builder.rs +++ b/hydro_lang/src/compile/builder.rs @@ -228,6 +228,14 @@ impl<'a> FlowBuilder<'a> { } } + /// Create a cluster with a specific consistency guarantee. + pub fn cluster_with_consistency( + &mut self, + ) -> Cluster<'a, C, Con> { + use crate::location::Location; + Cluster::from_drop_consistency(self.cluster::()) + } + pub fn external(&mut self) -> External<'a, E> { let key = self.locations.insert(LocationType::External); self.location_names.insert(key, type_name::().to_owned());