hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode the ordering lattice: combining an ordering with itself or
/// with [`TotalOrder`] leaves it unchanged, while combining it with [`NoOrder`] always yields
/// [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}

/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited type used only as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited type used only as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}

/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the minimum of the two is `O`.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}

/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest guarantee, so the minimum is always the other ordering.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest guarantee, so it absorbs any other ordering.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
95
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds encode the retry lattice: combining a guarantee with itself or with
/// [`ExactlyOnce`] leaves it unchanged, while combining it with [`AtLeastOnce`] always yields
/// [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited type used only as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited type used only as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}

/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the minimum of the two is `R`.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}

/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees; it is itself weaker than both inputs.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum is always the other guarantee.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so it absorbs any other guarantee.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Used as a bound on APIs that require a totally-ordered input. When the bound is not met, the
/// compiler reports the custom `on_unimplemented` diagnostic attached to this trait.
pub trait IsOrdered: Ordering {}

// `do_not_recommend` keeps the compiler from pointing users at this impl in error messages,
// so the custom diagnostic on the trait is what gets reported.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}

#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Used as a bound on APIs that require an exactly-once input. When the bound is not met, the
/// compiler reports the custom `on_unimplemented` diagnostic attached to this trait.
pub trait IsExactlyOnce: Retries {}

// `do_not_recommend` keeps the compiler from pointing users at this impl in error messages,
// so the custom diagnostic on the trait is what gets reported.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retry guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this stream. Operators move it out and leave
    /// `HydroNode::Placeholder` behind once the stream has been consumed.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow builder state; `Drop` uses it to register any still-owned IR as a root.
    pub(crate) flow_state: FlowState,

    // Ties the type parameters to the struct without storing values of those types.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210 fn drop(&mut self) {
211 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214 input: Box::new(ir_node),
215 op_metadata: HydroIrOpMetadata::new(),
216 });
217 }
218 }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222 for Stream<T, L, Unbounded, O, R>
223where
224 L: Location<'a>,
225{
226 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227 let new_meta = stream
228 .location
229 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231 Stream {
232 location: stream.location.clone(),
233 flow_state: stream.flow_state.clone(),
234 ir_node: RefCell::new(HydroNode::Cast {
235 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236 metadata: new_meta,
237 }),
238 _phantom: PhantomData,
239 }
240 }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244 for Stream<T, L, B, NoOrder, R>
245where
246 L: Location<'a>,
247{
248 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249 stream.weaken_ordering()
250 }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254 for Stream<T, L, B, O, AtLeastOnce>
255where
256 L: Location<'a>,
257{
258 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259 stream.weaken_retries()
260 }
261}
262
// Exposes the inherent `defer_tick` of bounded, tick-scoped streams through the `DeferTick` trait.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Defers this stream to the next tick by forwarding to the inherent `Stream::defer_tick`.
    fn defer_tick(self) -> Self {
        // Path-call syntax: presumably resolves to the inherent method defined elsewhere in
        // this file (inherent items take priority), not back into this trait method.
        Stream::defer_tick(self)
    }
}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273 for Stream<T, Tick<L>, Bounded, O, R>
274where
275 L: Location<'a>,
276{
277 type Location = Tick<L>;
278
279 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280 Stream::new(
281 location.clone(),
282 HydroNode::CycleSource {
283 cycle_id,
284 metadata: location.new_node_metadata(Self::collection_kind()),
285 },
286 )
287 }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291 for Stream<T, Tick<L>, Bounded, O, R>
292where
293 L: Location<'a>,
294{
295 type Location = Tick<L>;
296
297 fn location(&self) -> &Self::Location {
298 self.location()
299 }
300
301 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
302 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
303 location.clone(),
304 HydroNode::DeferTick {
305 input: Box::new(HydroNode::CycleSource {
306 cycle_id,
307 metadata: location.new_node_metadata(Self::collection_kind()),
308 }),
309 metadata: location.new_node_metadata(Self::collection_kind()),
310 },
311 );
312
313 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
314 }
315}
316
317impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
318 for Stream<T, Tick<L>, Bounded, O, R>
319where
320 L: Location<'a>,
321{
322 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
323 assert_eq!(
324 Location::id(&self.location),
325 expected_location,
326 "locations do not match"
327 );
328 self.location
329 .flow_state()
330 .borrow_mut()
331 .push_root(HydroRoot::CycleSink {
332 cycle_id,
333 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334 op_metadata: HydroIrOpMetadata::new(),
335 });
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
340 for Stream<T, L, B, O, R>
341where
342 L: Location<'a>,
343{
344 type Location = L;
345
346 fn create_source(cycle_id: CycleId, location: L) -> Self {
347 Stream::new(
348 location.clone(),
349 HydroNode::CycleSource {
350 cycle_id,
351 metadata: location.new_node_metadata(Self::collection_kind()),
352 },
353 )
354 }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
358 for Stream<T, L, B, O, R>
359where
360 L: Location<'a>,
361{
362 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
363 assert_eq!(
364 Location::id(&self.location),
365 expected_location,
366 "locations do not match"
367 );
368 self.location
369 .flow_state()
370 .borrow_mut()
371 .push_root(HydroRoot::CycleSink {
372 cycle_id,
373 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
374 op_metadata: HydroIrOpMetadata::new(),
375 });
376 }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
380where
381 T: Clone,
382 L: Location<'a>,
383{
384 fn clone(&self) -> Self {
385 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
386 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
387 *self.ir_node.borrow_mut() = HydroNode::Tee {
388 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
389 metadata: self.location.new_node_metadata(Self::collection_kind()),
390 };
391 }
392
393 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
394 unreachable!()
395 };
396 Stream {
397 location: self.location.clone(),
398 flow_state: self.flow_state.clone(),
399 ir_node: HydroNode::Tee {
400 inner: SharedNode(inner.0.clone()),
401 metadata: metadata.clone(),
402 }
403 .into(),
404 _phantom: PhantomData,
405 }
406 }
407}
408
409impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
410where
411 L: Location<'a>,
412{
413 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
414 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
415 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
416
417 let flow_state = location.flow_state().clone();
418 Stream {
419 location,
420 flow_state,
421 ir_node: RefCell::new(ir_node),
422 _phantom: PhantomData,
423 }
424 }
425
426 /// Returns the [`Location`] where this stream is being materialized.
427 pub fn location(&self) -> &L {
428 &self.location
429 }
430
431 /// Weakens the consistency of this live collection to not guarantee any consistency across
432 /// cluster members (if this collection is on a cluster).
433 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
434 where
435 L: Location<'a>,
436 {
437 if L::consistency()
438 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
439 {
440 // already no consistency
441 Stream::new(
442 self.location.drop_consistency(),
443 self.ir_node.replace(HydroNode::Placeholder),
444 )
445 } else {
446 Stream::new(
447 self.location.drop_consistency(),
448 HydroNode::Cast {
449 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
450 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
451 T,
452 L::DropConsistency,
453 B,
454 O,
455 R,
456 >::collection_kind(
457 )),
458 },
459 )
460 }
461 }
462
463 /// Casts this live collection to have the consistency guarantees specified in the given
464 /// location type parameter. The developer must ensure that the strengthened consistency
465 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
466 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
467 self,
468 _proof: impl crate::properties::ConsistencyProof,
469 ) -> Stream<T, L2, B, O, R>
470 where
471 L: Location<'a>,
472 {
473 if L::consistency() == L2::consistency() {
474 Stream::new(
475 self.location.with_consistency_of(),
476 self.ir_node.replace(HydroNode::Placeholder),
477 )
478 } else {
479 Stream::new(
480 self.location.with_consistency_of(),
481 HydroNode::AssertIsConsistent {
482 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
483 trusted: false,
484 metadata: self
485 .location
486 .clone()
487 .with_consistency_of::<L2>()
488 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
489 },
490 )
491 }
492 }
493
494 pub(crate) fn assert_has_consistency_of_trusted<
495 L2: Location<'a, DropConsistency = L::DropConsistency>,
496 >(
497 self,
498 _proof: impl crate::properties::ConsistencyProof,
499 ) -> Stream<T, L2, B, O, R>
500 where
501 L: Location<'a>,
502 {
503 if L::consistency() == L2::consistency() {
504 Stream::new(
505 self.location.with_consistency_of(),
506 self.ir_node.replace(HydroNode::Placeholder),
507 )
508 } else {
509 Stream::new(
510 self.location.with_consistency_of(),
511 HydroNode::AssertIsConsistent {
512 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
513 trusted: true,
514 metadata: self
515 .location
516 .clone()
517 .with_consistency_of::<L2>()
518 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
519 },
520 )
521 }
522 }
523
524 pub(crate) fn collection_kind() -> CollectionKind {
525 CollectionKind::Stream {
526 bound: B::BOUND_KIND,
527 order: O::ORDERING_KIND,
528 retry: R::RETRIES_KIND,
529 element_type: quote_type::<T>().into(),
530 }
531 }
532
533 /// Produces a stream based on invoking `f` on each element.
534 /// If you do not want to modify the stream and instead only want to view
535 /// each item use [`Stream::inspect`] instead.
536 ///
537 /// # Example
538 /// ```rust
539 /// # #[cfg(feature = "deploy")] {
540 /// # use hydro_lang::prelude::*;
541 /// # use futures::StreamExt;
542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543 /// let words = process.source_iter(q!(vec!["hello", "world"]));
544 /// words.map(q!(|x| x.to_uppercase()))
545 /// # }, |mut stream| async move {
546 /// # for w in vec!["HELLO", "WORLD"] {
547 /// # assert_eq!(stream.next().await.unwrap(), w);
548 /// # }
549 /// # }));
550 /// # }
551 /// ```
552 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
553 where
554 F: Fn(T) -> U + 'a,
555 {
556 let f = crate::singleton_ref::with_singleton_capture(|| {
557 f.splice_fn1_ctx(&self.location).into()
558 });
559 Stream::new(
560 self.location.clone(),
561 HydroNode::Map {
562 f,
563 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
564 metadata: self
565 .location
566 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
567 },
568 )
569 }
570
571 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
572 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
573 /// for the output type `U` must produce items in a **deterministic** order.
574 ///
575 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
576 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
577 ///
578 /// # Example
579 /// ```rust
580 /// # #[cfg(feature = "deploy")] {
581 /// # use hydro_lang::prelude::*;
582 /// # use futures::StreamExt;
583 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
584 /// process
585 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
586 /// .flat_map_ordered(q!(|x| x))
587 /// # }, |mut stream| async move {
588 /// // 1, 2, 3, 4
589 /// # for w in (1..5) {
590 /// # assert_eq!(stream.next().await.unwrap(), w);
591 /// # }
592 /// # }));
593 /// # }
594 /// ```
595 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
596 where
597 I: IntoIterator<Item = U>,
598 F: Fn(T) -> I + 'a,
599 {
600 let f = crate::singleton_ref::with_singleton_capture(|| {
601 f.splice_fn1_ctx(&self.location).into()
602 });
603 Stream::new(
604 self.location.clone(),
605 HydroNode::FlatMap {
606 f,
607 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
608 metadata: self
609 .location
610 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
611 },
612 )
613 }
614
615 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
616 /// for the output type `U` to produce items in any order.
617 ///
618 /// # Example
619 /// ```rust
620 /// # #[cfg(feature = "deploy")] {
621 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
622 /// # use futures::StreamExt;
623 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
624 /// process
625 /// .source_iter(q!(vec![
626 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
627 /// std::collections::HashSet::from_iter(vec![3, 4]),
628 /// ]))
629 /// .flat_map_unordered(q!(|x| x))
630 /// # }, |mut stream| async move {
631 /// // 1, 2, 3, 4, but in no particular order
632 /// # let mut results = Vec::new();
633 /// # for w in (1..5) {
634 /// # results.push(stream.next().await.unwrap());
635 /// # }
636 /// # results.sort();
637 /// # assert_eq!(results, vec![1, 2, 3, 4]);
638 /// # }));
639 /// # }
640 /// ```
641 pub fn flat_map_unordered<U, I, F>(
642 self,
643 f: impl IntoQuotedMut<'a, F, L>,
644 ) -> Stream<U, L, B, NoOrder, R>
645 where
646 I: IntoIterator<Item = U>,
647 F: Fn(T) -> I + 'a,
648 {
649 let f = crate::singleton_ref::with_singleton_capture(|| {
650 f.splice_fn1_ctx(&self.location).into()
651 });
652 Stream::new(
653 self.location.clone(),
654 HydroNode::FlatMap {
655 f,
656 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
657 metadata: self
658 .location
659 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
660 },
661 )
662 }
663
664 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
665 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
666 ///
667 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
668 /// not deterministic, use [`Stream::flatten_unordered`] instead.
669 ///
670 /// ```rust
671 /// # #[cfg(feature = "deploy")] {
672 /// # use hydro_lang::prelude::*;
673 /// # use futures::StreamExt;
674 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
675 /// process
676 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
677 /// .flatten_ordered()
678 /// # }, |mut stream| async move {
679 /// // 1, 2, 3, 4
680 /// # for w in (1..5) {
681 /// # assert_eq!(stream.next().await.unwrap(), w);
682 /// # }
683 /// # }));
684 /// # }
685 /// ```
686 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
687 where
688 T: IntoIterator<Item = U>,
689 {
690 self.flat_map_ordered(q!(|d| d))
691 }
692
693 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
694 /// for the element type `T` to produce items in any order.
695 ///
696 /// # Example
697 /// ```rust
698 /// # #[cfg(feature = "deploy")] {
699 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
700 /// # use futures::StreamExt;
701 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
702 /// process
703 /// .source_iter(q!(vec![
704 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
705 /// std::collections::HashSet::from_iter(vec![3, 4]),
706 /// ]))
707 /// .flatten_unordered()
708 /// # }, |mut stream| async move {
709 /// // 1, 2, 3, 4, but in no particular order
710 /// # let mut results = Vec::new();
711 /// # for w in (1..5) {
712 /// # results.push(stream.next().await.unwrap());
713 /// # }
714 /// # results.sort();
715 /// # assert_eq!(results, vec![1, 2, 3, 4]);
716 /// # }));
717 /// # }
718 /// ```
719 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
720 where
721 T: IntoIterator<Item = U>,
722 {
723 self.flat_map_unordered(q!(|d| d))
724 }
725
726 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
727 /// then emit the elements of that stream one by one. When the inner stream yields
728 /// `Pending`, this operator yields as well.
729 pub fn flat_map_stream_blocking<U, S, F>(
730 self,
731 f: impl IntoQuotedMut<'a, F, L>,
732 ) -> Stream<U, L, B, O, R>
733 where
734 S: futures::Stream<Item = U>,
735 F: Fn(T) -> S + 'a,
736 {
737 let f = f.splice_fn1_ctx(&self.location).into();
738 Stream::new(
739 self.location.clone(),
740 HydroNode::FlatMapStreamBlocking {
741 f,
742 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
743 metadata: self
744 .location
745 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
746 },
747 )
748 }
749
750 /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
751 /// emit its elements one by one. When the inner stream yields `Pending`, this operator
752 /// yields as well.
753 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
754 where
755 T: futures::Stream<Item = U>,
756 {
757 self.flat_map_stream_blocking(q!(|d| d))
758 }
759
760 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
761 /// `f`, preserving the order of the elements.
762 ///
763 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
764 /// not modify or take ownership of the values. If you need to modify the values while filtering
765 /// use [`Stream::filter_map`] instead.
766 ///
767 /// # Example
768 /// ```rust
769 /// # #[cfg(feature = "deploy")] {
770 /// # use hydro_lang::prelude::*;
771 /// # use futures::StreamExt;
772 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773 /// process
774 /// .source_iter(q!(vec![1, 2, 3, 4]))
775 /// .filter(q!(|&x| x > 2))
776 /// # }, |mut stream| async move {
777 /// // 3, 4
778 /// # for w in (3..5) {
779 /// # assert_eq!(stream.next().await.unwrap(), w);
780 /// # }
781 /// # }));
782 /// # }
783 /// ```
784 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
785 where
786 F: Fn(&T) -> bool + 'a,
787 {
788 let f = crate::singleton_ref::with_singleton_capture(|| {
789 f.splice_fn1_borrow_ctx(&self.location).into()
790 });
791 Stream::new(
792 self.location.clone(),
793 HydroNode::Filter {
794 f,
795 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
796 metadata: self.location.new_node_metadata(Self::collection_kind()),
797 },
798 )
799 }
800
801 /// Splits the stream into two streams based on a predicate, without cloning elements.
802 ///
803 /// Elements for which `f` returns `true` are sent to the first output stream,
804 /// and elements for which `f` returns `false` are sent to the second output stream.
805 ///
806 /// Unlike using `filter` twice, this only evaluates the predicate once per element
807 /// and does not require `T: Clone`.
808 ///
809 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
810 /// the predicate is only used for routing; the element itself is moved to the
811 /// appropriate output stream.
812 ///
813 /// # Example
814 /// ```rust
815 /// # #[cfg(feature = "deploy")] {
816 /// # use hydro_lang::prelude::*;
817 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
818 /// # use futures::StreamExt;
819 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
820 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
821 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
822 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
823 /// evens.map(q!(|x| (x, true)))
824 /// .merge_unordered(odds.map(q!(|x| (x, false))))
825 /// # }, |mut stream| async move {
826 /// # let mut results = Vec::new();
827 /// # for _ in 0..6 {
828 /// # results.push(stream.next().await.unwrap());
829 /// # }
830 /// # results.sort();
831 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
832 /// # }));
833 /// # }
834 /// ```
835 #[expect(
836 clippy::type_complexity,
837 reason = "return type mirrors the input stream type"
838 )]
839 pub fn partition<F>(
840 self,
841 f: impl IntoQuotedMut<'a, F, L>,
842 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
843 where
844 F: Fn(&T) -> bool + 'a,
845 {
846 let f = crate::singleton_ref::with_singleton_capture(|| {
847 f.splice_fn1_borrow_ctx(&self.location).into()
848 });
849 let shared = SharedNode(Rc::new(RefCell::new(
850 self.ir_node.replace(HydroNode::Placeholder),
851 )));
852
853 let true_stream = Stream::new(
854 self.location.clone(),
855 HydroNode::Partition {
856 inner: SharedNode(shared.0.clone()),
857 f: f.clone(),
858 is_true: true,
859 metadata: self.location.new_node_metadata(Self::collection_kind()),
860 },
861 );
862
863 let false_stream = Stream::new(
864 self.location.clone(),
865 HydroNode::Partition {
866 inner: SharedNode(shared.0),
867 f,
868 is_true: false,
869 metadata: self.location.new_node_metadata(Self::collection_kind()),
870 },
871 );
872
873 (true_stream, false_stream)
874 }
875
876 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
877 ///
878 /// # Example
879 /// ```rust
880 /// # #[cfg(feature = "deploy")] {
881 /// # use hydro_lang::prelude::*;
882 /// # use futures::StreamExt;
883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884 /// process
885 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
886 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
887 /// # }, |mut stream| async move {
888 /// // 1, 2
889 /// # for w in (1..3) {
890 /// # assert_eq!(stream.next().await.unwrap(), w);
891 /// # }
892 /// # }));
893 /// # }
894 /// ```
895 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
896 where
897 F: Fn(T) -> Option<U> + 'a,
898 {
899 let f = f.splice_fn1_ctx(&self.location).into();
900 Stream::new(
901 self.location.clone(),
902 HydroNode::FilterMap {
903 f,
904 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
905 metadata: self
906 .location
907 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
908 },
909 )
910 }
911
912 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
913 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
914 /// If `other` is an empty [`Optional`], no values will be produced.
915 ///
916 /// # Example
917 /// ```rust
918 /// # #[cfg(feature = "deploy")] {
919 /// # use hydro_lang::prelude::*;
920 /// # use futures::StreamExt;
921 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922 /// let tick = process.tick();
923 /// let batch = process
924 /// .source_iter(q!(vec![1, 2, 3, 4]))
925 /// .batch(&tick, nondet!(/** test */));
926 /// let count = batch.clone().count(); // `count()` returns a singleton
927 /// batch.cross_singleton(count).all_ticks()
928 /// # }, |mut stream| async move {
929 /// // (1, 4), (2, 4), (3, 4), (4, 4)
930 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
931 /// # assert_eq!(stream.next().await.unwrap(), w);
932 /// # }
933 /// # }));
934 /// # }
935 /// ```
936 pub fn cross_singleton<O2>(
937 self,
938 other: impl Into<Optional<O2, L, Bounded>>,
939 ) -> Stream<(T, O2), L, B, O, R>
940 where
941 O2: Clone,
942 {
943 let other: Optional<O2, L, Bounded> = other.into();
944 check_matching_location(&self.location, &other.location);
945
946 Stream::new(
947 self.location.clone(),
948 HydroNode::CrossSingleton {
949 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
950 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
951 metadata: self
952 .location
953 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
954 },
955 )
956 }
957
958 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
959 ///
960 /// # Example
961 /// ```rust
962 /// # #[cfg(feature = "deploy")] {
963 /// # use hydro_lang::prelude::*;
964 /// # use futures::StreamExt;
965 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
966 /// let tick = process.tick();
967 /// // ticks are lazy by default, forces the second tick to run
968 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
969 ///
970 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
971 /// let batch_first_tick = process
972 /// .source_iter(q!(vec![1, 2, 3, 4]))
973 /// .batch(&tick, nondet!(/** test */));
974 /// let batch_second_tick = process
975 /// .source_iter(q!(vec![5, 6, 7, 8]))
976 /// .batch(&tick, nondet!(/** test */))
977 /// .defer_tick();
978 /// batch_first_tick.chain(batch_second_tick)
979 /// .filter_if(signal)
980 /// .all_ticks()
981 /// # }, |mut stream| async move {
982 /// // [1, 2, 3, 4]
983 /// # for w in vec![1, 2, 3, 4] {
984 /// # assert_eq!(stream.next().await.unwrap(), w);
985 /// # }
986 /// # }));
987 /// # }
988 /// ```
989 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
990 self.cross_singleton(signal.filter(q!(|b| *b)))
991 .map(q!(|(d, _)| d))
992 }
993
994 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
995 ///
996 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
997 /// leader of a cluster.
998 ///
999 /// # Example
1000 /// ```rust
1001 /// # #[cfg(feature = "deploy")] {
1002 /// # use hydro_lang::prelude::*;
1003 /// # use futures::StreamExt;
1004 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1005 /// let tick = process.tick();
1006 /// // ticks are lazy by default, forces the second tick to run
1007 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1008 ///
1009 /// let batch_first_tick = process
1010 /// .source_iter(q!(vec![1, 2, 3, 4]))
1011 /// .batch(&tick, nondet!(/** test */));
1012 /// let batch_second_tick = process
1013 /// .source_iter(q!(vec![5, 6, 7, 8]))
1014 /// .batch(&tick, nondet!(/** test */))
1015 /// .defer_tick(); // appears on the second tick
1016 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1017 /// batch_first_tick.chain(batch_second_tick)
1018 /// .filter_if_some(some_on_first_tick)
1019 /// .all_ticks()
1020 /// # }, |mut stream| async move {
1021 /// // [1, 2, 3, 4]
1022 /// # for w in vec![1, 2, 3, 4] {
1023 /// # assert_eq!(stream.next().await.unwrap(), w);
1024 /// # }
1025 /// # }));
1026 /// # }
1027 /// ```
1028 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1029 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1030 self.filter_if(signal.is_some())
1031 }
1032
1033 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1034 ///
1035 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1036 /// some local state.
1037 ///
1038 /// # Example
1039 /// ```rust
1040 /// # #[cfg(feature = "deploy")] {
1041 /// # use hydro_lang::prelude::*;
1042 /// # use futures::StreamExt;
1043 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1044 /// let tick = process.tick();
1045 /// // ticks are lazy by default, forces the second tick to run
1046 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1047 ///
1048 /// let batch_first_tick = process
1049 /// .source_iter(q!(vec![1, 2, 3, 4]))
1050 /// .batch(&tick, nondet!(/** test */));
1051 /// let batch_second_tick = process
1052 /// .source_iter(q!(vec![5, 6, 7, 8]))
1053 /// .batch(&tick, nondet!(/** test */))
1054 /// .defer_tick(); // appears on the second tick
1055 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1056 /// batch_first_tick.chain(batch_second_tick)
1057 /// .filter_if_none(some_on_first_tick)
1058 /// .all_ticks()
1059 /// # }, |mut stream| async move {
1060 /// // [5, 6, 7, 8]
1061 /// # for w in vec![5, 6, 7, 8] {
1062 /// # assert_eq!(stream.next().await.unwrap(), w);
1063 /// # }
1064 /// # }));
1065 /// # }
1066 /// ```
1067 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1068 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1069 self.filter_if(other.is_none())
1070 }
1071
1072 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1073 /// returning all tupled pairs.
1074 ///
1075 /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1076 /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1077 /// symmetric hash join is used and ordering is [`NoOrder`].
1078 ///
1079 /// # Example
1080 /// ```rust
1081 /// # #[cfg(feature = "deploy")] {
1082 /// # use hydro_lang::prelude::*;
1083 /// # use std::collections::HashSet;
1084 /// # use futures::StreamExt;
1085 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1086 /// let tick = process.tick();
1087 /// let stream1 = process.source_iter(q!(vec![1, 2]));
1088 /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1089 /// stream1.cross_product(stream2)
1090 /// # }, |mut stream| async move {
1091 /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1092 /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1093 /// # stream.map(|i| assert!(expected.contains(&i)));
1094 /// # }));
1095 /// # }
1096 #[expect(
1097 clippy::type_complexity,
1098 reason = "MinRetries projection in return type"
1099 )]
1100 pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1101 self,
1102 other: Stream<T2, L, B2, O2, R2>,
1103 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1104 where
1105 T: Clone,
1106 T2: Clone,
1107 R: MinRetries<R2>,
1108 {
1109 self.map(q!(|v| ((), v)))
1110 .join(other.map(q!(|v| ((), v))))
1111 .map(q!(|((), (v1, v2))| (v1, v2)))
1112 }
1113
1114 /// Takes one stream as input and filters out any duplicate occurrences. The output
1115 /// contains all unique values from the input.
1116 ///
1117 /// # Example
1118 /// ```rust
1119 /// # #[cfg(feature = "deploy")] {
1120 /// # use hydro_lang::prelude::*;
1121 /// # use futures::StreamExt;
1122 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123 /// let tick = process.tick();
1124 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1125 /// # }, |mut stream| async move {
1126 /// # for w in vec![1, 2, 3, 4] {
1127 /// # assert_eq!(stream.next().await.unwrap(), w);
1128 /// # }
1129 /// # }));
1130 /// # }
1131 /// ```
1132 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1133 where
1134 T: Eq + Hash,
1135 {
1136 Stream::new(
1137 self.location.clone(),
1138 HydroNode::Unique {
1139 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1140 metadata: self
1141 .location
1142 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1143 },
1144 )
1145 }
1146
1147 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1148 ///
1149 /// The `other` stream must be [`Bounded`], since this function will wait until
1150 /// all its elements are available before producing any output.
1151 /// # Example
1152 /// ```rust
1153 /// # #[cfg(feature = "deploy")] {
1154 /// # use hydro_lang::prelude::*;
1155 /// # use futures::StreamExt;
1156 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1157 /// let tick = process.tick();
1158 /// let stream = process
1159 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1160 /// .batch(&tick, nondet!(/** test */));
1161 /// let batch = process
1162 /// .source_iter(q!(vec![1, 2]))
1163 /// .batch(&tick, nondet!(/** test */));
1164 /// stream.filter_not_in(batch).all_ticks()
1165 /// # }, |mut stream| async move {
1166 /// # for w in vec![3, 4] {
1167 /// # assert_eq!(stream.next().await.unwrap(), w);
1168 /// # }
1169 /// # }));
1170 /// # }
1171 /// ```
1172 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1173 where
1174 T: Eq + Hash,
1175 B2: IsBounded,
1176 {
1177 check_matching_location(&self.location, &other.location);
1178
1179 Stream::new(
1180 self.location.clone(),
1181 HydroNode::Difference {
1182 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1183 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1184 metadata: self
1185 .location
1186 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1187 },
1188 )
1189 }
1190
1191 /// An operator which allows you to "inspect" each element of a stream without
1192 /// modifying it. The closure `f` is called on a reference to each item. This is
1193 /// mainly useful for debugging, and should not be used to generate side-effects.
1194 ///
1195 /// # Example
1196 /// ```rust
1197 /// # #[cfg(feature = "deploy")] {
1198 /// # use hydro_lang::prelude::*;
1199 /// # use futures::StreamExt;
1200 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1201 /// let nums = process.source_iter(q!(vec![1, 2]));
1202 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1203 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1204 /// # }, |mut stream| async move {
1205 /// # for w in vec![1, 2] {
1206 /// # assert_eq!(stream.next().await.unwrap(), w);
1207 /// # }
1208 /// # }));
1209 /// # }
1210 /// ```
1211 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L::DropConsistency>) -> Self
1212 where
1213 F: Fn(&T) + 'a,
1214 {
1215 let f = crate::singleton_ref::with_singleton_capture(|| {
1216 f.splice_fn1_borrow_ctx(&self.location.drop_consistency())
1217 .into()
1218 });
1219
1220 Stream::new(
1221 self.location.clone(),
1222 HydroNode::Inspect {
1223 f,
1224 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1225 metadata: self.location.new_node_metadata(Self::collection_kind()),
1226 },
1227 )
1228 }
1229
1230 /// Executes the provided closure for every element in this stream.
1231 ///
1232 /// Because the closure may have side effects, the stream must have deterministic order
1233 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1234 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1235 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1236 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1237 where
1238 O: IsOrdered,
1239 R: IsExactlyOnce,
1240 {
1241 let f = f.splice_fn1_ctx(&self.location).into();
1242 self.location
1243 .flow_state()
1244 .borrow_mut()
1245 .push_root(HydroRoot::ForEach {
1246 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247 f,
1248 op_metadata: HydroIrOpMetadata::new(),
1249 });
1250 }
1251
1252 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1253 /// TCP socket to some other server. You should _not_ use this API for interacting with
1254 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1255 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1256 /// interaction with asynchronous sinks.
1257 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1258 where
1259 O: IsOrdered,
1260 R: IsExactlyOnce,
1261 S: 'a + futures::Sink<T> + Unpin,
1262 {
1263 self.location
1264 .flow_state()
1265 .borrow_mut()
1266 .push_root(HydroRoot::DestSink {
1267 sink: sink.splice_typed_ctx(&self.location).into(),
1268 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1269 op_metadata: HydroIrOpMetadata::new(),
1270 });
1271 }
1272
1273 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1274 ///
1275 /// # Example
1276 /// ```rust
1277 /// # #[cfg(feature = "deploy")] {
1278 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1279 /// # use futures::StreamExt;
1280 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1281 /// let tick = process.tick();
1282 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1283 /// numbers.enumerate()
1284 /// # }, |mut stream| async move {
1285 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1286 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1287 /// # assert_eq!(stream.next().await.unwrap(), w);
1288 /// # }
1289 /// # }));
1290 /// # }
1291 /// ```
1292 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1293 where
1294 O: IsOrdered,
1295 R: IsExactlyOnce,
1296 {
1297 Stream::new(
1298 self.location.clone(),
1299 HydroNode::Enumerate {
1300 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1301 metadata: self.location.new_node_metadata(Stream::<
1302 (usize, T),
1303 L,
1304 B,
1305 TotalOrder,
1306 ExactlyOnce,
1307 >::collection_kind()),
1308 },
1309 )
1310 }
1311
1312 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1313 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1314 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1315 ///
1316 /// Depending on the input stream guarantees, the closure may need to be commutative
1317 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1318 ///
1319 /// # Example
1320 /// ```rust
1321 /// # #[cfg(feature = "deploy")] {
1322 /// # use hydro_lang::prelude::*;
1323 /// # use futures::StreamExt;
1324 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1326 /// words
1327 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1328 /// .into_stream()
1329 /// # }, |mut stream| async move {
1330 /// // "HELLOWORLD"
1331 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1332 /// # }));
1333 /// # }
1334 /// ```
1335 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1336 self,
1337 init: impl IntoQuotedMut<'a, I, L>,
1338 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1339 ) -> Singleton<A, L, B2>
1340 where
1341 I: Fn() -> A + 'a,
1342 F: 'a + Fn(&mut A, T),
1343 C: ValidCommutativityFor<O> + crate::properties::IsProved,
1344 Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
1345 B: ApplyMonotoneStream<M, B2>,
1346 {
1347 let init = init.splice_fn0_ctx(&self.location).into();
1348 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1349 proof.register_proof(&comb);
1350
1351 // Only assume_retries (for idempotence), not assume_ordering.
1352 // The fold hook in the simulator handles ordering non-determinism directly.
1353 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1354 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1355
1356 let core = HydroNode::Fold {
1357 init,
1358 acc: comb.into(),
1359 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1360 is_commutative: C::IS_PROVED,
1361 is_idempotent: Idemp::IS_PROVED,
1362 metadata: retried
1363 .location
1364 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1365 // we do not guarantee consistency at this point because if the algebraic properties
1366 // do not hold in practice, replica consistency may fail to be maintained, so we
1367 // would like the simulator to assert consistency; in the future, this will be dynamic
1368 // based on the proof mechanism
1369 };
1370
1371 Singleton::new(retried.location.clone(), core)
1372 .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1373 }
1374
1375 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1376 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1377 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1378 /// reference, so that it can be modified in place.
1379 ///
1380 /// Depending on the input stream guarantees, the closure may need to be commutative
1381 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1382 ///
1383 /// # Example
1384 /// ```rust
1385 /// # #[cfg(feature = "deploy")] {
1386 /// # use hydro_lang::prelude::*;
1387 /// # use futures::StreamExt;
1388 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1389 /// let bools = process.source_iter(q!(vec![false, true, false]));
1390 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1391 /// # }, |mut stream| async move {
1392 /// // true
1393 /// # assert_eq!(stream.next().await.unwrap(), true);
1394 /// # }));
1395 /// # }
1396 /// ```
1397 pub fn reduce<F, C, Idemp>(
1398 self,
1399 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1400 ) -> Optional<T, L, B>
1401 where
1402 F: Fn(&mut T, T) + 'a,
1403 C: ValidCommutativityFor<O> + crate::properties::IsProved,
1404 Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
1405 {
1406 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1407 proof.register_proof(&f);
1408
1409 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1410 let ordered_etc: Stream<T, L::DropConsistency, B> =
1411 self.assume_retries(nondet).assume_ordering(nondet);
1412
1413 let core = HydroNode::Reduce {
1414 f: f.into(),
1415 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1416 is_commutative: C::IS_PROVED,
1417 is_idempotent: Idemp::IS_PROVED,
1418 metadata: ordered_etc
1419 .location
1420 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1421 };
1422
1423 Optional::new(ordered_etc.location.clone(), core)
1424 .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1425 }
1426
1427 /// Computes the maximum element in the stream as an [`Optional`], which
1428 /// will be empty until the first element in the input arrives.
1429 ///
1430 /// # Example
1431 /// ```rust
1432 /// # #[cfg(feature = "deploy")] {
1433 /// # use hydro_lang::prelude::*;
1434 /// # use futures::StreamExt;
1435 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1436 /// let tick = process.tick();
1437 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1438 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1439 /// batch.max().all_ticks()
1440 /// # }, |mut stream| async move {
1441 /// // 4
1442 /// # assert_eq!(stream.next().await.unwrap(), 4);
1443 /// # }));
1444 /// # }
1445 /// ```
1446 pub fn max(self) -> Optional<T, L, B>
1447 where
1448 T: Ord,
1449 {
1450 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1451 .assume_ordering_trusted_bounded::<TotalOrder>(
1452 nondet!(/** max is commutative, but order affects intermediates */),
1453 )
1454 .reduce(q!(|curr, new| {
1455 if new > *curr {
1456 *curr = new;
1457 }
1458 }))
1459 }
1460
1461 /// Computes the minimum element in the stream as an [`Optional`], which
1462 /// will be empty until the first element in the input arrives.
1463 ///
1464 /// # Example
1465 /// ```rust
1466 /// # #[cfg(feature = "deploy")] {
1467 /// # use hydro_lang::prelude::*;
1468 /// # use futures::StreamExt;
1469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1470 /// let tick = process.tick();
1471 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1472 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1473 /// batch.min().all_ticks()
1474 /// # }, |mut stream| async move {
1475 /// // 1
1476 /// # assert_eq!(stream.next().await.unwrap(), 1);
1477 /// # }));
1478 /// # }
1479 /// ```
1480 pub fn min(self) -> Optional<T, L, B>
1481 where
1482 T: Ord,
1483 {
1484 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1485 .assume_ordering_trusted_bounded::<TotalOrder>(
1486 nondet!(/** max is commutative, but order affects intermediates */),
1487 )
1488 .reduce(q!(|curr, new| {
1489 if new < *curr {
1490 *curr = new;
1491 }
1492 }))
1493 }
1494
1495 /// Computes the first element in the stream as an [`Optional`], which
1496 /// will be empty until the first element in the input arrives.
1497 ///
1498 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1499 /// re-ordering of elements may cause the first element to change.
1500 ///
1501 /// # Example
1502 /// ```rust
1503 /// # #[cfg(feature = "deploy")] {
1504 /// # use hydro_lang::prelude::*;
1505 /// # use futures::StreamExt;
1506 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1507 /// let tick = process.tick();
1508 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1509 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1510 /// batch.first().all_ticks()
1511 /// # }, |mut stream| async move {
1512 /// // 1
1513 /// # assert_eq!(stream.next().await.unwrap(), 1);
1514 /// # }));
1515 /// # }
1516 /// ```
1517 pub fn first(self) -> Optional<T, L, B>
1518 where
1519 O: IsOrdered,
1520 {
1521 self.make_totally_ordered()
1522 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1523 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1524 .reduce(q!(|_, _| {}))
1525 }
1526
1527 /// Computes the last element in the stream as an [`Optional`], which
1528 /// will be empty until an element in the input arrives.
1529 ///
1530 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1531 /// re-ordering of elements may cause the last element to change.
1532 ///
1533 /// # Example
1534 /// ```rust
1535 /// # #[cfg(feature = "deploy")] {
1536 /// # use hydro_lang::prelude::*;
1537 /// # use futures::StreamExt;
1538 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539 /// let tick = process.tick();
1540 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1541 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1542 /// batch.last().all_ticks()
1543 /// # }, |mut stream| async move {
1544 /// // 4
1545 /// # assert_eq!(stream.next().await.unwrap(), 4);
1546 /// # }));
1547 /// # }
1548 /// ```
1549 pub fn last(self) -> Optional<T, L, B>
1550 where
1551 O: IsOrdered,
1552 {
1553 self.make_totally_ordered()
1554 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1555 .reduce(q!(|curr, new| *curr = new))
1556 }
1557
1558 /// Returns a stream containing at most the first `n` elements of the input stream,
1559 /// preserving the original order. Similar to `LIMIT` in SQL.
1560 ///
1561 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1562 /// retries, since the result depends on the order and cardinality of elements.
1563 ///
1564 /// # Example
1565 /// ```rust
1566 /// # #[cfg(feature = "deploy")] {
1567 /// # use hydro_lang::prelude::*;
1568 /// # use futures::StreamExt;
1569 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1570 /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1571 /// numbers.limit(q!(3))
1572 /// # }, |mut stream| async move {
1573 /// // 10, 20, 30
1574 /// # for w in vec![10, 20, 30] {
1575 /// # assert_eq!(stream.next().await.unwrap(), w);
1576 /// # }
1577 /// # }));
1578 /// # }
1579 /// ```
1580 pub fn limit(
1581 self,
1582 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1583 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1584 where
1585 O: IsOrdered,
1586 R: IsExactlyOnce,
1587 {
1588 self.generator(
1589 q!(|| 0usize),
1590 q!(move |count, item| {
1591 if *count == n {
1592 Generate::Break
1593 } else {
1594 *count += 1;
1595 if *count == n {
1596 Generate::Return(item)
1597 } else {
1598 Generate::Yield(item)
1599 }
1600 }
1601 }),
1602 )
1603 }
1604
1605 /// Collects all the elements of this stream into a single [`Vec`] element.
1606 ///
1607 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1608 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1609 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1610 /// the vector at an arbitrary point in time.
1611 ///
1612 /// # Example
1613 /// ```rust
1614 /// # #[cfg(feature = "deploy")] {
1615 /// # use hydro_lang::prelude::*;
1616 /// # use futures::StreamExt;
1617 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1618 /// let tick = process.tick();
1619 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1620 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1621 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1622 /// # }, |mut stream| async move {
1623 /// // [ vec![1, 2, 3, 4] ]
1624 /// # for w in vec![vec![1, 2, 3, 4]] {
1625 /// # assert_eq!(stream.next().await.unwrap(), w);
1626 /// # }
1627 /// # }));
1628 /// # }
1629 /// ```
1630 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1631 where
1632 O: IsOrdered,
1633 R: IsExactlyOnce,
1634 {
1635 self.make_totally_ordered().make_exactly_once().fold(
1636 q!(|| vec![]),
1637 q!(|acc, v| {
1638 acc.push(v);
1639 }),
1640 )
1641 }
1642
1643 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1644 /// and emitting each intermediate result.
1645 ///
1646 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1647 /// containing all intermediate accumulated values. The scan operation can also terminate early
1648 /// by returning `None`.
1649 ///
1650 /// The function takes a mutable reference to the accumulator and the current element, and returns
1651 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1652 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1653 ///
1654 /// # Examples
1655 ///
1656 /// Basic usage - running sum:
1657 /// ```rust
1658 /// # #[cfg(feature = "deploy")] {
1659 /// # use hydro_lang::prelude::*;
1660 /// # use futures::StreamExt;
1661 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1662 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1663 /// q!(|| 0),
1664 /// q!(|acc, x| {
1665 /// *acc += x;
1666 /// Some(*acc)
1667 /// }),
1668 /// )
1669 /// # }, |mut stream| async move {
1670 /// // Output: 1, 3, 6, 10
1671 /// # for w in vec![1, 3, 6, 10] {
1672 /// # assert_eq!(stream.next().await.unwrap(), w);
1673 /// # }
1674 /// # }));
1675 /// # }
1676 /// ```
1677 ///
1678 /// Early termination example:
1679 /// ```rust
1680 /// # #[cfg(feature = "deploy")] {
1681 /// # use hydro_lang::prelude::*;
1682 /// # use futures::StreamExt;
1683 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1684 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1685 /// q!(|| 1),
1686 /// q!(|state, x| {
1687 /// *state = *state * x;
1688 /// if *state > 6 {
1689 /// None // Terminate the stream
1690 /// } else {
1691 /// Some(-*state)
1692 /// }
1693 /// }),
1694 /// )
1695 /// # }, |mut stream| async move {
1696 /// // Output: -1, -2, -6
1697 /// # for w in vec![-1, -2, -6] {
1698 /// # assert_eq!(stream.next().await.unwrap(), w);
1699 /// # }
1700 /// # }));
1701 /// # }
1702 /// ```
1703 pub fn scan<A, U, I, F>(
1704 self,
1705 init: impl IntoQuotedMut<'a, I, L>,
1706 f: impl IntoQuotedMut<'a, F, L>,
1707 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1708 where
1709 O: IsOrdered,
1710 R: IsExactlyOnce,
1711 I: Fn() -> A + 'a,
1712 F: Fn(&mut A, T) -> Option<U> + 'a,
1713 {
1714 let init = init.splice_fn0_ctx(&self.location).into();
1715 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1716
1717 Stream::new(
1718 self.location.clone(),
1719 HydroNode::Scan {
1720 init,
1721 acc: f,
1722 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1723 metadata: self.location.new_node_metadata(
1724 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1725 ),
1726 },
1727 )
1728 }
1729
1730 /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1731 /// stream, maintaining an internal state (accumulator) and emitting the values returned
1732 /// by the function.
1733 ///
1734 /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1735 /// future. The future is polled to completion. If it resolves to `Some`, the value is
1736 /// emitted. If it resolves to `None`, the item is filtered out.
1737 ///
1738 /// # Examples
1739 ///
1740 /// ```rust
1741 /// # #[cfg(feature = "deploy")] {
1742 /// # use hydro_lang::prelude::*;
1743 /// # use futures::StreamExt;
1744 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1745 /// process
1746 /// .source_iter(q!(vec![1, 2, 3, 4]))
1747 /// .scan_async_blocking(
1748 /// q!(|| 0),
1749 /// q!(|acc, x| {
1750 /// *acc += x;
1751 /// let val = *acc;
1752 /// async move { Some(val) }
1753 /// }),
1754 /// )
1755 /// # }, |mut stream| async move {
1756 /// // Output: 1, 3, 6, 10
1757 /// # for w in vec![1, 3, 6, 10] {
1758 /// # assert_eq!(stream.next().await.unwrap(), w);
1759 /// # }
1760 /// # }));
1761 /// # }
1762 /// ```
1763 pub fn scan_async_blocking<A, U, I, F, Fut>(
1764 self,
1765 init: impl IntoQuotedMut<'a, I, L>,
1766 f: impl IntoQuotedMut<'a, F, L>,
1767 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1768 where
1769 O: IsOrdered,
1770 R: IsExactlyOnce,
1771 I: Fn() -> A + 'a,
1772 F: Fn(&mut A, T) -> Fut + 'a,
1773 Fut: Future<Output = Option<U>> + 'a,
1774 {
1775 let init = init.splice_fn0_ctx(&self.location).into();
1776 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1777
1778 Stream::new(
1779 self.location.clone(),
1780 HydroNode::ScanAsyncBlocking {
1781 init,
1782 acc: f,
1783 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1784 metadata: self.location.new_node_metadata(
1785 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1786 ),
1787 },
1788 )
1789 }
1790
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed:
    /// - `Yield(out)` emits `out` and keeps processing inputs.
    /// - `Continue` emits nothing and keeps processing inputs.
    /// - `Return(out)` emits `out` and stops processing any further inputs.
    /// - `Break` stops processing without emitting anything.
    ///
    /// The initializer is invoked lazily, when the first input element is processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Defer splicing of the user closures so they can be referenced from inside the
        // quoted scan body below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan emits Option<U> per input: Some(Some(out)) for an output element,
        // Some(None) for "no output, keep going", and None to end the scan.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            // Lazily run the user initializer on the first input.
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        // Emit the final value now and mark the state as terminated; the
                        // scan ends when the next input arrives (see the `_` arm below).
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, there is only one state (no other keys that still
                    // need processing), so `Break` can terminate the scan immediately.
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the Option<U> outputs of the scan, dropping the `None` placeholders.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1903
1904 /// Given a time interval, returns a stream corresponding to samples taken from the
1905 /// stream roughly at that interval. The output will have elements in the same order
1906 /// as the input, but with arbitrary elements skipped between samples. There is also
1907 /// no guarantee on the exact timing of the samples.
1908 ///
1909 /// # Non-Determinism
1910 /// The output stream is non-deterministic in which elements are sampled, since this
1911 /// is controlled by a clock.
1912 pub fn sample_every(
1913 self,
1914 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1915 nondet: NonDet,
1916 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1917 where
1918 L: TopLevel<'a>,
1919 {
1920 let samples = self.location.source_interval(interval);
1921
1922 let tick = self.location.tick();
1923 self.batch(&tick, nondet)
1924 .filter_if(samples.batch(&tick, nondet).first().is_some())
1925 .all_ticks()
1926 .weaken_retries()
1927 }
1928
1929 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1930 /// stream has not emitted a value since that duration.
1931 ///
1932 /// # Non-Determinism
1933 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1934 /// samples take place, timeouts may be non-deterministically generated or missed,
1935 /// and the notification of the timeout may be delayed as well. There is also no
1936 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1937 /// detected based on when the next sample is taken.
1938 pub fn timeout(
1939 self,
1940 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
1941 nondet: NonDet,
1942 ) -> Optional<(), L::DropConsistency, Unbounded>
1943 where
1944 L: TopLevel<'a>,
1945 {
1946 let tick = self.location.tick();
1947
1948 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1949 q!(|| None),
1950 q!(
1951 |latest, _| {
1952 *latest = Some(Instant::now());
1953 },
1954 commutative = manual_proof!(/** TODO */)
1955 ),
1956 );
1957
1958 latest_received
1959 .snapshot(&tick, nondet)
1960 .filter_map(q!(move |latest_received| {
1961 if let Some(latest_received) = latest_received {
1962 if Instant::now().duration_since(latest_received) > duration {
1963 Some(())
1964 } else {
1965 None
1966 }
1967 } else {
1968 Some(())
1969 }
1970 }))
1971 .latest()
1972 }
1973
1974 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1975 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1976 ///
1977 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1978 /// processed before an acknowledgement is emitted.
1979 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1980 let id = self.location.flow_state().borrow_mut().next_clock_id();
1981 let out_location = Atomic {
1982 tick: Tick {
1983 id,
1984 l: self.location.clone(),
1985 },
1986 };
1987 Stream::new(
1988 out_location.clone(),
1989 HydroNode::BeginAtomic {
1990 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1991 metadata: out_location
1992 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1993 },
1994 )
1995 }
1996
1997 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1998 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1999 /// the order of the input. The output stream will execute in the [`Tick`] that was
2000 /// used to create the atomic section.
2001 ///
2002 /// # Non-Determinism
2003 /// The batch boundaries are non-deterministic and may change across executions.
2004 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2005 self,
2006 tick: &Tick<L2>,
2007 _nondet: NonDet,
2008 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2009 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2010 Stream::new(
2011 tick.drop_consistency(),
2012 HydroNode::Batch {
2013 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2014 metadata: tick
2015 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2016 },
2017 )
2018 }
2019
2020 /// An operator which allows you to "name" a `HydroNode`.
2021 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2022 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2023 {
2024 let mut node = self.ir_node.borrow_mut();
2025 let metadata = node.metadata_mut();
2026 metadata.tag = Some(name.to_owned());
2027 }
2028 self
2029 }
2030
2031 /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2032 /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2033 /// so uses must be carefully vetted.
2034 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2035 where
2036 B: IsBounded,
2037 {
2038 Optional::new(
2039 self.location.clone(),
2040 HydroNode::Cast {
2041 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2042 metadata: self
2043 .location
2044 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2045 },
2046 )
2047 }
2048
2049 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2050 if O::ORDERING_KIND == O2::ORDERING_KIND {
2051 Stream::new(
2052 self.location.clone(),
2053 self.ir_node.replace(HydroNode::Placeholder),
2054 )
2055 } else {
2056 panic!(
2057 "Runtime ordering {:?} did not match requested cast {:?}.",
2058 O::ORDERING_KIND,
2059 O2::ORDERING_KIND
2060 )
2061 }
2062 }
2063
2064 /// Explicitly "casts" the stream to a type with a different ordering
2065 /// guarantee. Useful in unsafe code where the ordering cannot be proven
2066 /// by the type-system.
2067 ///
2068 /// # Non-Determinism
2069 /// This function is used as an escape hatch, and any mistakes in the
2070 /// provided ordering guarantee will propagate into the guarantees
2071 /// for the rest of the program.
2072 pub fn assume_ordering<O2: Ordering>(
2073 self,
2074 _nondet: NonDet,
2075 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2076 if O::ORDERING_KIND == O2::ORDERING_KIND {
2077 self.use_ordering_type().weaken_consistency()
2078 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2079 // We can always weaken the ordering guarantee
2080 let target_location = self.location().drop_consistency();
2081 Stream::new(
2082 target_location.clone(),
2083 HydroNode::Cast {
2084 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2085 metadata: target_location
2086 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2087 },
2088 )
2089 } else {
2090 let target_location = self.location().drop_consistency();
2091 Stream::new(
2092 target_location.clone(),
2093 HydroNode::ObserveNonDet {
2094 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2095 trusted: false,
2096 metadata: target_location
2097 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2098 },
2099 )
2100 }
2101 }
2102
2103 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2104 // intermediate states will not be revealed
2105 fn assume_ordering_trusted_bounded<O2: Ordering>(
2106 self,
2107 nondet: NonDet,
2108 ) -> Stream<T, L, B, O2, R> {
2109 if B::BOUNDED {
2110 self.assume_ordering_trusted(nondet)
2111 } else {
2112 let self_location = self.location.clone();
2113 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2114 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2115 }
2116 }
2117
2118 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2119 // is not observable
2120 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2121 self,
2122 _nondet: NonDet,
2123 ) -> Stream<T, L, B, O2, R> {
2124 if O::ORDERING_KIND == O2::ORDERING_KIND {
2125 self.use_ordering_type()
2126 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2127 // We can always weaken the ordering guarantee
2128 Stream::new(
2129 self.location.clone(),
2130 HydroNode::Cast {
2131 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2132 metadata: self
2133 .location
2134 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2135 },
2136 )
2137 } else {
2138 Stream::new(
2139 self.location.clone(),
2140 HydroNode::ObserveNonDet {
2141 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2142 trusted: true,
2143 metadata: self
2144 .location
2145 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2146 },
2147 )
2148 }
2149 }
2150
2151 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2152 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2153 /// which is always safe because that is the weakest possible guarantee.
2154 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2155 self.weaken_ordering::<NoOrder>()
2156 }
2157
2158 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2159 /// enforcing that `O2` is weaker than the input ordering guarantee.
2160 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2161 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2162 self.assume_ordering_trusted::<O2>(nondet)
2163 }
2164
2165 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2166 /// implies that `O == TotalOrder`.
2167 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2168 where
2169 O: IsOrdered,
2170 {
2171 self.assume_ordering_trusted(nondet!(/** no-op */))
2172 }
2173
2174 /// Explicitly "casts" the stream to a type with a different retries
2175 /// guarantee. Useful in unsafe code where the lack of retries cannot
2176 /// be proven by the type-system.
2177 ///
2178 /// # Non-Determinism
2179 /// This function is used as an escape hatch, and any mistakes in the
2180 /// provided retries guarantee will propagate into the guarantees
2181 /// for the rest of the program.
2182 pub fn assume_retries<R2: Retries>(
2183 self,
2184 _nondet: NonDet,
2185 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2186 if R::RETRIES_KIND == R2::RETRIES_KIND {
2187 Stream::new(
2188 self.location.drop_consistency(),
2189 self.ir_node.replace(HydroNode::Placeholder),
2190 )
2191 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2192 // We can always weaken the retries guarantee
2193 let target_location = self.location.drop_consistency();
2194 Stream::new(
2195 target_location.clone(),
2196 HydroNode::Cast {
2197 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2198 metadata: target_location
2199 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2200 },
2201 )
2202 } else {
2203 let target_location = self.location.drop_consistency();
2204 Stream::new(
2205 target_location.clone(),
2206 HydroNode::ObserveNonDet {
2207 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2208 trusted: false,
2209 metadata: target_location
2210 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2211 },
2212 )
2213 }
2214 }
2215
2216 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2217 // is not observable
2218 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2219 if R::RETRIES_KIND == R2::RETRIES_KIND {
2220 Stream::new(
2221 self.location.clone(),
2222 self.ir_node.replace(HydroNode::Placeholder),
2223 )
2224 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2225 // We can always weaken the retries guarantee
2226 Stream::new(
2227 self.location.clone(),
2228 HydroNode::Cast {
2229 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2230 metadata: self
2231 .location
2232 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2233 },
2234 )
2235 } else {
2236 Stream::new(
2237 self.location.clone(),
2238 HydroNode::ObserveNonDet {
2239 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2240 trusted: true,
2241 metadata: self
2242 .location
2243 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2244 },
2245 )
2246 }
2247 }
2248
2249 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2250 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2251 /// which is always safe because that is the weakest possible guarantee.
2252 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2253 self.weaken_retries::<AtLeastOnce>()
2254 }
2255
2256 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2257 /// enforcing that `R2` is weaker than the input retries guarantee.
2258 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2259 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2260 self.assume_retries_trusted::<R2>(nondet)
2261 }
2262
2263 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2264 /// implies that `R == ExactlyOnce`.
2265 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2266 where
2267 R: IsExactlyOnce,
2268 {
2269 self.assume_retries_trusted(nondet!(/** no-op */))
2270 }
2271
2272 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2273 /// implies that `B == Bounded`.
2274 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2275 where
2276 B: IsBounded,
2277 {
2278 self.weaken_boundedness()
2279 }
2280
2281 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2282 /// which implies that `B == Bounded`.
2283 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2284 if B::BOUNDED == B2::BOUNDED {
2285 Stream::new(
2286 self.location.clone(),
2287 self.ir_node.replace(HydroNode::Placeholder),
2288 )
2289 } else {
2290 // We can always weaken the boundedness
2291 Stream::new(
2292 self.location.clone(),
2293 HydroNode::Cast {
2294 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2295 metadata: self
2296 .location
2297 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2298 },
2299 )
2300 }
2301 }
2302}
2303
2304impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2305where
2306 L: Location<'a>,
2307{
2308 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2309 ///
2310 /// # Example
2311 /// ```rust
2312 /// # #[cfg(feature = "deploy")] {
2313 /// # use hydro_lang::prelude::*;
2314 /// # use futures::StreamExt;
2315 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2316 /// process.source_iter(q!(&[1, 2, 3])).cloned()
2317 /// # }, |mut stream| async move {
2318 /// // 1, 2, 3
2319 /// # for w in vec![1, 2, 3] {
2320 /// # assert_eq!(stream.next().await.unwrap(), w);
2321 /// # }
2322 /// # }));
2323 /// # }
2324 /// ```
2325 pub fn cloned(self) -> Stream<T, L, B, O, R>
2326 where
2327 T: Clone,
2328 {
2329 self.map(q!(|d| d.clone()))
2330 }
2331}
2332
2333impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2334where
2335 L: Location<'a>,
2336{
2337 /// Computes the number of elements in the stream as a [`Singleton`].
2338 ///
2339 /// # Example
2340 /// ```rust
2341 /// # #[cfg(feature = "deploy")] {
2342 /// # use hydro_lang::prelude::*;
2343 /// # use futures::StreamExt;
2344 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2345 /// let tick = process.tick();
2346 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2347 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2348 /// batch.count().all_ticks()
2349 /// # }, |mut stream| async move {
2350 /// // 4
2351 /// # assert_eq!(stream.next().await.unwrap(), 4);
2352 /// # }));
2353 /// # }
2354 /// ```
2355 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2356 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2357 /// Order does not affect eventual count, and also does not affect intermediate states.
2358 ))
2359 .fold(
2360 q!(|| 0usize),
2361 q!(
2362 |count, _| *count += 1,
2363 monotone = manual_proof!(/** += 1 is monotone */)
2364 ),
2365 )
2366 }
2367}
2368
2369impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2370 /// Produces a new stream that merges the elements of the two input streams.
2371 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2372 ///
2373 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2374 /// [`Bounded`], you can use [`Stream::chain`] instead.
2375 ///
2376 /// # Example
2377 /// ```rust
2378 /// # #[cfg(feature = "deploy")] {
2379 /// # use hydro_lang::prelude::*;
2380 /// # use futures::StreamExt;
2381 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2382 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2383 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2384 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2385 /// # }, |mut stream| async move {
2386 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2387 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2388 /// # assert_eq!(stream.next().await.unwrap(), w);
2389 /// # }
2390 /// # }));
2391 /// # }
2392 /// ```
2393 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2394 self,
2395 other: Stream<T, L, Unbounded, O2, R2>,
2396 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2397 where
2398 R: MinRetries<R2>,
2399 {
2400 Stream::new(
2401 self.location.clone(),
2402 HydroNode::Chain {
2403 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2404 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2405 metadata: self.location.new_node_metadata(Stream::<
2406 T,
2407 L,
2408 Unbounded,
2409 NoOrder,
2410 <R as MinRetries<R2>>::Min,
2411 >::collection_kind()),
2412 },
2413 )
2414 }
2415
2416 /// Deprecated: use [`Stream::merge_unordered`] instead.
2417 #[deprecated(note = "use `merge_unordered` instead")]
2418 pub fn interleave<O2: Ordering, R2: Retries>(
2419 self,
2420 other: Stream<T, L, Unbounded, O2, R2>,
2421 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2422 where
2423 R: MinRetries<R2>,
2424 {
2425 self.merge_unordered(other)
2426 }
2427}
2428
2429impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2430 /// Produces a new stream that combines the elements of the two input streams,
2431 /// preserving the relative order of elements within each input.
2432 ///
2433 /// # Non-Determinism
2434 /// The order in which elements *across* the two streams will be interleaved is
2435 /// non-deterministic, so the order of elements will vary across runs. If the output
2436 /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2437 /// but emits an unordered stream. For deterministic first-then-second ordering on
2438 /// bounded streams, use [`Stream::chain`].
2439 ///
2440 /// # Example
2441 /// ```rust
2442 /// # #[cfg(feature = "deploy")] {
2443 /// # use hydro_lang::prelude::*;
2444 /// # use futures::StreamExt;
2445 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2446 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2447 /// # process.source_iter(q!(vec![1, 3])).into();
2448 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2449 /// # }, |mut stream| async move {
2450 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2451 /// # for w in vec![1, 3, 2, 4] {
2452 /// # assert_eq!(stream.next().await.unwrap(), w);
2453 /// # }
2454 /// # }));
2455 /// # }
2456 /// ```
2457 pub fn merge_ordered<R2: Retries>(
2458 self,
2459 other: Stream<T, L, B, TotalOrder, R2>,
2460 _nondet: NonDet,
2461 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2462 where
2463 R: MinRetries<R2>,
2464 {
2465 let target_location = self.location().drop_consistency();
2466 Stream::new(
2467 target_location.clone(),
2468 HydroNode::MergeOrdered {
2469 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2470 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2471 metadata: target_location.new_node_metadata(Stream::<
2472 T,
2473 L::DropConsistency,
2474 B,
2475 TotalOrder,
2476 <R as MinRetries<R2>>::Min,
2477 >::collection_kind()),
2478 },
2479 )
2480 }
2481}
2482
2483impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2484where
2485 L: Location<'a>,
2486{
2487 /// Produces a new stream that emits the input elements in sorted order.
2488 ///
2489 /// The input stream can have any ordering guarantee, but the output stream
2490 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2491 /// elements in the input stream are available, so it requires the input stream
2492 /// to be [`Bounded`].
2493 ///
2494 /// # Example
2495 /// ```rust
2496 /// # #[cfg(feature = "deploy")] {
2497 /// # use hydro_lang::prelude::*;
2498 /// # use futures::StreamExt;
2499 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2500 /// let tick = process.tick();
2501 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2502 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2503 /// batch.sort().all_ticks()
2504 /// # }, |mut stream| async move {
2505 /// // 1, 2, 3, 4
2506 /// # for w in (1..5) {
2507 /// # assert_eq!(stream.next().await.unwrap(), w);
2508 /// # }
2509 /// # }));
2510 /// # }
2511 /// ```
2512 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2513 where
2514 B: IsBounded,
2515 T: Ord,
2516 {
2517 let this = self.make_bounded();
2518 Stream::new(
2519 this.location.clone(),
2520 HydroNode::Sort {
2521 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2522 metadata: this
2523 .location
2524 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2525 },
2526 )
2527 }
2528
2529 /// Produces a new stream that first emits the elements of the `self` stream,
2530 /// and then emits the elements of the `other` stream. The output stream has
2531 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2532 /// [`TotalOrder`] guarantee.
2533 ///
2534 /// Currently, both input streams must be [`Bounded`]. This operator will block
2535 /// on the first stream until all its elements are available. In a future version,
2536 /// we will relax the requirement on the `other` stream.
2537 ///
2538 /// # Example
2539 /// ```rust
2540 /// # #[cfg(feature = "deploy")] {
2541 /// # use hydro_lang::prelude::*;
2542 /// # use futures::StreamExt;
2543 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2544 /// let tick = process.tick();
2545 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2546 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2547 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2548 /// # }, |mut stream| async move {
2549 /// // 2, 3, 4, 5, 1, 2, 3, 4
2550 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2551 /// # assert_eq!(stream.next().await.unwrap(), w);
2552 /// # }
2553 /// # }));
2554 /// # }
2555 /// ```
2556 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2557 self,
2558 other: Stream<T, L, B2, O2, R2>,
2559 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2560 where
2561 B: IsBounded,
2562 O: MinOrder<O2>,
2563 R: MinRetries<R2>,
2564 {
2565 check_matching_location(&self.location, &other.location);
2566
2567 Stream::new(
2568 self.location.clone(),
2569 HydroNode::Chain {
2570 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2571 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2572 metadata: self.location.new_node_metadata(Stream::<
2573 T,
2574 L,
2575 B2,
2576 <O as MinOrder<O2>>::Min,
2577 <R as MinRetries<R2>>::Min,
2578 >::collection_kind()),
2579 },
2580 )
2581 }
2582
2583 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2584 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2585 /// because this is compiled into a nested loop.
2586 #[expect(
2587 clippy::type_complexity,
2588 reason = "MinRetries projection in return type"
2589 )]
2590 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2591 self,
2592 other: Stream<T2, L, Bounded, O2, R2>,
2593 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2594 where
2595 B: IsBounded,
2596 T: Clone,
2597 T2: Clone,
2598 R: MinRetries<R2>,
2599 {
2600 let this = self.make_bounded();
2601 check_matching_location(&this.location, &other.location);
2602
2603 Stream::new(
2604 this.location.clone(),
2605 HydroNode::CrossProduct {
2606 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2607 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2608 metadata: this.location.new_node_metadata(Stream::<
2609 (T, T2),
2610 L,
2611 Bounded,
2612 <O2 as MinOrder<O>>::Min,
2613 <R as MinRetries<R2>>::Min,
2614 >::collection_kind()),
2615 },
2616 )
2617 }
2618
2619 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2620 /// `self` used as the values for *each* key.
2621 ///
2622 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2623 /// values. For example, it can be used to send the same set of elements to several cluster
2624 /// members, if the membership information is available as a [`KeyedSingleton`].
2625 ///
2626 /// # Example
2627 /// ```rust
2628 /// # #[cfg(feature = "deploy")] {
2629 /// # use hydro_lang::prelude::*;
2630 /// # use futures::StreamExt;
2631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2632 /// # let tick = process.tick();
2633 /// let keyed_singleton = // { 1: (), 2: () }
2634 /// # process
2635 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2636 /// # .into_keyed()
2637 /// # .batch(&tick, nondet!(/** test */))
2638 /// # .first();
2639 /// let stream = // [ "a", "b" ]
2640 /// # process
2641 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2642 /// # .batch(&tick, nondet!(/** test */));
2643 /// stream.repeat_with_keys(keyed_singleton)
2644 /// # .entries().all_ticks()
2645 /// # }, |mut stream| async move {
2646 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2647 /// # let mut results = Vec::new();
2648 /// # for _ in 0..4 {
2649 /// # results.push(stream.next().await.unwrap());
2650 /// # }
2651 /// # results.sort();
2652 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2653 /// # }));
2654 /// # }
2655 /// ```
2656 pub fn repeat_with_keys<K, V2>(
2657 self,
2658 keys: KeyedSingleton<K, V2, L, Bounded>,
2659 ) -> KeyedStream<K, T, L, Bounded, O, R>
2660 where
2661 B: IsBounded,
2662 K: Clone,
2663 T: Clone,
2664 {
2665 keys.keys()
2666 .assume_ordering_trusted::<TotalOrder>(
2667 nondet!(/** keyed stream does not depend on ordering of keys */),
2668 )
2669 .cross_product_nested_loop(self.make_bounded())
2670 .into_keyed()
2671 }
2672
2673 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2674 /// execution until all results are available. The output order is based on when futures
2675 /// complete, and may be different than the input order.
2676 ///
2677 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2678 /// while futures are pending, this variant blocks until the futures resolve.
2679 ///
2680 /// # Example
2681 /// ```rust
2682 /// # #[cfg(feature = "deploy")] {
2683 /// # use std::collections::HashSet;
2684 /// # use futures::StreamExt;
2685 /// # use hydro_lang::prelude::*;
2686 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2687 /// process
2688 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2689 /// .map(q!(|x| async move {
2690 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2691 /// x
2692 /// }))
2693 /// .resolve_futures_blocking()
2694 /// # },
2695 /// # |mut stream| async move {
2696 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2697 /// # let mut output = HashSet::new();
2698 /// # for _ in 1..10 {
2699 /// # output.insert(stream.next().await.unwrap());
2700 /// # }
2701 /// # assert_eq!(
2702 /// # output,
2703 /// # HashSet::<i32>::from_iter(1..10)
2704 /// # );
2705 /// # },
2706 /// # ));
2707 /// # }
2708 /// ```
2709 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2710 where
2711 T: Future,
2712 {
2713 Stream::new(
2714 self.location.clone(),
2715 HydroNode::ResolveFuturesBlocking {
2716 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2717 metadata: self
2718 .location
2719 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2720 },
2721 )
2722 }
2723
2724 /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2725 ///
2726 /// # Example
2727 /// ```rust
2728 /// # #[cfg(feature = "deploy")] {
2729 /// # use hydro_lang::prelude::*;
2730 /// # use futures::StreamExt;
2731 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2732 /// let tick = process.tick();
2733 /// let empty: Stream<i32, _, Bounded> = process
2734 /// .source_iter(q!(Vec::<i32>::new()))
2735 /// .batch(&tick, nondet!(/** test */));
2736 /// empty.is_empty().all_ticks()
2737 /// # }, |mut stream| async move {
2738 /// // true
2739 /// # assert_eq!(stream.next().await.unwrap(), true);
2740 /// # }));
2741 /// # }
2742 /// ```
2743 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2744 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2745 where
2746 B: IsBounded,
2747 {
2748 self.make_bounded()
2749 .assume_ordering_trusted::<TotalOrder>(
2750 nondet!(/** is_empty intermediates unaffected by order */),
2751 )
2752 .first()
2753 .is_none()
2754 }
2755}
2756
2757impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2758where
2759 L: Location<'a>,
2760{
2761 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2762 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2763 /// by equi-joining the two streams on the key attribute `K`.
2764 ///
2765 /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2766 /// and streams the left side through, preserving the left side's ordering. When both
2767 /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2768 ///
2769 /// # Example
2770 /// ```rust
2771 /// # #[cfg(feature = "deploy")] {
2772 /// # use hydro_lang::prelude::*;
2773 /// # use std::collections::HashSet;
2774 /// # use futures::StreamExt;
2775 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2776 /// let tick = process.tick();
2777 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2778 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2779 /// stream1.join(stream2)
2780 /// # }, |mut stream| async move {
2781 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2782 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2783 /// # stream.map(|i| assert!(expected.contains(&i)));
2784 /// # }));
2785 /// # }
2786 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2787 self,
2788 n: Stream<(K, V2), L, B2, O2, R2>,
2789 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2790 where
2791 K: Eq + Hash + Clone,
2792 R: MinRetries<R2>,
2793 V1: Clone,
2794 V2: Clone,
2795 {
2796 check_matching_location(&self.location, &n.location);
2797
2798 let ir_node = if B2::BOUNDED {
2799 HydroNode::JoinHalf {
2800 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2801 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2802 metadata: self.location.new_node_metadata(Stream::<
2803 (K, (V1, V2)),
2804 L,
2805 B,
2806 B2::PreserveOrderIfBounded<O>,
2807 <R as MinRetries<R2>>::Min,
2808 >::collection_kind()),
2809 }
2810 } else {
2811 HydroNode::Join {
2812 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2813 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2814 metadata: self.location.new_node_metadata(Stream::<
2815 (K, (V1, V2)),
2816 L,
2817 B,
2818 B2::PreserveOrderIfBounded<O>,
2819 <R as MinRetries<R2>>::Min,
2820 >::collection_kind()),
2821 }
2822 };
2823
2824 Stream::new(self.location.clone(), ir_node)
2825 }
2826
2827 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2828 /// computes the anti-join of the items in the input -- i.e. returns
2829 /// unique items in the first input that do not have a matching key
2830 /// in the second input.
2831 ///
2832 /// # Example
2833 /// ```rust
2834 /// # #[cfg(feature = "deploy")] {
2835 /// # use hydro_lang::prelude::*;
2836 /// # use futures::StreamExt;
2837 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2838 /// let tick = process.tick();
2839 /// let stream = process
2840 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2841 /// .batch(&tick, nondet!(/** test */));
2842 /// let batch = process
2843 /// .source_iter(q!(vec![1, 2]))
2844 /// .batch(&tick, nondet!(/** test */));
2845 /// stream.anti_join(batch).all_ticks()
2846 /// # }, |mut stream| async move {
2847 /// # for w in vec![(3, 'c'), (4, 'd')] {
2848 /// # assert_eq!(stream.next().await.unwrap(), w);
2849 /// # }
2850 /// # }));
2851 /// # }
2852 pub fn anti_join<O2: Ordering, R2: Retries>(
2853 self,
2854 n: Stream<K, L, Bounded, O2, R2>,
2855 ) -> Stream<(K, V1), L, B, O, R>
2856 where
2857 K: Eq + Hash,
2858 {
2859 check_matching_location(&self.location, &n.location);
2860
2861 Stream::new(
2862 self.location.clone(),
2863 HydroNode::AntiJoin {
2864 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2865 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2866 metadata: self
2867 .location
2868 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2869 },
2870 )
2871 }
2872}
2873
2874impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2875 Stream<(K, V), L, B, O, R>
2876{
2877 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2878 /// is used as the key and the second element is added to the entries associated with that key.
2879 ///
2880 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2881 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2882 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2883 /// total ordering _within_ each group but no ordering _across_ groups.
2884 ///
2885 /// # Example
2886 /// ```rust
2887 /// # #[cfg(feature = "deploy")] {
2888 /// # use hydro_lang::prelude::*;
2889 /// # use futures::StreamExt;
2890 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2891 /// process
2892 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2893 /// .into_keyed()
2894 /// # .entries()
2895 /// # }, |mut stream| async move {
2896 /// // { 1: [2, 3], 2: [4] }
2897 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2898 /// # assert_eq!(stream.next().await.unwrap(), w);
2899 /// # }
2900 /// # }));
2901 /// # }
2902 /// ```
2903 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2904 KeyedStream::new(
2905 self.location.clone(),
2906 HydroNode::Cast {
2907 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2908 metadata: self
2909 .location
2910 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2911 },
2912 )
2913 }
2914}
2915
2916impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2917where
2918 K: Eq + Hash,
2919 L: Location<'a>,
2920{
2921 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2922 /// # Example
2923 /// ```rust
2924 /// # #[cfg(feature = "deploy")] {
2925 /// # use hydro_lang::prelude::*;
2926 /// # use futures::StreamExt;
2927 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2928 /// let tick = process.tick();
2929 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2930 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2931 /// batch.keys().all_ticks()
2932 /// # }, |mut stream| async move {
2933 /// // 1, 2
2934 /// # assert_eq!(stream.next().await.unwrap(), 1);
2935 /// # assert_eq!(stream.next().await.unwrap(), 2);
2936 /// # }));
2937 /// # }
2938 /// ```
2939 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2940 self.into_keyed()
2941 .fold(
2942 q!(|| ()),
2943 q!(
2944 |_, _| {},
2945 commutative = manual_proof!(/** values are ignored */),
2946 idempotent = manual_proof!(/** values are ignored */)
2947 ),
2948 )
2949 .keys()
2950 }
2951}
2952
2953impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2954where
2955 L: Location<'a>,
2956{
2957 /// Returns a stream corresponding to the latest batch of elements being atomically
2958 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2959 /// the order of the input.
2960 ///
2961 /// # Non-Determinism
2962 /// The batch boundaries are non-deterministic and may change across executions.
2963 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2964 self,
2965 tick: &Tick<L2>,
2966 _nondet: NonDet,
2967 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2968 Stream::new(
2969 tick.drop_consistency(),
2970 HydroNode::Batch {
2971 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2972 metadata: tick
2973 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2974 },
2975 )
2976 }
2977
2978 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2979 /// See [`Stream::atomic`] for more details.
2980 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2981 Stream::new(
2982 self.location.tick.l.clone(),
2983 HydroNode::EndAtomic {
2984 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2985 metadata: self
2986 .location
2987 .tick
2988 .l
2989 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2990 },
2991 )
2992 }
2993}
2994
2995impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2996where
2997 L: TopLevel<'a>,
2998 F: Future<Output = T>,
2999{
3000 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3001 /// Future outputs are produced as available, regardless of input arrival order.
3002 ///
3003 /// # Example
3004 /// ```rust
3005 /// # #[cfg(feature = "deploy")] {
3006 /// # use std::collections::HashSet;
3007 /// # use futures::StreamExt;
3008 /// # use hydro_lang::prelude::*;
3009 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3010 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3011 /// .map(q!(|x| async move {
3012 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3013 /// x
3014 /// }))
3015 /// .resolve_futures()
3016 /// # },
3017 /// # |mut stream| async move {
3018 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3019 /// # let mut output = HashSet::new();
3020 /// # for _ in 1..10 {
3021 /// # output.insert(stream.next().await.unwrap());
3022 /// # }
3023 /// # assert_eq!(
3024 /// # output,
3025 /// # HashSet::<i32>::from_iter(1..10)
3026 /// # );
3027 /// # },
3028 /// # ));
3029 /// # }
3030 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3031 Stream::new(
3032 self.location.clone(),
3033 HydroNode::ResolveFutures {
3034 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3035 metadata: self
3036 .location
3037 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3038 },
3039 )
3040 }
3041
3042 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3043 /// Future outputs are produced in the same order as the input stream.
3044 ///
3045 /// # Example
3046 /// ```rust
3047 /// # #[cfg(feature = "deploy")] {
3048 /// # use std::collections::HashSet;
3049 /// # use futures::StreamExt;
3050 /// # use hydro_lang::prelude::*;
3051 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3052 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3053 /// .map(q!(|x| async move {
3054 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3055 /// x
3056 /// }))
3057 /// .resolve_futures_ordered()
3058 /// # },
3059 /// # |mut stream| async move {
3060 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3061 /// # let mut output = Vec::new();
3062 /// # for _ in 1..10 {
3063 /// # output.push(stream.next().await.unwrap());
3064 /// # }
3065 /// # assert_eq!(
3066 /// # output,
3067 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3068 /// # );
3069 /// # },
3070 /// # ));
3071 /// # }
3072 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3073 Stream::new(
3074 self.location.clone(),
3075 HydroNode::ResolveFuturesOrdered {
3076 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3077 metadata: self
3078 .location
3079 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3080 },
3081 )
3082 }
3083}
3084
3085impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3086where
3087 L: Location<'a>,
3088{
3089 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3090 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3091 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3092 Stream::new(
3093 self.location.outer().clone(),
3094 HydroNode::YieldConcat {
3095 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3096 metadata: self
3097 .location
3098 .outer()
3099 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3100 },
3101 )
3102 }
3103
3104 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3105 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3106 ///
3107 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3108 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3109 /// stream's [`Tick`] context.
3110 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3111 let out_location = Atomic {
3112 tick: self.location.clone(),
3113 };
3114
3115 Stream::new(
3116 out_location.clone(),
3117 HydroNode::YieldConcat {
3118 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3119 metadata: out_location
3120 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3121 },
3122 )
3123 }
3124
3125 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3126 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3127 /// input.
3128 ///
3129 /// This API is particularly useful for stateful computation on batches of data, such as
3130 /// maintaining an accumulated state that is up to date with the current batch.
3131 ///
3132 /// # Example
3133 /// ```rust
3134 /// # #[cfg(feature = "deploy")] {
3135 /// # use hydro_lang::prelude::*;
3136 /// # use futures::StreamExt;
3137 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3138 /// let tick = process.tick();
3139 /// # // ticks are lazy by default, forces the second tick to run
3140 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3141 /// # let batch_first_tick = process
3142 /// # .source_iter(q!(vec![1, 2, 3, 4]))
3143 /// # .batch(&tick, nondet!(/** test */));
3144 /// # let batch_second_tick = process
3145 /// # .source_iter(q!(vec![5, 6, 7]))
3146 /// # .batch(&tick, nondet!(/** test */))
3147 /// # .defer_tick(); // appears on the second tick
3148 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3149 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3150 ///
3151 /// input.batch(&tick, nondet!(/** test */))
3152 /// .across_ticks(|s| s.count()).all_ticks()
3153 /// # }, |mut stream| async move {
3154 /// // [4, 7]
3155 /// assert_eq!(stream.next().await.unwrap(), 4);
3156 /// assert_eq!(stream.next().await.unwrap(), 7);
3157 /// # }));
3158 /// # }
3159 /// ```
3160 pub fn across_ticks<Out: BatchAtomic<'a>>(
3161 self,
3162 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3163 ) -> Out::Batched {
3164 thunk(self.all_ticks_atomic()).batched_atomic()
3165 }
3166
3167 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3168 /// always has the elements of `self` at tick `T - 1`.
3169 ///
3170 /// At tick `0`, the output stream is empty, since there is no previous tick.
3171 ///
3172 /// This operator enables stateful iterative processing with ticks, by sending data from one
3173 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3174 ///
3175 /// # Example
3176 /// ```rust
3177 /// # #[cfg(feature = "deploy")] {
3178 /// # use hydro_lang::prelude::*;
3179 /// # use futures::StreamExt;
3180 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3181 /// let tick = process.tick();
3182 /// // ticks are lazy by default, forces the second tick to run
3183 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3184 ///
3185 /// let batch_first_tick = process
3186 /// .source_iter(q!(vec![1, 2, 3, 4]))
3187 /// .batch(&tick, nondet!(/** test */));
3188 /// let batch_second_tick = process
3189 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
3190 /// .batch(&tick, nondet!(/** test */))
3191 /// .defer_tick(); // appears on the second tick
3192 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3193 ///
3194 /// changes_across_ticks.clone().filter_not_in(
3195 /// changes_across_ticks.defer_tick() // the elements from the previous tick
3196 /// ).all_ticks()
3197 /// # }, |mut stream| async move {
3198 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3199 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3200 /// # assert_eq!(stream.next().await.unwrap(), w);
3201 /// # }
3202 /// # }));
3203 /// # }
3204 /// ```
3205 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3206 Stream::new(
3207 self.location.clone(),
3208 HydroNode::DeferTick {
3209 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3210 metadata: self
3211 .location
3212 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3213 },
3214 )
3215 }
3216}
3217
3218#[cfg(test)]
3219mod tests {
3220 #[cfg(feature = "deploy")]
3221 use futures::{SinkExt, StreamExt};
3222 #[cfg(feature = "deploy")]
3223 use hydro_deploy::Deployment;
3224 #[cfg(feature = "deploy")]
3225 use serde::{Deserialize, Serialize};
3226 #[cfg(any(feature = "deploy", feature = "sim"))]
3227 use stageleft::q;
3228
3229 #[cfg(any(feature = "deploy", feature = "sim"))]
3230 use crate::compile::builder::FlowBuilder;
3231 #[cfg(feature = "deploy")]
3232 use crate::live_collections::sliced::sliced;
3233 #[cfg(feature = "deploy")]
3234 use crate::live_collections::stream::ExactlyOnce;
3235 #[cfg(feature = "sim")]
3236 use crate::live_collections::stream::NoOrder;
3237 #[cfg(any(feature = "deploy", feature = "sim"))]
3238 use crate::live_collections::stream::TotalOrder;
3239 #[cfg(any(feature = "deploy", feature = "sim"))]
3240 use crate::location::Location;
3241 #[cfg(feature = "sim")]
3242 use crate::networking::TCP;
3243 #[cfg(any(feature = "deploy", feature = "sim"))]
3244 use crate::nondet::nondet;
3245
3246 mod backtrace_chained_ops;
3247
3248 #[cfg(feature = "deploy")]
3249 struct P1 {}
3250 #[cfg(feature = "deploy")]
3251 struct P2 {}
3252
3253 #[cfg(feature = "deploy")]
3254 #[derive(Serialize, Deserialize, Debug)]
3255 struct SendOverNetwork {
3256 n: u32,
3257 }
3258
3259 #[cfg(feature = "deploy")]
3260 #[tokio::test]
3261 async fn first_ten_distributed() {
3262 use crate::networking::TCP;
3263
3264 let mut deployment = Deployment::new();
3265
3266 let mut flow = FlowBuilder::new();
3267 let first_node = flow.process::<P1>();
3268 let second_node = flow.process::<P2>();
3269 let external = flow.external::<P2>();
3270
3271 let numbers = first_node.source_iter(q!(0..10));
3272 let out_port = numbers
3273 .map(q!(|n| SendOverNetwork { n }))
3274 .send(&second_node, TCP.fail_stop().bincode())
3275 .send_bincode_external(&external);
3276
3277 let nodes = flow
3278 .with_process(&first_node, deployment.Localhost())
3279 .with_process(&second_node, deployment.Localhost())
3280 .with_external(&external, deployment.Localhost())
3281 .deploy(&mut deployment);
3282
3283 deployment.deploy().await.unwrap();
3284
3285 let mut external_out = nodes.connect(out_port).await;
3286
3287 deployment.start().await.unwrap();
3288
3289 for i in 0..10 {
3290 assert_eq!(external_out.next().await.unwrap().n, i);
3291 }
3292 }
3293
3294 #[cfg(feature = "deploy")]
3295 #[tokio::test]
3296 async fn first_cardinality() {
3297 let mut deployment = Deployment::new();
3298
3299 let mut flow = FlowBuilder::new();
3300 let node = flow.process::<()>();
3301 let external = flow.external::<()>();
3302
3303 let node_tick = node.tick();
3304 let count = node_tick
3305 .singleton(q!([1, 2, 3]))
3306 .into_stream()
3307 .flatten_ordered()
3308 .first()
3309 .into_stream()
3310 .count()
3311 .all_ticks()
3312 .send_bincode_external(&external);
3313
3314 let nodes = flow
3315 .with_process(&node, deployment.Localhost())
3316 .with_external(&external, deployment.Localhost())
3317 .deploy(&mut deployment);
3318
3319 deployment.deploy().await.unwrap();
3320
3321 let mut external_out = nodes.connect(count).await;
3322
3323 deployment.start().await.unwrap();
3324
3325 assert_eq!(external_out.next().await.unwrap(), 1);
3326 }
3327
3328 #[cfg(feature = "deploy")]
3329 #[tokio::test]
3330 async fn unbounded_reduce_remembers_state() {
3331 let mut deployment = Deployment::new();
3332
3333 let mut flow = FlowBuilder::new();
3334 let node = flow.process::<()>();
3335 let external = flow.external::<()>();
3336
3337 let (input_port, input) = node.source_external_bincode(&external);
3338 let out = input
3339 .reduce(q!(|acc, v| *acc += v))
3340 .sample_eager(nondet!(/** test */))
3341 .send_bincode_external(&external);
3342
3343 let nodes = flow
3344 .with_process(&node, deployment.Localhost())
3345 .with_external(&external, deployment.Localhost())
3346 .deploy(&mut deployment);
3347
3348 deployment.deploy().await.unwrap();
3349
3350 let mut external_in = nodes.connect(input_port).await;
3351 let mut external_out = nodes.connect(out).await;
3352
3353 deployment.start().await.unwrap();
3354
3355 external_in.send(1).await.unwrap();
3356 assert_eq!(external_out.next().await.unwrap(), 1);
3357
3358 external_in.send(2).await.unwrap();
3359 assert_eq!(external_out.next().await.unwrap(), 3);
3360 }
3361
3362 #[cfg(feature = "deploy")]
3363 #[tokio::test]
3364 async fn top_level_bounded_cross_singleton() {
3365 let mut deployment = Deployment::new();
3366
3367 let mut flow = FlowBuilder::new();
3368 let node = flow.process::<()>();
3369 let external = flow.external::<()>();
3370
3371 let (input_port, input) =
3372 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3373
3374 let out = input
3375 .cross_singleton(
3376 node.source_iter(q!(vec![1, 2, 3]))
3377 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3378 )
3379 .send_bincode_external(&external);
3380
3381 let nodes = flow
3382 .with_process(&node, deployment.Localhost())
3383 .with_external(&external, deployment.Localhost())
3384 .deploy(&mut deployment);
3385
3386 deployment.deploy().await.unwrap();
3387
3388 let mut external_in = nodes.connect(input_port).await;
3389 let mut external_out = nodes.connect(out).await;
3390
3391 deployment.start().await.unwrap();
3392
3393 external_in.send(1).await.unwrap();
3394 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3395
3396 external_in.send(2).await.unwrap();
3397 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3398 }
3399
3400 #[cfg(feature = "deploy")]
3401 #[tokio::test]
3402 async fn top_level_bounded_reduce_cardinality() {
3403 let mut deployment = Deployment::new();
3404
3405 let mut flow = FlowBuilder::new();
3406 let node = flow.process::<()>();
3407 let external = flow.external::<()>();
3408
3409 let (input_port, input) =
3410 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3411
3412 let out = sliced! {
3413 let input = use(input, nondet!(/** test */));
3414 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3415 input.cross_singleton(v.into_stream().count())
3416 }
3417 .send_bincode_external(&external);
3418
3419 let nodes = flow
3420 .with_process(&node, deployment.Localhost())
3421 .with_external(&external, deployment.Localhost())
3422 .deploy(&mut deployment);
3423
3424 deployment.deploy().await.unwrap();
3425
3426 let mut external_in = nodes.connect(input_port).await;
3427 let mut external_out = nodes.connect(out).await;
3428
3429 deployment.start().await.unwrap();
3430
3431 external_in.send(1).await.unwrap();
3432 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3433
3434 external_in.send(2).await.unwrap();
3435 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3436 }
3437
3438 #[cfg(feature = "deploy")]
3439 #[tokio::test]
3440 async fn top_level_bounded_into_singleton_cardinality() {
3441 let mut deployment = Deployment::new();
3442
3443 let mut flow = FlowBuilder::new();
3444 let node = flow.process::<()>();
3445 let external = flow.external::<()>();
3446
3447 let (input_port, input) =
3448 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3449
3450 let out = sliced! {
3451 let input = use(input, nondet!(/** test */));
3452 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3453 input.cross_singleton(v.into_stream().count())
3454 }
3455 .send_bincode_external(&external);
3456
3457 let nodes = flow
3458 .with_process(&node, deployment.Localhost())
3459 .with_external(&external, deployment.Localhost())
3460 .deploy(&mut deployment);
3461
3462 deployment.deploy().await.unwrap();
3463
3464 let mut external_in = nodes.connect(input_port).await;
3465 let mut external_out = nodes.connect(out).await;
3466
3467 deployment.start().await.unwrap();
3468
3469 external_in.send(1).await.unwrap();
3470 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3471
3472 external_in.send(2).await.unwrap();
3473 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3474 }
3475
3476 #[cfg(feature = "deploy")]
3477 #[tokio::test]
3478 async fn atomic_fold_replays_each_tick() {
3479 let mut deployment = Deployment::new();
3480
3481 let mut flow = FlowBuilder::new();
3482 let node = flow.process::<()>();
3483 let external = flow.external::<()>();
3484
3485 let (input_port, input) =
3486 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3487 let tick = node.tick();
3488
3489 let out = input
3490 .batch(&tick, nondet!(/** test */))
3491 .cross_singleton(
3492 node.source_iter(q!(vec![1, 2, 3]))
3493 .atomic()
3494 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3495 .snapshot_atomic(&tick, nondet!(/** test */)),
3496 )
3497 .all_ticks()
3498 .send_bincode_external(&external);
3499
3500 let nodes = flow
3501 .with_process(&node, deployment.Localhost())
3502 .with_external(&external, deployment.Localhost())
3503 .deploy(&mut deployment);
3504
3505 deployment.deploy().await.unwrap();
3506
3507 let mut external_in = nodes.connect(input_port).await;
3508 let mut external_out = nodes.connect(out).await;
3509
3510 deployment.start().await.unwrap();
3511
3512 external_in.send(1).await.unwrap();
3513 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3514
3515 external_in.send(2).await.unwrap();
3516 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3517 }
3518
3519 #[cfg(feature = "deploy")]
3520 #[tokio::test]
3521 async fn unbounded_scan_remembers_state() {
3522 let mut deployment = Deployment::new();
3523
3524 let mut flow = FlowBuilder::new();
3525 let node = flow.process::<()>();
3526 let external = flow.external::<()>();
3527
3528 let (input_port, input) = node.source_external_bincode(&external);
3529 let out = input
3530 .scan(
3531 q!(|| 0),
3532 q!(|acc, v| {
3533 *acc += v;
3534 Some(*acc)
3535 }),
3536 )
3537 .send_bincode_external(&external);
3538
3539 let nodes = flow
3540 .with_process(&node, deployment.Localhost())
3541 .with_external(&external, deployment.Localhost())
3542 .deploy(&mut deployment);
3543
3544 deployment.deploy().await.unwrap();
3545
3546 let mut external_in = nodes.connect(input_port).await;
3547 let mut external_out = nodes.connect(out).await;
3548
3549 deployment.start().await.unwrap();
3550
3551 external_in.send(1).await.unwrap();
3552 assert_eq!(external_out.next().await.unwrap(), 1);
3553
3554 external_in.send(2).await.unwrap();
3555 assert_eq!(external_out.next().await.unwrap(), 3);
3556 }
3557
3558 #[cfg(feature = "deploy")]
3559 #[tokio::test]
3560 async fn unbounded_enumerate_remembers_state() {
3561 let mut deployment = Deployment::new();
3562
3563 let mut flow = FlowBuilder::new();
3564 let node = flow.process::<()>();
3565 let external = flow.external::<()>();
3566
3567 let (input_port, input) = node.source_external_bincode(&external);
3568 let out = input.enumerate().send_bincode_external(&external);
3569
3570 let nodes = flow
3571 .with_process(&node, deployment.Localhost())
3572 .with_external(&external, deployment.Localhost())
3573 .deploy(&mut deployment);
3574
3575 deployment.deploy().await.unwrap();
3576
3577 let mut external_in = nodes.connect(input_port).await;
3578 let mut external_out = nodes.connect(out).await;
3579
3580 deployment.start().await.unwrap();
3581
3582 external_in.send(1).await.unwrap();
3583 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3584
3585 external_in.send(2).await.unwrap();
3586 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3587 }
3588
3589 #[cfg(feature = "deploy")]
3590 #[tokio::test]
3591 async fn unbounded_unique_remembers_state() {
3592 let mut deployment = Deployment::new();
3593
3594 let mut flow = FlowBuilder::new();
3595 let node = flow.process::<()>();
3596 let external = flow.external::<()>();
3597
3598 let (input_port, input) =
3599 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3600 let out = input.unique().send_bincode_external(&external);
3601
3602 let nodes = flow
3603 .with_process(&node, deployment.Localhost())
3604 .with_external(&external, deployment.Localhost())
3605 .deploy(&mut deployment);
3606
3607 deployment.deploy().await.unwrap();
3608
3609 let mut external_in = nodes.connect(input_port).await;
3610 let mut external_out = nodes.connect(out).await;
3611
3612 deployment.start().await.unwrap();
3613
3614 external_in.send(1).await.unwrap();
3615 assert_eq!(external_out.next().await.unwrap(), 1);
3616
3617 external_in.send(2).await.unwrap();
3618 assert_eq!(external_out.next().await.unwrap(), 2);
3619
3620 external_in.send(1).await.unwrap();
3621 external_in.send(3).await.unwrap();
3622 assert_eq!(external_out.next().await.unwrap(), 3);
3623 }
3624
3625 #[cfg(feature = "sim")]
3626 #[test]
3627 #[should_panic]
3628 fn sim_batch_nondet_size() {
3629 let mut flow = FlowBuilder::new();
3630 let node = flow.process::<()>();
3631
3632 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3633
3634 let tick = node.tick();
3635 let out_recv = input
3636 .batch(&tick, nondet!(/** test */))
3637 .count()
3638 .all_ticks()
3639 .sim_output();
3640
3641 flow.sim().exhaustive(async || {
3642 in_send.send(());
3643 in_send.send(());
3644 in_send.send(());
3645
3646 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3647 });
3648 }
3649
3650 #[cfg(feature = "sim")]
3651 #[test]
3652 fn sim_batch_preserves_order() {
3653 let mut flow = FlowBuilder::new();
3654 let node = flow.process::<()>();
3655
3656 let (in_send, input) = node.sim_input();
3657
3658 let tick = node.tick();
3659 let out_recv = input
3660 .batch(&tick, nondet!(/** test */))
3661 .all_ticks()
3662 .sim_output();
3663
3664 flow.sim().exhaustive(async || {
3665 in_send.send(1);
3666 in_send.send(2);
3667 in_send.send(3);
3668
3669 out_recv.assert_yields_only([1, 2, 3]).await;
3670 });
3671 }
3672
3673 #[cfg(feature = "sim")]
3674 #[test]
3675 #[should_panic]
3676 fn sim_batch_unordered_shuffles() {
3677 let mut flow = FlowBuilder::new();
3678 let node = flow.process::<()>();
3679
3680 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3681
3682 let tick = node.tick();
3683 let batch = input.batch(&tick, nondet!(/** test */));
3684 let out_recv = batch
3685 .clone()
3686 .min()
3687 .zip(batch.max())
3688 .all_ticks()
3689 .sim_output();
3690
3691 flow.sim().exhaustive(async || {
3692 in_send.send_many_unordered([1, 2, 3]);
3693
3694 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3695 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3696 }
3697 });
3698 }
3699
3700 #[cfg(feature = "sim")]
3701 #[test]
3702 fn sim_batch_unordered_shuffles_count() {
3703 let mut flow = FlowBuilder::new();
3704 let node = flow.process::<()>();
3705
3706 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3707
3708 let tick = node.tick();
3709 let batch = input.batch(&tick, nondet!(/** test */));
3710 let out_recv = batch.all_ticks().sim_output();
3711
3712 let instance_count = flow.sim().exhaustive(async || {
3713 in_send.send_many_unordered([1, 2, 3, 4]);
3714 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3715 });
3716
3717 assert_eq!(
3718 instance_count,
3719 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3720 )
3721 }
3722
3723 #[cfg(feature = "sim")]
3724 #[test]
3725 #[should_panic]
3726 fn sim_observe_order_batched() {
3727 let mut flow = FlowBuilder::new();
3728 let node = flow.process::<()>();
3729
3730 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3731
3732 let tick = node.tick();
3733 let batch = input.batch(&tick, nondet!(/** test */));
3734 let out_recv = batch
3735 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3736 .all_ticks()
3737 .sim_output();
3738
3739 flow.sim().exhaustive(async || {
3740 in_send.send_many_unordered([1, 2, 3, 4]);
3741 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3742 });
3743 }
3744
3745 #[cfg(feature = "sim")]
3746 #[test]
3747 fn sim_observe_order_batched_count() {
3748 let mut flow = FlowBuilder::new();
3749 let node = flow.process::<()>();
3750
3751 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3752
3753 let tick = node.tick();
3754 let batch = input.batch(&tick, nondet!(/** test */));
3755 let out_recv = batch
3756 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3757 .all_ticks()
3758 .sim_output();
3759
3760 let instance_count = flow.sim().exhaustive(async || {
3761 in_send.send_many_unordered([1, 2, 3, 4]);
3762 let _ = out_recv.collect::<Vec<_>>().await;
3763 });
3764
3765 assert_eq!(
3766 instance_count,
3767 192 // 4! * 2^{4 - 1}
3768 )
3769 }
3770
3771 #[cfg(feature = "sim")]
3772 #[test]
3773 fn sim_unordered_count_instance_count() {
3774 let mut flow = FlowBuilder::new();
3775 let node = flow.process::<()>();
3776
3777 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3778
3779 let tick = node.tick();
3780 let out_recv = input
3781 .count()
3782 .snapshot(&tick, nondet!(/** test */))
3783 .all_ticks()
3784 .sim_output();
3785
3786 let instance_count = flow.sim().exhaustive(async || {
3787 in_send.send_many_unordered([1, 2, 3, 4]);
3788 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3789 });
3790
3791 assert_eq!(
3792 instance_count,
3793 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3794 )
3795 }
3796
3797 #[cfg(feature = "sim")]
3798 #[test]
3799 fn sim_top_level_assume_ordering() {
3800 let mut flow = FlowBuilder::new();
3801 let node = flow.process::<()>();
3802
3803 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3804
3805 let out_recv = input
3806 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3807 .sim_output();
3808
3809 let instance_count = flow.sim().exhaustive(async || {
3810 in_send.send_many_unordered([1, 2, 3]);
3811 let mut out = out_recv.collect::<Vec<_>>().await;
3812 out.sort();
3813 assert_eq!(out, vec![1, 2, 3]);
3814 });
3815
3816 assert_eq!(instance_count, 6)
3817 }
3818
    /// Exercises `assume_ordering` on a top-level stream that merges external input with a
    /// feedback cycle routed through a second process.
    ///
    /// Each ordered element `v` produces `v + 1`. Odd results are sent to `node2`, back to
    /// `node`, and merged in again. With inputs `{0, 2}` this feeds back `1` and `3`. Their
    /// successors `2` and `4` are even, so the cycle stops there.
    ///
    /// The test checks that some explored instance observes `0, 1, 2` as a prefix. That is, a
    /// fed-back element can be ordered before an external input that has not been seen yet.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Placeholder for the feedback stream; completed below once `ordered` exists.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                // Route the feedback to node2 and back to node over TCP.
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        // Set by any instance whose output begins with 0, 1, 2.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        // NOTE(review): pinned instance count; its derivation is not spelled out here.
        assert_eq!(instance_count, 6);
    }
3857
    /// Exercises `assume_ordering` on a top-level stream whose feedback cycle passes through a
    /// tick.
    ///
    /// On the feedback path, ordered elements are:
    /// - batched with a nondeterministic `batch` and flattened again with `all_ticks`;
    /// - incremented and filtered to odd values;
    /// - sent to `node2` and back before being merged in again.
    ///
    /// With inputs `{0, 2}` the values `1` and `3` are fed back. The test checks that some
    /// explored instance observes `0, 1, 2` as a prefix.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Placeholder for the feedback stream; completed below once `ordered` exists.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                // Detour through a tick: batch boundaries become another nondeterministic
                // choice for the simulator to explore.
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                // Route the feedback to node2 and back to node over TCP.
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        // Set by any instance whose output begins with 0, 1, 2.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        // NOTE(review): pinned instance count, presumably higher than the tick-free version
        // because of the extra batching choices; derivation not shown here.
        assert_eq!(instance_count, 58);
    }
3898
    /// Exercises two chained `assume_ordering` points inside one feedback cycle.
    ///
    /// How the graph is built:
    /// - `input1_ordered` merges the external input with the feedback stream.
    /// - Its elements are shifted by 3, merged with a second input that is never fed, and
    ///   ordered again as `foo`.
    /// - Only values equal to 3 are sent to `node2` and back as feedback.
    ///
    /// With inputs `{0, 1}`, only `0 + 3 = 3` is fed back, so `input1_ordered` yields `0`,
    /// `1` and `3`. The test checks that some instance observes `0, 3, 1` as a prefix, i.e.
    /// the fed-back value can overtake the second external input.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        // The sender is discarded, so nothing is ever sent on this second input.
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        // First ordering point: external input merged with the feedback stream.
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Second ordering point, downstream of the first.
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Feedback: only values equal to 3 are routed to node2 and back to node.
        complete_cycle_back.complete(
            foo.filter(q!(|v| *v == 3))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = input1_ordered.sim_output();

        // Set by any instance whose output begins with 0, 3, 1.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        // NOTE(review): pinned instance count; its derivation is not spelled out here.
        assert_eq!(instance_count, 24);
    }
3943
    /// Exercises `assume_ordering` applied inside an `atomic()` / `end_atomic()` section.
    /// The stream merges external input with a feedback cycle that goes through `node2`.
    ///
    /// Each ordered element `v` produces `v + 1`, and odd results are fed back. With inputs
    /// `{0, 2}` the values `1` and `3` are fed back; their successors are even, so the cycle
    /// stops. Every explored instance must therefore observe exactly four outputs.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Placeholder for the feedback stream; completed below once `ordered` exists.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                // Route the feedback to node2 and back to node over TCP.
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            // Two external inputs plus two fed-back values.
            assert_eq!(out.len(), 4);
        });
        // NOTE(review): pinned instance count; its derivation is not spelled out here.
        assert_eq!(instance_count, 22);
    }
3978
3979 #[cfg(feature = "deploy")]
3980 #[tokio::test]
3981 async fn partition_evens_odds() {
3982 let mut deployment = Deployment::new();
3983
3984 let mut flow = FlowBuilder::new();
3985 let node = flow.process::<()>();
3986 let external = flow.external::<()>();
3987
3988 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3989 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3990 let evens_port = evens.send_bincode_external(&external);
3991 let odds_port = odds.send_bincode_external(&external);
3992
3993 let nodes = flow
3994 .with_process(&node, deployment.Localhost())
3995 .with_external(&external, deployment.Localhost())
3996 .deploy(&mut deployment);
3997
3998 deployment.deploy().await.unwrap();
3999
4000 let mut evens_out = nodes.connect(evens_port).await;
4001 let mut odds_out = nodes.connect(odds_port).await;
4002
4003 deployment.start().await.unwrap();
4004
4005 let mut even_results = Vec::new();
4006 for _ in 0..3 {
4007 even_results.push(evens_out.next().await.unwrap());
4008 }
4009 even_results.sort();
4010 assert_eq!(even_results, vec![2, 4, 6]);
4011
4012 let mut odd_results = Vec::new();
4013 for _ in 0..3 {
4014 odd_results.push(odds_out.next().await.unwrap());
4015 }
4016 odd_results.sort();
4017 assert_eq!(odd_results, vec![1, 3, 5]);
4018 }
4019
4020 #[cfg(feature = "deploy")]
4021 #[tokio::test]
4022 async fn unconsumed_inspect_still_runs() {
4023 use crate::deploy::DeployCrateWrapper;
4024
4025 let mut deployment = Deployment::new();
4026
4027 let mut flow = FlowBuilder::new();
4028 let node = flow.process::<()>();
4029
4030 // The return value of .inspect() is intentionally dropped.
4031 // Before the Null-root fix, this would silently do nothing.
4032 node.source_iter(q!(0..5))
4033 .inspect(q!(|x| println!("inspect: {}", x)));
4034
4035 let nodes = flow
4036 .with_process(&node, deployment.Localhost())
4037 .deploy(&mut deployment);
4038
4039 deployment.deploy().await.unwrap();
4040
4041 let mut stdout = nodes.get_process(&node).stdout();
4042
4043 deployment.start().await.unwrap();
4044
4045 let mut lines = Vec::new();
4046 for _ in 0..5 {
4047 lines.push(stdout.recv().await.unwrap());
4048 }
4049 lines.sort();
4050 assert_eq!(
4051 lines,
4052 vec![
4053 "inspect: 0",
4054 "inspect: 1",
4055 "inspect: 2",
4056 "inspect: 3",
4057 "inspect: 4",
4058 ]
4059 );
4060 }
4061
4062 #[cfg(feature = "sim")]
4063 #[test]
4064 fn sim_limit() {
4065 let mut flow = FlowBuilder::new();
4066 let node = flow.process::<()>();
4067
4068 let (in_send, input) = node.sim_input();
4069
4070 let out_recv = input.limit(q!(3)).sim_output();
4071
4072 flow.sim().exhaustive(async || {
4073 in_send.send(1);
4074 in_send.send(2);
4075 in_send.send(3);
4076 in_send.send(4);
4077 in_send.send(5);
4078
4079 out_recv.assert_yields_only([1, 2, 3]).await;
4080 });
4081 }
4082
4083 #[cfg(feature = "sim")]
4084 #[test]
4085 fn sim_limit_zero() {
4086 let mut flow = FlowBuilder::new();
4087 let node = flow.process::<()>();
4088
4089 let (in_send, input) = node.sim_input();
4090
4091 let out_recv = input.limit(q!(0)).sim_output();
4092
4093 flow.sim().exhaustive(async || {
4094 in_send.send(1);
4095 in_send.send(2);
4096
4097 out_recv.assert_yields_only::<i32, _>([]).await;
4098 });
4099 }
4100
4101 #[cfg(feature = "sim")]
4102 #[test]
4103 fn sim_merge_ordered() {
4104 let mut flow = FlowBuilder::new();
4105 let node = flow.process::<()>();
4106
4107 let (in_send, input) = node.sim_input();
4108 let (in_send2, input2) = node.sim_input();
4109
4110 let out_recv = input
4111 .merge_ordered(input2, nondet!(/** test */))
4112 .sim_output();
4113
4114 let mut saw_out_of_order = false;
4115 let instances = flow.sim().exhaustive(async || {
4116 in_send.send(1);
4117 in_send.send(2);
4118 in_send2.send(3);
4119 in_send2.send(4);
4120
4121 let out = out_recv.collect::<Vec<_>>().await;
4122
4123 if out == [1, 3, 2, 4] {
4124 saw_out_of_order = true;
4125 }
4126
4127 // Assert ordering preservation: elements from each input must
4128 // appear in their original relative order.
4129 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4130 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4131 assert_eq!(
4132 first_elements,
4133 vec![1, 2],
4134 "first input order violated: {:?}",
4135 out
4136 );
4137 assert_eq!(
4138 second_elements,
4139 vec![3, 4],
4140 "second input order violated: {:?}",
4141 out
4142 );
4143
4144 first_elements.append(&mut second_elements);
4145 first_elements.sort();
4146 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4147 });
4148
4149 assert!(saw_out_of_order);
4150 assert_eq!(instances, 6);
4151 }
4152
4153 /// Tests that merge_ordered passes through elements when only one input
4154 /// has data.
4155 #[cfg(feature = "sim")]
4156 #[test]
4157 fn sim_merge_ordered_one_empty() {
4158 let mut flow = FlowBuilder::new();
4159 let node = flow.process::<()>();
4160
4161 let (in_send, input) = node.sim_input();
4162 let (_in_send2, input2) = node.sim_input();
4163
4164 let out_recv = input
4165 .merge_ordered(input2, nondet!(/** test */))
4166 .sim_output();
4167
4168 let instances = flow.sim().exhaustive(async || {
4169 in_send.send(1);
4170 in_send.send(2);
4171
4172 let out = out_recv.collect::<Vec<_>>().await;
4173 assert_eq!(out, vec![1, 2]);
4174 });
4175
4176 // Only one possible interleaving when one input is empty
4177 assert_eq!(instances, 1);
4178 }
4179
4180 /// Tests that merge_ordered correctly handles feedback cycles.
4181 /// An element output from merge_ordered is filtered and cycled back to
4182 /// one of its inputs. The one-at-a-time release must allow the cycled-back
4183 /// element to arrive and potentially be emitted before elements still
4184 /// waiting on the other input.
4185 #[cfg(feature = "sim")]
4186 #[test]
4187 fn sim_merge_ordered_cycle_back() {
4188 let mut flow = FlowBuilder::new();
4189 let node = flow.process::<()>();
4190
4191 let (in_send, input) = node.sim_input();
4192
4193 // Create a forward ref for the cycle back
4194 let (complete_cycle_back, cycle_back) =
4195 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4196
4197 // merge_ordered: input (external) with cycle_back
4198 let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4199
4200 // Cycle back: elements equal to 1 get mapped to 10 and fed back
4201 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4202
4203 let out_recv = merged.sim_output();
4204
4205 // Send 1 and 2. Element 1 should cycle back as 10.
4206 // Valid orderings must have 1 before 10 (since 10 depends on 1).
4207 let mut saw_cycle_before_second = false;
4208 flow.sim().exhaustive(async || {
4209 in_send.send(1);
4210 in_send.send(2);
4211
4212 let out = out_recv.collect::<Vec<_>>().await;
4213
4214 // 10 must always come after 1 (causal dependency)
4215 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4216 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4217 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4218
4219 // Check if we see [1, 10, 2] — the cycled element beats the second input
4220 if out == [1, 10, 2] {
4221 saw_cycle_before_second = true;
4222 }
4223
4224 let mut sorted = out;
4225 sorted.sort();
4226 assert_eq!(sorted, vec![1, 2, 10]);
4227 });
4228
4229 assert!(
4230 saw_cycle_before_second,
4231 "never saw the cycled element arrive before the second input element"
4232 );
4233 }
4234
4235 /// Tests that merge_ordered correctly interleaves when one input has a
4236 /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4237 /// element 2 should be able to appear after b's elements.
4238 #[cfg(feature = "sim")]
4239 #[test]
4240 fn sim_merge_ordered_delayed() {
4241 let mut flow = FlowBuilder::new();
4242 let node = flow.process::<()>();
4243
4244 let (in_send, input) = node.sim_input();
4245 let (in_send2, input2) = node.sim_input();
4246
4247 let out_recv = input
4248 .merge_ordered(input2, nondet!(/** test */))
4249 .sim_output();
4250
4251 let mut saw_delayed_interleaving = false;
4252 flow.sim().exhaustive(async || {
4253 // Send 1 from a, and 3, 4 from b
4254 in_send.send(1);
4255 in_send2.send(3);
4256 in_send2.send(4);
4257
4258 // Collect what's available so far
4259 let first_batch = out_recv.collect::<Vec<_>>().await;
4260
4261 // Now send the delayed element 2 from a
4262 in_send.send(2);
4263 let second_batch = out_recv.collect::<Vec<_>>().await;
4264
4265 let mut all: Vec<_> = first_batch
4266 .iter()
4267 .chain(second_batch.iter())
4268 .copied()
4269 .collect();
4270
4271 // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4272 if all == [1, 3, 4, 2] {
4273 saw_delayed_interleaving = true;
4274 }
4275
4276 all.sort();
4277 assert_eq!(all, vec![1, 2, 3, 4]);
4278 });
4279
4280 assert!(saw_delayed_interleaving);
4281 }
4282
4283 /// Deploy test: merge_ordered with a delayed element on one input.
4284 /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4285 /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4286 /// both inputs are pulled and the delayed element arrives later.
4287 #[cfg(feature = "deploy")]
4288 #[tokio::test]
4289 async fn deploy_merge_ordered_delayed() {
4290 let mut deployment = Deployment::new();
4291
4292 let mut flow = FlowBuilder::new();
4293 let node = flow.process::<()>();
4294 let external = flow.external::<()>();
4295
4296 let (input_a_port, input_a) = node.source_external_bincode(&external);
4297 let (input_b_port, input_b) = node.source_external_bincode(&external);
4298
4299 let out = input_a
4300 .assume_ordering(nondet!(/** test */))
4301 .merge_ordered(
4302 input_b.assume_ordering(nondet!(/** test */)),
4303 nondet!(/** test */),
4304 )
4305 .send_bincode_external(&external);
4306
4307 let nodes = flow
4308 .with_process(&node, deployment.Localhost())
4309 .with_external(&external, deployment.Localhost())
4310 .deploy(&mut deployment);
4311
4312 deployment.deploy().await.unwrap();
4313
4314 let mut ext_a = nodes.connect(input_a_port).await;
4315 let mut ext_b = nodes.connect(input_b_port).await;
4316 let mut ext_out = nodes.connect(out).await;
4317
4318 deployment.start().await.unwrap();
4319
4320 // Send a=1, b=3, b=4
4321 ext_a.send(1).await.unwrap();
4322 ext_b.send(3).await.unwrap();
4323 ext_b.send(4).await.unwrap();
4324
4325 // Collect the first 3 elements
4326 let mut received = Vec::new();
4327 for _ in 0..3 {
4328 received.push(ext_out.next().await.unwrap());
4329 }
4330
4331 // Now send the delayed a=2
4332 ext_a.send(2).await.unwrap();
4333 received.push(ext_out.next().await.unwrap());
4334
4335 // All elements should be present
4336 received.sort();
4337 assert_eq!(received, vec![1, 2, 3, 4]);
4338 }
4339
4340 #[cfg(feature = "deploy")]
4341 #[tokio::test]
4342 async fn monotone_fold_threshold() {
4343 use crate::properties::manual_proof;
4344
4345 let mut deployment = Deployment::new();
4346
4347 let mut flow = FlowBuilder::new();
4348 let node = flow.process::<()>();
4349 let external = flow.external::<()>();
4350
4351 let in_unbounded: super::Stream<_, _> =
4352 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4353 let sum = in_unbounded.fold(
4354 q!(|| 0),
4355 q!(
4356 |sum, v| {
4357 *sum += v;
4358 },
4359 monotone = manual_proof!(/** test */)
4360 ),
4361 );
4362
4363 let threshold_out = sum
4364 .threshold_greater_or_equal(node.singleton(q!(7)))
4365 .send_bincode_external(&external);
4366
4367 let nodes = flow
4368 .with_process(&node, deployment.Localhost())
4369 .with_external(&external, deployment.Localhost())
4370 .deploy(&mut deployment);
4371
4372 deployment.deploy().await.unwrap();
4373
4374 let mut threshold_out = nodes.connect(threshold_out).await;
4375
4376 deployment.start().await.unwrap();
4377
4378 assert_eq!(threshold_out.next().await.unwrap(), 7);
4379 }
4380
4381 #[cfg(feature = "deploy")]
4382 #[tokio::test]
4383 async fn monotone_count_threshold() {
4384 let mut deployment = Deployment::new();
4385
4386 let mut flow = FlowBuilder::new();
4387 let node = flow.process::<()>();
4388 let external = flow.external::<()>();
4389
4390 let in_unbounded: super::Stream<_, _> =
4391 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4392 let sum = in_unbounded.count();
4393
4394 let threshold_out = sum
4395 .threshold_greater_or_equal(node.singleton(q!(3)))
4396 .send_bincode_external(&external);
4397
4398 let nodes = flow
4399 .with_process(&node, deployment.Localhost())
4400 .with_external(&external, deployment.Localhost())
4401 .deploy(&mut deployment);
4402
4403 deployment.deploy().await.unwrap();
4404
4405 let mut threshold_out = nodes.connect(threshold_out).await;
4406
4407 deployment.start().await.unwrap();
4408
4409 assert_eq!(threshold_out.next().await.unwrap(), 3);
4410 }
4411
4412 #[cfg(feature = "deploy")]
4413 #[tokio::test]
4414 async fn monotone_map_order_preserving_threshold() {
4415 use crate::properties::manual_proof;
4416
4417 let mut deployment = Deployment::new();
4418
4419 let mut flow = FlowBuilder::new();
4420 let node = flow.process::<()>();
4421 let external = flow.external::<()>();
4422
4423 let in_unbounded: super::Stream<_, _> =
4424 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4425 let sum = in_unbounded.fold(
4426 q!(|| 0),
4427 q!(
4428 |sum, v| {
4429 *sum += v;
4430 },
4431 monotone = manual_proof!(/** test */)
4432 ),
4433 );
4434
4435 // map with order_preserving should preserve monotonicity
4436 let doubled = sum.map(q!(
4437 |v| v * 2,
4438 order_preserving = manual_proof!(/** doubling preserves order */)
4439 ));
4440
4441 let threshold_out = doubled
4442 .threshold_greater_or_equal(node.singleton(q!(14)))
4443 .send_bincode_external(&external);
4444
4445 let nodes = flow
4446 .with_process(&node, deployment.Localhost())
4447 .with_external(&external, deployment.Localhost())
4448 .deploy(&mut deployment);
4449
4450 deployment.deploy().await.unwrap();
4451
4452 let mut threshold_out = nodes.connect(threshold_out).await;
4453
4454 deployment.start().await.unwrap();
4455
4456 assert_eq!(threshold_out.next().await.unwrap(), 14);
4457 }
4458
4459 // === Compile-time type tests for join/cross_product ordering ===
4460
    /// Compile-time checks of the ordering types produced by `join` and `cross_product`.
    ///
    /// None of these functions is ever called; each only has to type-check. If either operator
    /// changes the ordering (or boundedness) of its output, the matching signature stops
    /// compiling.
    #[cfg(any(feature = "deploy", feature = "sim"))]
    mod join_ordering_type_tests {
        use crate::live_collections::boundedness::{Bounded, Unbounded};
        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
        use crate::location::{Location, Process};

        /// Unbounded `TotalOrder` joined with bounded `TotalOrder` stays `TotalOrder`.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        /// Two unbounded `TotalOrder` inputs produce a `NoOrder` join.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        /// Two bounded `TotalOrder` inputs, at any location, produce a bounded `TotalOrder`
        /// join.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        /// `NoOrder` inputs (unbounded with bounded) produce a `NoOrder` join.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_noorder_with_bounded<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        // === Compile-time type tests for cross_product ordering ===

        /// Unbounded `TotalOrder` crossed with bounded `TotalOrder` stays `TotalOrder`.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        /// Two bounded `TotalOrder` inputs produce a bounded `TotalOrder` cross product.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_bounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        /// Two unbounded `TotalOrder` inputs produce a `NoOrder` cross product.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.cross_product(right)
        }
    } // mod join_ordering_type_tests
4525
    // === Runtime correctness tests for join/cross_product with mixed boundedness ===
4527
4528 #[cfg(feature = "sim")]
4529 #[test]
4530 fn cross_product_mixed_boundedness_correctness() {
4531 use stageleft::q;
4532
4533 use crate::compile::builder::FlowBuilder;
4534 use crate::nondet::nondet;
4535
4536 let mut flow = FlowBuilder::new();
4537 let process = flow.process::<()>();
4538 let tick = process.tick();
4539
4540 let left = process.source_iter(q!(vec![1, 2]));
4541 let right = process
4542 .source_iter(q!(vec!['a', 'b']))
4543 .batch(&tick, nondet!(/** test */))
4544 .all_ticks();
4545
4546 let out = left.cross_product(right).sim_output();
4547
4548 flow.sim().exhaustive(async || {
4549 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4550 .await;
4551 });
4552 }
4553
4554 #[cfg(feature = "sim")]
4555 #[test]
4556 fn join_mixed_boundedness_correctness() {
4557 use stageleft::q;
4558
4559 use crate::compile::builder::FlowBuilder;
4560 use crate::nondet::nondet;
4561
4562 let mut flow = FlowBuilder::new();
4563 let process = flow.process::<()>();
4564 let tick = process.tick();
4565
4566 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4567 let right = process
4568 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4569 .batch(&tick, nondet!(/** test */))
4570 .all_ticks();
4571
4572 let out = left.join(right).sim_output();
4573
4574 flow.sim().exhaustive(async || {
4575 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4576 .await;
4577 });
4578 }
4579
4580 #[cfg(feature = "sim")]
4581 #[test]
4582 fn sim_merge_unordered_independent_atomics() {
4583 let mut flow = FlowBuilder::new();
4584 let node = flow.process::<()>();
4585
4586 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4587 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4588
4589 let out = input1
4590 .atomic()
4591 .merge_unordered(input2.atomic())
4592 .end_atomic()
4593 .sim_output();
4594
4595 flow.sim().exhaustive(async || {
4596 in1_send.send(1);
4597 in2_send.send(2);
4598
4599 out.assert_yields_only_unordered(vec![1, 2]).await;
4600 });
4601 }
4602}