hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::nondet::{NonDet, nondet};
31use crate::prelude::manual_proof;
32use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
33
34pub mod networking;
35
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds guarantee that the minimum of `Self` with any marker is
/// well-defined: min with itself is itself, min with [`TotalOrder`] is `Self`, and
/// min with [`NoOrder`] is always [`NoOrder`] (the weakest ordering).
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
44
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited type: it exists only as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
54
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited type: it exists only as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
66
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Derived from `MinOrder`: `Self` is weaker than `O2` exactly when min(Self, O2) = Self.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
74
/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    // NOTE(review): unlike `MinRetries::Min`, this associated type carries no
    // `WeakerOrderingThan` bounds — presumably intentional, but worth confirming.
    type Min: Ordering;
}

// `TotalOrder` is the strongest ordering, so the minimum is always the other side.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest ordering, so it is always the minimum.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
91
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds guarantee that the minimum of `Self` with any marker is
/// well-defined: min with itself is itself, min with [`ExactlyOnce`] is `Self`, and
/// min with [`AtLeastOnce`] is always [`AtLeastOnce`] (the weakest guarantee).
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
102
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited type: it exists only as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
111
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited type: it exists only as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
120
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Derived from `MinRetries`: `Self` is weaker than `R2` exactly when min(Self, R2) = Self.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
128
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The bounds record that the minimum is weaker than (or equal to) both sides.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum is always the other side.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so it is always the minimum.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Used as a bound on APIs that are only deterministic for totally-ordered streams;
/// the `on_unimplemented` message above guides users toward a fix.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
158
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Used as a bound on APIs that are only deterministic for duplicate-free streams;
/// the `on_unimplemented` message above guides users toward a fix.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
171
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location handle where this stream is materialized.
    pub(crate) location: Loc,
    // The IR node backing this collection. Interior mutability is required so
    // that `Clone` can rewrite the node in place into a shared `Tee`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries all type parameters without storing any values of those types.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
203
204impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
205 for Stream<T, L, Unbounded, O, R>
206where
207 L: Location<'a>,
208{
209 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
210 let new_meta = stream
211 .location
212 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
213
214 Stream {
215 location: stream.location,
216 ir_node: RefCell::new(HydroNode::Cast {
217 inner: Box::new(stream.ir_node.into_inner()),
218 metadata: new_meta,
219 }),
220 _phantom: PhantomData,
221 }
222 }
223}
224
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    // Relaxing `TotalOrder` to `NoOrder` only forgets a guarantee, so it is always safe.
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
234
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    // Relaxing `ExactlyOnce` to `AtLeastOnce` only forgets a guarantee, so it is always safe.
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
244
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to the inherent `Stream::defer_tick` (defined elsewhere in this file).
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
253
254impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
255 for Stream<T, Tick<L>, Bounded, O, R>
256where
257 L: Location<'a>,
258{
259 type Location = Tick<L>;
260
261 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262 Stream::new(
263 location.clone(),
264 HydroNode::CycleSource {
265 cycle_id,
266 metadata: location.new_node_metadata(Self::collection_kind()),
267 },
268 )
269 }
270}
271
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Values fed into the cycle become visible one tick later (`DeferTick`
        // wrapping the `CycleSource`).
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // On the very first tick there is no previous value, so emit `initial`
        // instead; `optional_first_tick` gates it to tick zero only.
        from_previous_tick.chain(initial.filter_if_some(location.optional_first_tick(q!(()))))
    }
}
294
295impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
296 for Stream<T, Tick<L>, Bounded, O, R>
297where
298 L: Location<'a>,
299{
300 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
301 assert_eq!(
302 Location::id(&self.location),
303 expected_location,
304 "locations do not match"
305 );
306 self.location
307 .flow_state()
308 .borrow_mut()
309 .push_root(HydroRoot::CycleSink {
310 cycle_id,
311 input: Box::new(self.ir_node.into_inner()),
312 op_metadata: HydroIrOpMetadata::new(),
313 });
314 }
315}
316
317impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
318 for Stream<T, L, B, O, R>
319where
320 L: Location<'a> + NoTick,
321{
322 type Location = L;
323
324 fn create_source(cycle_id: CycleId, location: L) -> Self {
325 Stream::new(
326 location.clone(),
327 HydroNode::CycleSource {
328 cycle_id,
329 metadata: location.new_node_metadata(Self::collection_kind()),
330 },
331 )
332 }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
336 for Stream<T, L, B, O, R>
337where
338 L: Location<'a> + NoTick,
339{
340 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
341 assert_eq!(
342 Location::id(&self.location),
343 expected_location,
344 "locations do not match"
345 );
346 self.location
347 .flow_state()
348 .borrow_mut()
349 .push_root(HydroRoot::CycleSink {
350 cycle_id,
351 input: Box::new(self.ir_node.into_inner()),
352 op_metadata: HydroIrOpMetadata::new(),
353 });
354 }
355}
356
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // Cloning shares the upstream computation instead of re-evaluating it:
        // the first clone rewrites this stream's IR node in place into a `Tee`,
        // so that `self` and all clones point at the same shared inner node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Swap in a placeholder so the original node can be moved out of the cell.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // After the rewrite above, the node is guaranteed to be a `Tee`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    // Share the same inner node (cheap `Rc` clone), duplicate only metadata.
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
386
387impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
388where
389 L: Location<'a>,
390{
    // Wraps an IR node as a typed `Stream`, sanity-checking (in debug builds) that
    // the node's metadata agrees with the type-level location and collection kind.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
401
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// This is a cheap borrow of the stored location handle.
    pub fn location(&self) -> &L {
        &self.location
    }
406
    // Reifies this stream's type parameters (boundedness, ordering, retries, element
    // type) into a `CollectionKind` value, used to tag IR node metadata.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
415
416 /// Produces a stream based on invoking `f` on each element.
417 /// If you do not want to modify the stream and instead only want to view
418 /// each item use [`Stream::inspect`] instead.
419 ///
420 /// # Example
421 /// ```rust
422 /// # #[cfg(feature = "deploy")] {
423 /// # use hydro_lang::prelude::*;
424 /// # use futures::StreamExt;
425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
426 /// let words = process.source_iter(q!(vec!["hello", "world"]));
427 /// words.map(q!(|x| x.to_uppercase()))
428 /// # }, |mut stream| async move {
429 /// # for w in vec!["HELLO", "WORLD"] {
430 /// # assert_eq!(stream.next().await.unwrap(), w);
431 /// # }
432 /// # }));
433 /// # }
434 /// ```
435 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
436 where
437 F: Fn(T) -> U + 'a,
438 {
439 let f = f.splice_fn1_ctx(&self.location).into();
440 Stream::new(
441 self.location.clone(),
442 HydroNode::Map {
443 f,
444 input: Box::new(self.ir_node.into_inner()),
445 metadata: self
446 .location
447 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
448 },
449 )
450 }
451
    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                // Deterministic iteration order preserves `O` in the output.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
493
    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                // The inner iterator's order is not trusted, so the output is `NoOrder`.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
540
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Equivalent to `flat_map_ordered` with the identity function.
        self.flat_map_ordered(q!(|d| d))
    }
569
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Equivalent to `flat_map_unordered` with the identity function.
        self.flat_map_unordered(q!(|d| d))
    }
602
603 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
604 /// `f`, preserving the order of the elements.
605 ///
606 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
607 /// not modify or take ownership of the values. If you need to modify the values while filtering
608 /// use [`Stream::filter_map`] instead.
609 ///
610 /// # Example
611 /// ```rust
612 /// # #[cfg(feature = "deploy")] {
613 /// # use hydro_lang::prelude::*;
614 /// # use futures::StreamExt;
615 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616 /// process
617 /// .source_iter(q!(vec![1, 2, 3, 4]))
618 /// .filter(q!(|&x| x > 2))
619 /// # }, |mut stream| async move {
620 /// // 3, 4
621 /// # for w in (3..5) {
622 /// # assert_eq!(stream.next().await.unwrap(), w);
623 /// # }
624 /// # }));
625 /// # }
626 /// ```
627 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
628 where
629 F: Fn(&T) -> bool + 'a,
630 {
631 let f = f.splice_fn1_borrow_ctx(&self.location).into();
632 Stream::new(
633 self.location.clone(),
634 HydroNode::Filter {
635 f,
636 input: Box::new(self.ir_node.into_inner()),
637 metadata: self.location.new_node_metadata(Self::collection_kind()),
638 },
639 )
640 }
641
    /// Splits the stream into two streams based on a predicate, without cloning elements.
    ///
    /// Elements for which `f` returns `true` are sent to the first output stream,
    /// and elements for which `f` returns `false` are sent to the second output stream.
    ///
    /// Unlike using `filter` twice, this only evaluates the predicate once per element
    /// and does not require `T: Clone`.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
    /// the predicate is only used for routing; the element itself is moved to the
    /// appropriate output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
    /// evens.map(q!(|x| (x, true)))
    ///     .interleave(odds.map(q!(|x| (x, false))))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..6 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
    /// # }));
    /// # }
    /// ```
    #[expect(
        clippy::type_complexity,
        reason = "return type mirrors the input stream type"
    )]
    pub fn partition<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
        // Both outputs alias the same upstream node, so the input is evaluated once.
        let shared = SharedNode(Rc::new(RefCell::new(self.ir_node.into_inner())));

        // `is_true: true` routes the elements for which the predicate holds.
        let true_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0.clone()),
                f: f.clone(),
                is_true: true,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        // `is_true: false` routes the remaining elements.
        let false_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0),
                f,
                is_true: false,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        (true_stream, false_stream)
    }
712
713 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
714 ///
715 /// # Example
716 /// ```rust
717 /// # #[cfg(feature = "deploy")] {
718 /// # use hydro_lang::prelude::*;
719 /// # use futures::StreamExt;
720 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
721 /// process
722 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
723 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
724 /// # }, |mut stream| async move {
725 /// // 1, 2
726 /// # for w in (1..3) {
727 /// # assert_eq!(stream.next().await.unwrap(), w);
728 /// # }
729 /// # }));
730 /// # }
731 /// ```
732 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
733 where
734 F: Fn(T) -> Option<U> + 'a,
735 {
736 let f = f.splice_fn1_ctx(&self.location).into();
737 Stream::new(
738 self.location.clone(),
739 HydroNode::FilterMap {
740 f,
741 input: Box::new(self.ir_node.into_inner()),
742 metadata: self
743 .location
744 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
745 },
746 )
747 }
748
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both sides must live at the same location; panics otherwise.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }
794
795 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
796 ///
797 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
798 /// leader of a cluster.
799 ///
800 /// # Example
801 /// ```rust
802 /// # #[cfg(feature = "deploy")] {
803 /// # use hydro_lang::prelude::*;
804 /// # use futures::StreamExt;
805 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
806 /// let tick = process.tick();
807 /// // ticks are lazy by default, forces the second tick to run
808 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
809 ///
810 /// let batch_first_tick = process
811 /// .source_iter(q!(vec![1, 2, 3, 4]))
812 /// .batch(&tick, nondet!(/** test */));
813 /// let batch_second_tick = process
814 /// .source_iter(q!(vec![5, 6, 7, 8]))
815 /// .batch(&tick, nondet!(/** test */))
816 /// .defer_tick(); // appears on the second tick
817 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
818 /// batch_first_tick.chain(batch_second_tick)
819 /// .filter_if_some(some_on_first_tick)
820 /// .all_ticks()
821 /// # }, |mut stream| async move {
822 /// // [1, 2, 3, 4]
823 /// # for w in vec![1, 2, 3, 4] {
824 /// # assert_eq!(stream.next().await.unwrap(), w);
825 /// # }
826 /// # }));
827 /// # }
828 /// ```
829 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
830 self.cross_singleton(signal.map(q!(|_u| ())))
831 .map(q!(|(d, _signal)| d))
832 }
833
834 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
835 ///
836 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
837 /// some local state.
838 ///
839 /// # Example
840 /// ```rust
841 /// # #[cfg(feature = "deploy")] {
842 /// # use hydro_lang::prelude::*;
843 /// # use futures::StreamExt;
844 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845 /// let tick = process.tick();
846 /// // ticks are lazy by default, forces the second tick to run
847 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
848 ///
849 /// let batch_first_tick = process
850 /// .source_iter(q!(vec![1, 2, 3, 4]))
851 /// .batch(&tick, nondet!(/** test */));
852 /// let batch_second_tick = process
853 /// .source_iter(q!(vec![5, 6, 7, 8]))
854 /// .batch(&tick, nondet!(/** test */))
855 /// .defer_tick(); // appears on the second tick
856 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
857 /// batch_first_tick.chain(batch_second_tick)
858 /// .filter_if_none(some_on_first_tick)
859 /// .all_ticks()
860 /// # }, |mut stream| async move {
861 /// // [5, 6, 7, 8]
862 /// # for w in vec![5, 6, 7, 8] {
863 /// # assert_eq!(stream.next().await.unwrap(), w);
864 /// # }
865 /// # }));
866 /// # }
867 /// ```
868 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
869 self.filter_if_some(
870 other
871 .map(q!(|_| ()))
872 .into_singleton()
873 .filter(q!(|o| o.is_none())),
874 )
875 }
876
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
    /// tupled pairs in a non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn cross_product<T2, O2: Ordering>(
        self,
        other: Stream<T2, L, B, O2, R>,
    ) -> Stream<(T, T2), L, B, NoOrder, R>
    where
        T: Clone,
        T2: Clone,
    {
        // Both sides must live at the same location; panics otherwise.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                // Pairs are emitted in a non-deterministic order, hence `NoOrder`.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
918
    /// Takes one stream as input and filters out any duplicate occurrences. The output
    /// contains all unique values from the input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        T: Eq + Hash,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Unique {
                input: Box::new(self.ir_node.into_inner()),
                // Deduplication removes any retried duplicates, so the output is
                // upgraded to `ExactlyOnce` regardless of the input's `R`.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
            },
        )
    }
951
952 /// Outputs everything in this stream that is *not* contained in the `other` stream.
953 ///
954 /// The `other` stream must be [`Bounded`], since this function will wait until
955 /// all its elements are available before producing any output.
956 /// # Example
957 /// ```rust
958 /// # #[cfg(feature = "deploy")] {
959 /// # use hydro_lang::prelude::*;
960 /// # use futures::StreamExt;
961 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
962 /// let tick = process.tick();
963 /// let stream = process
964 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
965 /// .batch(&tick, nondet!(/** test */));
966 /// let batch = process
967 /// .source_iter(q!(vec![1, 2]))
968 /// .batch(&tick, nondet!(/** test */));
969 /// stream.filter_not_in(batch).all_ticks()
970 /// # }, |mut stream| async move {
971 /// # for w in vec![3, 4] {
972 /// # assert_eq!(stream.next().await.unwrap(), w);
973 /// # }
974 /// # }));
975 /// # }
976 /// ```
977 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
978 where
979 T: Eq + Hash,
980 B2: IsBounded,
981 {
982 check_matching_location(&self.location, &other.location);
983
984 Stream::new(
985 self.location.clone(),
986 HydroNode::Difference {
987 pos: Box::new(self.ir_node.into_inner()),
988 neg: Box::new(other.ir_node.into_inner()),
989 metadata: self
990 .location
991 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
992 },
993 )
994 }
995
996 /// An operator which allows you to "inspect" each element of a stream without
997 /// modifying it. The closure `f` is called on a reference to each item. This is
998 /// mainly useful for debugging, and should not be used to generate side-effects.
999 ///
1000 /// # Example
1001 /// ```rust
1002 /// # #[cfg(feature = "deploy")] {
1003 /// # use hydro_lang::prelude::*;
1004 /// # use futures::StreamExt;
1005 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1006 /// let nums = process.source_iter(q!(vec![1, 2]));
1007 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1008 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1009 /// # }, |mut stream| async move {
1010 /// # for w in vec![1, 2] {
1011 /// # assert_eq!(stream.next().await.unwrap(), w);
1012 /// # }
1013 /// # }));
1014 /// # }
1015 /// ```
1016 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1017 where
1018 F: Fn(&T) + 'a,
1019 {
1020 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1021
1022 Stream::new(
1023 self.location.clone(),
1024 HydroNode::Inspect {
1025 f,
1026 input: Box::new(self.ir_node.into_inner()),
1027 metadata: self.location.new_node_metadata(Self::collection_kind()),
1028 },
1029 )
1030 }
1031
1032 /// Executes the provided closure for every element in this stream.
1033 ///
1034 /// Because the closure may have side effects, the stream must have deterministic order
1035 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1036 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1037 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1038 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1039 where
1040 O: IsOrdered,
1041 R: IsExactlyOnce,
1042 {
1043 let f = f.splice_fn1_ctx(&self.location).into();
1044 self.location
1045 .flow_state()
1046 .borrow_mut()
1047 .push_root(HydroRoot::ForEach {
1048 input: Box::new(self.ir_node.into_inner()),
1049 f,
1050 op_metadata: HydroIrOpMetadata::new(),
1051 });
1052 }
1053
1054 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1055 /// TCP socket to some other server. You should _not_ use this API for interacting with
1056 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1057 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1058 /// interaction with asynchronous sinks.
1059 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1060 where
1061 O: IsOrdered,
1062 R: IsExactlyOnce,
1063 S: 'a + futures::Sink<T> + Unpin,
1064 {
1065 self.location
1066 .flow_state()
1067 .borrow_mut()
1068 .push_root(HydroRoot::DestSink {
1069 sink: sink.splice_typed_ctx(&self.location).into(),
1070 input: Box::new(self.ir_node.into_inner()),
1071 op_metadata: HydroIrOpMetadata::new(),
1072 });
1073 }
1074
1075 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1076 ///
1077 /// # Example
1078 /// ```rust
1079 /// # #[cfg(feature = "deploy")] {
1080 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1081 /// # use futures::StreamExt;
1082 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1083 /// let tick = process.tick();
1084 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1085 /// numbers.enumerate()
1086 /// # }, |mut stream| async move {
1087 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1088 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1089 /// # assert_eq!(stream.next().await.unwrap(), w);
1090 /// # }
1091 /// # }));
1092 /// # }
1093 /// ```
1094 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1095 where
1096 O: IsOrdered,
1097 R: IsExactlyOnce,
1098 {
1099 Stream::new(
1100 self.location.clone(),
1101 HydroNode::Enumerate {
1102 input: Box::new(self.ir_node.into_inner()),
1103 metadata: self.location.new_node_metadata(Stream::<
1104 (usize, T),
1105 L,
1106 B,
1107 TotalOrder,
1108 ExactlyOnce,
1109 >::collection_kind()),
1110 },
1111 )
1112 }
1113
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the staged closures in the context of this stream's location.
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        // Associate the commutativity/idempotence evidence (from `AggFuncAlgebra`)
        // with the spliced combinator expression before it is lowered into the IR.
        proof.register_proof(&comb);

        // The `C: ValidCommutativityFor<O>` / `Idemp: ValidIdempotenceFor<R>` bounds
        // justify erasing the ordering/retry markers here: any reordering or retry the
        // input may exhibit cannot change the folded result.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
        };

        Singleton::new(ordered_etc.location, core)
    }
1166
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the combinator and register its algebraic-property evidence, mirroring `fold`.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // As in `fold`, the `C` / `Idemp` bounds make it safe to erase the
        // ordering/retry markers: the reduced value is insensitive to them.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location, core)
    }
1214
1215 /// Computes the maximum element in the stream as an [`Optional`], which
1216 /// will be empty until the first element in the input arrives.
1217 ///
1218 /// # Example
1219 /// ```rust
1220 /// # #[cfg(feature = "deploy")] {
1221 /// # use hydro_lang::prelude::*;
1222 /// # use futures::StreamExt;
1223 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224 /// let tick = process.tick();
1225 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1226 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1227 /// batch.max().all_ticks()
1228 /// # }, |mut stream| async move {
1229 /// // 4
1230 /// # assert_eq!(stream.next().await.unwrap(), 4);
1231 /// # }));
1232 /// # }
1233 /// ```
1234 pub fn max(self) -> Optional<T, L, B>
1235 where
1236 T: Ord,
1237 {
1238 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1239 .assume_ordering_trusted_bounded::<TotalOrder>(
1240 nondet!(/** max is commutative, but order affects intermediates */),
1241 )
1242 .reduce(q!(|curr, new| {
1243 if new > *curr {
1244 *curr = new;
1245 }
1246 }))
1247 }
1248
1249 /// Computes the minimum element in the stream as an [`Optional`], which
1250 /// will be empty until the first element in the input arrives.
1251 ///
1252 /// # Example
1253 /// ```rust
1254 /// # #[cfg(feature = "deploy")] {
1255 /// # use hydro_lang::prelude::*;
1256 /// # use futures::StreamExt;
1257 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1258 /// let tick = process.tick();
1259 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1260 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1261 /// batch.min().all_ticks()
1262 /// # }, |mut stream| async move {
1263 /// // 1
1264 /// # assert_eq!(stream.next().await.unwrap(), 1);
1265 /// # }));
1266 /// # }
1267 /// ```
1268 pub fn min(self) -> Optional<T, L, B>
1269 where
1270 T: Ord,
1271 {
1272 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1273 .assume_ordering_trusted_bounded::<TotalOrder>(
1274 nondet!(/** max is commutative, but order affects intermediates */),
1275 )
1276 .reduce(q!(|curr, new| {
1277 if new < *curr {
1278 *curr = new;
1279 }
1280 }))
1281 }
1282
1283 /// Computes the first element in the stream as an [`Optional`], which
1284 /// will be empty until the first element in the input arrives.
1285 ///
1286 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1287 /// re-ordering of elements may cause the first element to change.
1288 ///
1289 /// # Example
1290 /// ```rust
1291 /// # #[cfg(feature = "deploy")] {
1292 /// # use hydro_lang::prelude::*;
1293 /// # use futures::StreamExt;
1294 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1295 /// let tick = process.tick();
1296 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1297 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1298 /// batch.first().all_ticks()
1299 /// # }, |mut stream| async move {
1300 /// // 1
1301 /// # assert_eq!(stream.next().await.unwrap(), 1);
1302 /// # }));
1303 /// # }
1304 /// ```
1305 pub fn first(self) -> Optional<T, L, B>
1306 where
1307 O: IsOrdered,
1308 {
1309 self.make_totally_ordered()
1310 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1311 .reduce(q!(|_, _| {}))
1312 }
1313
1314 /// Computes the last element in the stream as an [`Optional`], which
1315 /// will be empty until an element in the input arrives.
1316 ///
1317 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1318 /// re-ordering of elements may cause the last element to change.
1319 ///
1320 /// # Example
1321 /// ```rust
1322 /// # #[cfg(feature = "deploy")] {
1323 /// # use hydro_lang::prelude::*;
1324 /// # use futures::StreamExt;
1325 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1326 /// let tick = process.tick();
1327 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1328 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1329 /// batch.last().all_ticks()
1330 /// # }, |mut stream| async move {
1331 /// // 4
1332 /// # assert_eq!(stream.next().await.unwrap(), 4);
1333 /// # }));
1334 /// # }
1335 /// ```
1336 pub fn last(self) -> Optional<T, L, B>
1337 where
1338 O: IsOrdered,
1339 {
1340 self.make_totally_ordered()
1341 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1342 .reduce(q!(|curr, new| *curr = new))
1343 }
1344
1345 /// Collects all the elements of this stream into a single [`Vec`] element.
1346 ///
1347 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1348 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1349 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1350 /// the vector at an arbitrary point in time.
1351 ///
1352 /// # Example
1353 /// ```rust
1354 /// # #[cfg(feature = "deploy")] {
1355 /// # use hydro_lang::prelude::*;
1356 /// # use futures::StreamExt;
1357 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1358 /// let tick = process.tick();
1359 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1360 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1361 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1362 /// # }, |mut stream| async move {
1363 /// // [ vec![1, 2, 3, 4] ]
1364 /// # for w in vec![vec![1, 2, 3, 4]] {
1365 /// # assert_eq!(stream.next().await.unwrap(), w);
1366 /// # }
1367 /// # }));
1368 /// # }
1369 /// ```
1370 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1371 where
1372 O: IsOrdered,
1373 R: IsExactlyOnce,
1374 {
1375 self.make_totally_ordered().make_exactly_once().fold(
1376 q!(|| vec![]),
1377 q!(|acc, v| {
1378 acc.push(v);
1379 }),
1380 )
1381 }
1382
1383 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1384 /// and emitting each intermediate result.
1385 ///
1386 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1387 /// containing all intermediate accumulated values. The scan operation can also terminate early
1388 /// by returning `None`.
1389 ///
1390 /// The function takes a mutable reference to the accumulator and the current element, and returns
1391 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1392 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1393 ///
1394 /// # Examples
1395 ///
1396 /// Basic usage - running sum:
1397 /// ```rust
1398 /// # #[cfg(feature = "deploy")] {
1399 /// # use hydro_lang::prelude::*;
1400 /// # use futures::StreamExt;
1401 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1402 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1403 /// q!(|| 0),
1404 /// q!(|acc, x| {
1405 /// *acc += x;
1406 /// Some(*acc)
1407 /// }),
1408 /// )
1409 /// # }, |mut stream| async move {
1410 /// // Output: 1, 3, 6, 10
1411 /// # for w in vec![1, 3, 6, 10] {
1412 /// # assert_eq!(stream.next().await.unwrap(), w);
1413 /// # }
1414 /// # }));
1415 /// # }
1416 /// ```
1417 ///
1418 /// Early termination example:
1419 /// ```rust
1420 /// # #[cfg(feature = "deploy")] {
1421 /// # use hydro_lang::prelude::*;
1422 /// # use futures::StreamExt;
1423 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1424 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1425 /// q!(|| 1),
1426 /// q!(|state, x| {
1427 /// *state = *state * x;
1428 /// if *state > 6 {
1429 /// None // Terminate the stream
1430 /// } else {
1431 /// Some(-*state)
1432 /// }
1433 /// }),
1434 /// )
1435 /// # }, |mut stream| async move {
1436 /// // Output: -1, -2, -6
1437 /// # for w in vec![-1, -2, -6] {
1438 /// # assert_eq!(stream.next().await.unwrap(), w);
1439 /// # }
1440 /// # }));
1441 /// # }
1442 /// ```
1443 pub fn scan<A, U, I, F>(
1444 self,
1445 init: impl IntoQuotedMut<'a, I, L>,
1446 f: impl IntoQuotedMut<'a, F, L>,
1447 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1448 where
1449 O: IsOrdered,
1450 R: IsExactlyOnce,
1451 I: Fn() -> A + 'a,
1452 F: Fn(&mut A, T) -> Option<U> + 'a,
1453 {
1454 let init = init.splice_fn0_ctx(&self.location).into();
1455 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1456
1457 Stream::new(
1458 self.location.clone(),
1459 HydroNode::Scan {
1460 init,
1461 acc: f,
1462 input: Box::new(self.ir_node.into_inner()),
1463 metadata: self.location.new_node_metadata(
1464 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1465 ),
1466 },
1467 )
1468 }
1469
1470 /// Given a time interval, returns a stream corresponding to samples taken from the
1471 /// stream roughly at that interval. The output will have elements in the same order
1472 /// as the input, but with arbitrary elements skipped between samples. There is also
1473 /// no guarantee on the exact timing of the samples.
1474 ///
1475 /// # Non-Determinism
1476 /// The output stream is non-deterministic in which elements are sampled, since this
1477 /// is controlled by a clock.
1478 pub fn sample_every(
1479 self,
1480 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1481 nondet: NonDet,
1482 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1483 where
1484 L: NoTick + NoAtomic,
1485 {
1486 let samples = self.location.source_interval(interval, nondet);
1487
1488 let tick = self.location.tick();
1489 self.batch(&tick, nondet)
1490 .filter_if_some(samples.batch(&tick, nondet).first())
1491 .all_ticks()
1492 .weaken_retries()
1493 }
1494
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock instant of the most recent arrival (`None` before the
        // first element). Duplicates only re-write the timestamp, which motivates the
        // `assume_retries`. NOTE(review): the commutativity argument is still a
        // `manual_proof!(/** TODO */)` placeholder below — it has not been formalized.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // On each tick, inspect a snapshot of the latest timestamp: emit `Some(())`
        // when nothing has arrived yet, or when the last arrival is older than
        // `duration`. `latest()` then exposes this per-tick signal as an
        // asynchronously-updated unbounded [`Optional`].
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1539
1540 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1541 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1542 ///
1543 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1544 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1545 /// argument that declares where the stream will be atomically processed. Batching a stream into
1546 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1547 /// [`Tick`] will introduce asynchrony.
1548 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1549 let out_location = Atomic { tick: tick.clone() };
1550 Stream::new(
1551 out_location.clone(),
1552 HydroNode::BeginAtomic {
1553 inner: Box::new(self.ir_node.into_inner()),
1554 metadata: out_location
1555 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1556 },
1557 )
1558 }
1559
1560 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1561 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1562 /// the order of the input. The output stream will execute in the [`Tick`] that was
1563 /// used to create the atomic section.
1564 ///
1565 /// # Non-Determinism
1566 /// The batch boundaries are non-deterministic and may change across executions.
1567 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1568 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1569 Stream::new(
1570 tick.clone(),
1571 HydroNode::Batch {
1572 inner: Box::new(self.ir_node.into_inner()),
1573 metadata: tick
1574 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1575 },
1576 )
1577 }
1578
1579 /// An operator which allows you to "name" a `HydroNode`.
1580 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1581 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1582 {
1583 let mut node = self.ir_node.borrow_mut();
1584 let metadata = node.metadata_mut();
1585 metadata.tag = Some(name.to_owned());
1586 }
1587 self
1588 }
1589
1590 /// Explicitly "casts" the stream to a type with a different ordering
1591 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1592 /// by the type-system.
1593 ///
1594 /// # Non-Determinism
1595 /// This function is used as an escape hatch, and any mistakes in the
1596 /// provided ordering guarantee will propagate into the guarantees
1597 /// for the rest of the program.
1598 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1599 if O::ORDERING_KIND == O2::ORDERING_KIND {
1600 Stream::new(self.location, self.ir_node.into_inner())
1601 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1602 // We can always weaken the ordering guarantee
1603 Stream::new(
1604 self.location.clone(),
1605 HydroNode::Cast {
1606 inner: Box::new(self.ir_node.into_inner()),
1607 metadata: self
1608 .location
1609 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1610 },
1611 )
1612 } else {
1613 Stream::new(
1614 self.location.clone(),
1615 HydroNode::ObserveNonDet {
1616 inner: Box::new(self.ir_node.into_inner()),
1617 trusted: false,
1618 metadata: self
1619 .location
1620 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1621 },
1622 )
1623 }
1624 }
1625
1626 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1627 // intermediate states will not be revealed
1628 fn assume_ordering_trusted_bounded<O2: Ordering>(
1629 self,
1630 nondet: NonDet,
1631 ) -> Stream<T, L, B, O2, R> {
1632 if B::BOUNDED {
1633 self.assume_ordering_trusted(nondet)
1634 } else {
1635 self.assume_ordering(nondet)
1636 }
1637 }
1638
1639 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1640 // is not observable
1641 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1642 self,
1643 _nondet: NonDet,
1644 ) -> Stream<T, L, B, O2, R> {
1645 if O::ORDERING_KIND == O2::ORDERING_KIND {
1646 Stream::new(self.location, self.ir_node.into_inner())
1647 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1648 // We can always weaken the ordering guarantee
1649 Stream::new(
1650 self.location.clone(),
1651 HydroNode::Cast {
1652 inner: Box::new(self.ir_node.into_inner()),
1653 metadata: self
1654 .location
1655 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1656 },
1657 )
1658 } else {
1659 Stream::new(
1660 self.location.clone(),
1661 HydroNode::ObserveNonDet {
1662 inner: Box::new(self.ir_node.into_inner()),
1663 trusted: true,
1664 metadata: self
1665 .location
1666 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1667 },
1668 )
1669 }
1670 }
1671
1672 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1673 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1674 /// which is always safe because that is the weakest possible guarantee.
1675 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1676 self.weaken_ordering::<NoOrder>()
1677 }
1678
1679 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1680 /// enforcing that `O2` is weaker than the input ordering guarantee.
1681 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1682 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1683 self.assume_ordering::<O2>(nondet)
1684 }
1685
1686 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1687 /// implies that `O == TotalOrder`.
1688 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1689 where
1690 O: IsOrdered,
1691 {
1692 self.assume_ordering(nondet!(/** no-op */))
1693 }
1694
1695 /// Explicitly "casts" the stream to a type with a different retries
1696 /// guarantee. Useful in unsafe code where the lack of retries cannot
1697 /// be proven by the type-system.
1698 ///
1699 /// # Non-Determinism
1700 /// This function is used as an escape hatch, and any mistakes in the
1701 /// provided retries guarantee will propagate into the guarantees
1702 /// for the rest of the program.
1703 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1704 if R::RETRIES_KIND == R2::RETRIES_KIND {
1705 Stream::new(self.location, self.ir_node.into_inner())
1706 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1707 // We can always weaken the retries guarantee
1708 Stream::new(
1709 self.location.clone(),
1710 HydroNode::Cast {
1711 inner: Box::new(self.ir_node.into_inner()),
1712 metadata: self
1713 .location
1714 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1715 },
1716 )
1717 } else {
1718 Stream::new(
1719 self.location.clone(),
1720 HydroNode::ObserveNonDet {
1721 inner: Box::new(self.ir_node.into_inner()),
1722 trusted: false,
1723 metadata: self
1724 .location
1725 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1726 },
1727 )
1728 }
1729 }
1730
1731 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1732 // is not observable
1733 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1734 if R::RETRIES_KIND == R2::RETRIES_KIND {
1735 Stream::new(self.location, self.ir_node.into_inner())
1736 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1737 // We can always weaken the retries guarantee
1738 Stream::new(
1739 self.location.clone(),
1740 HydroNode::Cast {
1741 inner: Box::new(self.ir_node.into_inner()),
1742 metadata: self
1743 .location
1744 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1745 },
1746 )
1747 } else {
1748 Stream::new(
1749 self.location.clone(),
1750 HydroNode::ObserveNonDet {
1751 inner: Box::new(self.ir_node.into_inner()),
1752 trusted: true,
1753 metadata: self
1754 .location
1755 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1756 },
1757 )
1758 }
1759 }
1760
1761 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1762 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1763 /// which is always safe because that is the weakest possible guarantee.
1764 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1765 self.weaken_retries::<AtLeastOnce>()
1766 }
1767
1768 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1769 /// enforcing that `R2` is weaker than the input retries guarantee.
1770 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1771 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1772 self.assume_retries::<R2>(nondet)
1773 }
1774
1775 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1776 /// implies that `R == ExactlyOnce`.
1777 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1778 where
1779 R: IsExactlyOnce,
1780 {
1781 self.assume_retries(nondet!(/** no-op */))
1782 }
1783
1784 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1785 /// implies that `B == Bounded`.
1786 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1787 where
1788 B: IsBounded,
1789 {
1790 Stream::new(self.location, self.ir_node.into_inner())
1791 }
1792}
1793
1794impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1795where
1796 L: Location<'a>,
1797{
1798 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1799 ///
1800 /// # Example
1801 /// ```rust
1802 /// # #[cfg(feature = "deploy")] {
1803 /// # use hydro_lang::prelude::*;
1804 /// # use futures::StreamExt;
1805 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1806 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1807 /// # }, |mut stream| async move {
1808 /// // 1, 2, 3
1809 /// # for w in vec![1, 2, 3] {
1810 /// # assert_eq!(stream.next().await.unwrap(), w);
1811 /// # }
1812 /// # }));
1813 /// # }
1814 /// ```
1815 pub fn cloned(self) -> Stream<T, L, B, O, R>
1816 where
1817 T: Clone,
1818 {
1819 self.map(q!(|d| d.clone()))
1820 }
1821}
1822
1823impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1824where
1825 L: Location<'a>,
1826{
1827 /// Computes the number of elements in the stream as a [`Singleton`].
1828 ///
1829 /// # Example
1830 /// ```rust
1831 /// # #[cfg(feature = "deploy")] {
1832 /// # use hydro_lang::prelude::*;
1833 /// # use futures::StreamExt;
1834 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1835 /// let tick = process.tick();
1836 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1837 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1838 /// batch.count().all_ticks()
1839 /// # }, |mut stream| async move {
1840 /// // 4
1841 /// # assert_eq!(stream.next().await.unwrap(), 4);
1842 /// # }));
1843 /// # }
1844 /// ```
1845 pub fn count(self) -> Singleton<usize, L, B> {
1846 self.assume_ordering_trusted::<TotalOrder>(nondet!(
1847 /// Order does not affect eventual count, and also does not affect intermediate states.
1848 ))
1849 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1850 }
1851}
1852
1853impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1854 /// Produces a new stream that interleaves the elements of the two input streams.
1855 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1856 ///
1857 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1858 /// [`Bounded`], you can use [`Stream::chain`] instead.
1859 ///
1860 /// # Example
1861 /// ```rust
1862 /// # #[cfg(feature = "deploy")] {
1863 /// # use hydro_lang::prelude::*;
1864 /// # use futures::StreamExt;
1865 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1866 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1867 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1868 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1869 /// # }, |mut stream| async move {
1870 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1871 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1872 /// # assert_eq!(stream.next().await.unwrap(), w);
1873 /// # }
1874 /// # }));
1875 /// # }
1876 /// ```
1877 pub fn interleave<O2: Ordering, R2: Retries>(
1878 self,
1879 other: Stream<T, L, Unbounded, O2, R2>,
1880 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1881 where
1882 R: MinRetries<R2>,
1883 {
1884 Stream::new(
1885 self.location.clone(),
1886 HydroNode::Chain {
1887 first: Box::new(self.ir_node.into_inner()),
1888 second: Box::new(other.ir_node.into_inner()),
1889 metadata: self.location.new_node_metadata(Stream::<
1890 T,
1891 L,
1892 Unbounded,
1893 NoOrder,
1894 <R as MinRetries<R2>>::Min,
1895 >::collection_kind()),
1896 },
1897 )
1898 }
1899}
1900
1901impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1902 /// Produces a new stream that combines the elements of the two input streams,
1903 /// preserving the relative order of elements within each input.
1904 ///
1905 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1906 /// [`Bounded`], you can use [`Stream::chain`] instead.
1907 ///
1908 /// # Non-Determinism
1909 /// The order in which elements *across* the two streams will be interleaved is
1910 /// non-deterministic, so the order of elements will vary across runs. If the output order
1911 /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1912 /// unordered stream.
1913 ///
1914 /// # Example
1915 /// ```rust
1916 /// # #[cfg(feature = "deploy")] {
1917 /// # use hydro_lang::prelude::*;
1918 /// # use futures::StreamExt;
1919 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1920 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1921 /// # process.source_iter(q!(vec![1, 3])).into();
1922 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1923 /// # }, |mut stream| async move {
1924 /// // 1, 3 and 2, 4 in some order, preserving the original local order
1925 /// # for w in vec![1, 3, 2, 4] {
1926 /// # assert_eq!(stream.next().await.unwrap(), w);
1927 /// # }
1928 /// # }));
1929 /// # }
1930 /// ```
1931 pub fn merge_ordered<R2: Retries>(
1932 self,
1933 other: Stream<T, L, Unbounded, TotalOrder, R2>,
1934 _nondet: NonDet,
1935 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1936 where
1937 R: MinRetries<R2>,
1938 {
1939 Stream::new(
1940 self.location.clone(),
1941 HydroNode::Chain {
1942 first: Box::new(self.ir_node.into_inner()),
1943 second: Box::new(other.ir_node.into_inner()),
1944 metadata: self.location.new_node_metadata(Stream::<
1945 T,
1946 L,
1947 Unbounded,
1948 TotalOrder,
1949 <R as MinRetries<R2>>::Min,
1950 >::collection_kind()),
1951 },
1952 )
1953 }
1954}
1955
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
    where
        B: IsBounded,
        T: Ord,
    {
        let this = self.make_bounded();
        Stream::new(
            this.location.clone(),
            HydroNode::Sort {
                input: Box::new(this.ir_node.into_inner()),
                metadata: this
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// The `self` stream must be [`Bounded`]; this operator blocks on it until all of
    /// its elements are available. The `other` stream may have any boundedness `B2`,
    /// which is inherited by the output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
        self,
        other: Stream<T, L, B2, O2, R2>,
    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    B2,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
    /// because this is compiled into a nested loop.
    ///
    /// Both inputs must be [`Bounded`], and elements of both streams are cloned to
    /// form each output pair.
    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
        self,
        other: Stream<T2, L, Bounded, O2, R>,
    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
    where
        B: IsBounded,
        T: Clone,
        T2: Clone,
    {
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        Stream::new(
            this.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(this.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: this.location.new_node_metadata(Stream::<
                    (T, T2),
                    L,
                    Bounded,
                    <O2 as MinOrder<O>>::Min,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
    /// `self` used as the values for *each* key.
    ///
    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
    /// values. For example, it can be used to send the same set of elements to several cluster
    /// members, if the membership information is available as a [`KeyedSingleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let keyed_singleton = // { 1: (), 2: () }
    /// # process
    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .first();
    /// let stream = // [ "a", "b" ]
    /// # process
    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// stream.repeat_with_keys(keyed_singleton)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn repeat_with_keys<K, V2>(
        self,
        keys: KeyedSingleton<K, V2, L, Bounded>,
    ) -> KeyedStream<K, T, L, Bounded, O, R>
    where
        B: IsBounded,
        K: Clone,
        T: Clone,
    {
        // Pair every key with every element of `self` via a nested-loop cross product,
        // then regroup the `(key, value)` pairs into a keyed stream.
        keys.keys()
            .weaken_retries()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** keyed stream does not depend on ordering of keys */),
            )
            .cross_product_nested_loop(self.make_bounded())
            .into_keyed()
    }
}
2142
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(n.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2240
2241impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2242 Stream<(K, V), L, B, O, R>
2243{
2244 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2245 /// is used as the key and the second element is added to the entries associated with that key.
2246 ///
2247 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2248 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2249 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2250 /// total ordering _within_ each group but no ordering _across_ groups.
2251 ///
2252 /// # Example
2253 /// ```rust
2254 /// # #[cfg(feature = "deploy")] {
2255 /// # use hydro_lang::prelude::*;
2256 /// # use futures::StreamExt;
2257 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258 /// process
2259 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2260 /// .into_keyed()
2261 /// # .entries()
2262 /// # }, |mut stream| async move {
2263 /// // { 1: [2, 3], 2: [4] }
2264 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2265 /// # assert_eq!(stream.next().await.unwrap(), w);
2266 /// # }
2267 /// # }));
2268 /// # }
2269 /// ```
2270 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2271 KeyedStream::new(
2272 self.location.clone(),
2273 HydroNode::Cast {
2274 inner: Box::new(self.ir_node.into_inner()),
2275 metadata: self
2276 .location
2277 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2278 },
2279 )
2280 }
2281}
2282
2283impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2284where
2285 K: Eq + Hash,
2286 L: Location<'a>,
2287{
2288 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2289 /// # Example
2290 /// ```rust
2291 /// # #[cfg(feature = "deploy")] {
2292 /// # use hydro_lang::prelude::*;
2293 /// # use futures::StreamExt;
2294 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2295 /// let tick = process.tick();
2296 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2297 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2298 /// batch.keys().all_ticks()
2299 /// # }, |mut stream| async move {
2300 /// // 1, 2
2301 /// # assert_eq!(stream.next().await.unwrap(), 1);
2302 /// # assert_eq!(stream.next().await.unwrap(), 2);
2303 /// # }));
2304 /// # }
2305 /// ```
2306 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2307 self.into_keyed()
2308 .fold(
2309 q!(|| ()),
2310 q!(
2311 |_, _| {},
2312 commutative = manual_proof!(/** values are ignored */),
2313 idempotent = manual_proof!(/** values are ignored */)
2314 ),
2315 )
2316 .keys()
2317 }
2318}
2319
2320impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2321where
2322 L: Location<'a> + NoTick,
2323{
2324 /// Returns a stream corresponding to the latest batch of elements being atomically
2325 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2326 /// the order of the input.
2327 ///
2328 /// # Non-Determinism
2329 /// The batch boundaries are non-deterministic and may change across executions.
2330 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2331 Stream::new(
2332 self.location.clone().tick,
2333 HydroNode::Batch {
2334 inner: Box::new(self.ir_node.into_inner()),
2335 metadata: self
2336 .location
2337 .tick
2338 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2339 },
2340 )
2341 }
2342
2343 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2344 /// See [`Stream::atomic`] for more details.
2345 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2346 Stream::new(
2347 self.location.tick.l.clone(),
2348 HydroNode::EndAtomic {
2349 inner: Box::new(self.ir_node.into_inner()),
2350 metadata: self
2351 .location
2352 .tick
2353 .l
2354 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2355 },
2356 )
2357 }
2358}
2359
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
2449
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        // The atomic context wraps the same tick this stream was batched in.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their state across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![1, 2, 3, 4]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![5, 6, 7]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Run `thunk` on the atomic (cross-tick) view of this stream, then convert its
        // result back to a batched form via `BatchAtomic::batched_atomic`.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
2582
2583#[cfg(test)]
2584mod tests {
2585 #[cfg(feature = "deploy")]
2586 use futures::{SinkExt, StreamExt};
2587 #[cfg(feature = "deploy")]
2588 use hydro_deploy::Deployment;
2589 #[cfg(feature = "deploy")]
2590 use serde::{Deserialize, Serialize};
2591 #[cfg(any(feature = "deploy", feature = "sim"))]
2592 use stageleft::q;
2593
2594 #[cfg(any(feature = "deploy", feature = "sim"))]
2595 use crate::compile::builder::FlowBuilder;
2596 #[cfg(feature = "deploy")]
2597 use crate::live_collections::sliced::sliced;
2598 #[cfg(feature = "deploy")]
2599 use crate::live_collections::stream::ExactlyOnce;
2600 #[cfg(feature = "sim")]
2601 use crate::live_collections::stream::NoOrder;
2602 #[cfg(any(feature = "deploy", feature = "sim"))]
2603 use crate::live_collections::stream::TotalOrder;
2604 #[cfg(any(feature = "deploy", feature = "sim"))]
2605 use crate::location::Location;
2606 #[cfg(any(feature = "deploy", feature = "sim"))]
2607 use crate::nondet::nondet;
2608
2609 mod backtrace_chained_ops;
2610
    // Marker types naming the two processes used in the distributed tests below.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload type exercised by `first_ten_distributed`; serde derives are required
    // for bincode serialization across the network.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2621
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        // End-to-end deploy test: 0..10 is generated on one process, sent over TCP
        // (bincode-serialized, fail-stop semantics) to a second process, then forwarded
        // to an external port where this test asserts the values arrive in order.
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2656
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        // `first()` on the flattened singleton [1, 2, 3] yields a single element,
        // so counting it must produce exactly 1.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
2690
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        // A top-level `reduce` over an unbounded input must keep its accumulator across
        // inputs: sending 1 then 2 should observe running sums 1 and then 3.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2724
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        // `cross_singleton` against the folded sum of [1, 2, 3] (= 6) should pair each
        // external input value with 6.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2762
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        // Checks that inside a `sliced!` block, a `reduce` over a bounded
        // source yields exactly one value per slice: converting it back into a
        // stream and counting should give cardinality 1 for every input.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            // NOTE(review): `use(...)` appears to bring outer collections into
            // the slice scope, with nondet markers acknowledging slice timing —
            // confirm against the `sliced!` macro docs.
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            // Pair each input element with the number of reduced values.
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Each (element, count) pair must report a count of exactly 1.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2800
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        // Same shape as `top_level_bounded_reduce_cardinality`, but the reduced
        // value is passed through `into_singleton()` before entering the slice;
        // the observed cardinality must still be exactly 1 per input.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            // `into_singleton()` wraps the reduce result before it is counted.
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2838
2839 #[cfg(feature = "deploy")]
2840 #[tokio::test]
2841 async fn atomic_fold_replays_each_tick() {
2842 let mut deployment = Deployment::new();
2843
2844 let mut flow = FlowBuilder::new();
2845 let node = flow.process::<()>();
2846 let external = flow.external::<()>();
2847
2848 let (input_port, input) =
2849 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2850 let tick = node.tick();
2851
2852 let out = input
2853 .batch(&tick, nondet!(/** test */))
2854 .cross_singleton(
2855 node.source_iter(q!(vec![1, 2, 3]))
2856 .atomic(&tick)
2857 .fold(q!(|| 0), q!(|acc, v| *acc += v))
2858 .snapshot_atomic(nondet!(/** test */)),
2859 )
2860 .all_ticks()
2861 .send_bincode_external(&external);
2862
2863 let nodes = flow
2864 .with_process(&node, deployment.Localhost())
2865 .with_external(&external, deployment.Localhost())
2866 .deploy(&mut deployment);
2867
2868 deployment.deploy().await.unwrap();
2869
2870 let mut external_in = nodes.connect(input_port).await;
2871 let mut external_out = nodes.connect(out).await;
2872
2873 deployment.start().await.unwrap();
2874
2875 external_in.send(1).await.unwrap();
2876 assert_eq!(external_out.next().await.unwrap(), (1, 6));
2877
2878 external_in.send(2).await.unwrap();
2879 assert_eq!(external_out.next().await.unwrap(), (2, 6));
2880 }
2881
2882 #[cfg(feature = "deploy")]
2883 #[tokio::test]
2884 async fn unbounded_scan_remembers_state() {
2885 let mut deployment = Deployment::new();
2886
2887 let mut flow = FlowBuilder::new();
2888 let node = flow.process::<()>();
2889 let external = flow.external::<()>();
2890
2891 let (input_port, input) = node.source_external_bincode(&external);
2892 let out = input
2893 .scan(
2894 q!(|| 0),
2895 q!(|acc, v| {
2896 *acc += v;
2897 Some(*acc)
2898 }),
2899 )
2900 .send_bincode_external(&external);
2901
2902 let nodes = flow
2903 .with_process(&node, deployment.Localhost())
2904 .with_external(&external, deployment.Localhost())
2905 .deploy(&mut deployment);
2906
2907 deployment.deploy().await.unwrap();
2908
2909 let mut external_in = nodes.connect(input_port).await;
2910 let mut external_out = nodes.connect(out).await;
2911
2912 deployment.start().await.unwrap();
2913
2914 external_in.send(1).await.unwrap();
2915 assert_eq!(external_out.next().await.unwrap(), 1);
2916
2917 external_in.send(2).await.unwrap();
2918 assert_eq!(external_out.next().await.unwrap(), 3);
2919 }
2920
2921 #[cfg(feature = "deploy")]
2922 #[tokio::test]
2923 async fn unbounded_enumerate_remembers_state() {
2924 let mut deployment = Deployment::new();
2925
2926 let mut flow = FlowBuilder::new();
2927 let node = flow.process::<()>();
2928 let external = flow.external::<()>();
2929
2930 let (input_port, input) = node.source_external_bincode(&external);
2931 let out = input.enumerate().send_bincode_external(&external);
2932
2933 let nodes = flow
2934 .with_process(&node, deployment.Localhost())
2935 .with_external(&external, deployment.Localhost())
2936 .deploy(&mut deployment);
2937
2938 deployment.deploy().await.unwrap();
2939
2940 let mut external_in = nodes.connect(input_port).await;
2941 let mut external_out = nodes.connect(out).await;
2942
2943 deployment.start().await.unwrap();
2944
2945 external_in.send(1).await.unwrap();
2946 assert_eq!(external_out.next().await.unwrap(), (0, 1));
2947
2948 external_in.send(2).await.unwrap();
2949 assert_eq!(external_out.next().await.unwrap(), (1, 2));
2950 }
2951
2952 #[cfg(feature = "deploy")]
2953 #[tokio::test]
2954 async fn unbounded_unique_remembers_state() {
2955 let mut deployment = Deployment::new();
2956
2957 let mut flow = FlowBuilder::new();
2958 let node = flow.process::<()>();
2959 let external = flow.external::<()>();
2960
2961 let (input_port, input) =
2962 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2963 let out = input.unique().send_bincode_external(&external);
2964
2965 let nodes = flow
2966 .with_process(&node, deployment.Localhost())
2967 .with_external(&external, deployment.Localhost())
2968 .deploy(&mut deployment);
2969
2970 deployment.deploy().await.unwrap();
2971
2972 let mut external_in = nodes.connect(input_port).await;
2973 let mut external_out = nodes.connect(out).await;
2974
2975 deployment.start().await.unwrap();
2976
2977 external_in.send(1).await.unwrap();
2978 assert_eq!(external_out.next().await.unwrap(), 1);
2979
2980 external_in.send(2).await.unwrap();
2981 assert_eq!(external_out.next().await.unwrap(), 2);
2982
2983 external_in.send(1).await.unwrap();
2984 external_in.send(3).await.unwrap();
2985 assert_eq!(external_out.next().await.unwrap(), 3);
2986 }
2987
2988 #[cfg(feature = "sim")]
2989 #[test]
2990 #[should_panic]
2991 fn sim_batch_nondet_size() {
2992 let mut flow = FlowBuilder::new();
2993 let node = flow.process::<()>();
2994
2995 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2996
2997 let tick = node.tick();
2998 let out_recv = input
2999 .batch(&tick, nondet!(/** test */))
3000 .count()
3001 .all_ticks()
3002 .sim_output();
3003
3004 flow.sim().exhaustive(async || {
3005 in_send.send(());
3006 in_send.send(());
3007 in_send.send(());
3008
3009 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3010 });
3011 }
3012
3013 #[cfg(feature = "sim")]
3014 #[test]
3015 fn sim_batch_preserves_order() {
3016 let mut flow = FlowBuilder::new();
3017 let node = flow.process::<()>();
3018
3019 let (in_send, input) = node.sim_input();
3020
3021 let tick = node.tick();
3022 let out_recv = input
3023 .batch(&tick, nondet!(/** test */))
3024 .all_ticks()
3025 .sim_output();
3026
3027 flow.sim().exhaustive(async || {
3028 in_send.send(1);
3029 in_send.send(2);
3030 in_send.send(3);
3031
3032 out_recv.assert_yields_only([1, 2, 3]).await;
3033 });
3034 }
3035
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        // Shows that batching an unordered input really explores shuffled
        // batch contents: the closure panics when it observes batches whose
        // (min, max) pairs are (1, 3) then (2, 2), and `#[should_panic]`
        // requires that this shuffled schedule is actually reached.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        // Per tick, emit (min, max) of that tick's batch.
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
3062
3063 #[cfg(feature = "sim")]
3064 #[test]
3065 fn sim_batch_unordered_shuffles_count() {
3066 let mut flow = FlowBuilder::new();
3067 let node = flow.process::<()>();
3068
3069 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3070
3071 let tick = node.tick();
3072 let batch = input.batch(&tick, nondet!(/** test */));
3073 let out_recv = batch.all_ticks().sim_output();
3074
3075 let instance_count = flow.sim().exhaustive(async || {
3076 in_send.send_many_unordered([1, 2, 3, 4]);
3077 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3078 });
3079
3080 assert_eq!(
3081 instance_count,
3082 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3083 )
3084 }
3085
3086 #[cfg(feature = "sim")]
3087 #[test]
3088 #[should_panic]
3089 fn sim_observe_order_batched() {
3090 let mut flow = FlowBuilder::new();
3091 let node = flow.process::<()>();
3092
3093 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3094
3095 let tick = node.tick();
3096 let batch = input.batch(&tick, nondet!(/** test */));
3097 let out_recv = batch
3098 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3099 .all_ticks()
3100 .sim_output();
3101
3102 flow.sim().exhaustive(async || {
3103 in_send.send_many_unordered([1, 2, 3, 4]);
3104 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3105 });
3106 }
3107
3108 #[cfg(feature = "sim")]
3109 #[test]
3110 fn sim_observe_order_batched_count() {
3111 let mut flow = FlowBuilder::new();
3112 let node = flow.process::<()>();
3113
3114 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3115
3116 let tick = node.tick();
3117 let batch = input.batch(&tick, nondet!(/** test */));
3118 let out_recv = batch
3119 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3120 .all_ticks()
3121 .sim_output();
3122
3123 let instance_count = flow.sim().exhaustive(async || {
3124 in_send.send_many_unordered([1, 2, 3, 4]);
3125 let _ = out_recv.collect::<Vec<_>>().await;
3126 });
3127
3128 assert_eq!(
3129 instance_count,
3130 192 // 4! * 2^{4 - 1}
3131 )
3132 }
3133
3134 #[cfg(feature = "sim")]
3135 #[test]
3136 fn sim_unordered_count_instance_count() {
3137 let mut flow = FlowBuilder::new();
3138 let node = flow.process::<()>();
3139
3140 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3141
3142 let tick = node.tick();
3143 let out_recv = input
3144 .count()
3145 .snapshot(&tick, nondet!(/** test */))
3146 .all_ticks()
3147 .sim_output();
3148
3149 let instance_count = flow.sim().exhaustive(async || {
3150 in_send.send_many_unordered([1, 2, 3, 4]);
3151 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3152 });
3153
3154 assert_eq!(
3155 instance_count,
3156 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3157 )
3158 }
3159
3160 #[cfg(feature = "sim")]
3161 #[test]
3162 fn sim_top_level_assume_ordering() {
3163 let mut flow = FlowBuilder::new();
3164 let node = flow.process::<()>();
3165
3166 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3167
3168 let out_recv = input
3169 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3170 .sim_output();
3171
3172 let instance_count = flow.sim().exhaustive(async || {
3173 in_send.send_many_unordered([1, 2, 3]);
3174 let mut out = out_recv.collect::<Vec<_>>().await;
3175 out.sort();
3176 assert_eq!(out, vec![1, 2, 3]);
3177 });
3178
3179 assert_eq!(instance_count, 6)
3180 }
3181
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        // assume_ordering applied to a stream that feeds back into itself:
        // each even input v cycles back as v + 1 (only odd values pass the
        // filter), and interleavings of the cycle are explored. Verifies the
        // schedule producing outputs starting [0, 1, 2] is reachable.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Forward reference so the stream can consume its own output.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        // Set from inside the closure when the target prefix is observed.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }
3217
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        // Variant of sim_top_level_assume_ordering_cycle_back where the
        // fed-back stream is routed through a tick (`batch` + `all_ticks`)
        // before being mapped and filtered. Batching adds schedule choices, so
        // the explored instance count grows to 58; [0, 1, 2] stays reachable.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }
3255
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        // Two stacked assume_ordering claims: input1 interleaved with a cycle,
        // and that result (weakened back to NoOrder) re-interleaved with a
        // second input. Verifies the simulator explores orderings across both
        // claims — a run whose output begins [0, 3, 1] must be reachable.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        // Forward reference lets `foo` feed back into `input1_ordered`.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .interleave(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Only the mapped 0 (0 + 3 == 3) survives and cycles back.
        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }
3295
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        // Cycle-back shape like sim_top_level_assume_ordering_cycle_back, but
        // the assume_ordering happens inside an atomic region tied to a tick.
        // With inputs {0, 2} the cycle contributes 1 and 3, so every run
        // yields exactly four elements.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .atomic(&node.tick())
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.len(), 4);
        });

        // NOTE(review): 22 pins the expected number of explored schedules
        // under atomic constraints — presumably derived empirically; confirm
        // if the sim exploration strategy changes.
        assert_eq!(instance_count, 22)
    }
3328
3329 #[cfg(feature = "deploy")]
3330 #[tokio::test]
3331 async fn partition_evens_odds() {
3332 let mut deployment = Deployment::new();
3333
3334 let mut flow = FlowBuilder::new();
3335 let node = flow.process::<()>();
3336 let external = flow.external::<()>();
3337
3338 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3339 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3340 let evens_port = evens.send_bincode_external(&external);
3341 let odds_port = odds.send_bincode_external(&external);
3342
3343 let nodes = flow
3344 .with_process(&node, deployment.Localhost())
3345 .with_external(&external, deployment.Localhost())
3346 .deploy(&mut deployment);
3347
3348 deployment.deploy().await.unwrap();
3349
3350 let mut evens_out = nodes.connect(evens_port).await;
3351 let mut odds_out = nodes.connect(odds_port).await;
3352
3353 deployment.start().await.unwrap();
3354
3355 let mut even_results = Vec::new();
3356 for _ in 0..3 {
3357 even_results.push(evens_out.next().await.unwrap());
3358 }
3359 even_results.sort();
3360 assert_eq!(even_results, vec![2, 4, 6]);
3361
3362 let mut odd_results = Vec::new();
3363 for _ in 0..3 {
3364 odd_results.push(odds_out.next().await.unwrap());
3365 }
3366 odd_results.sort();
3367 assert_eq!(odd_results, vec![1, 3, 5]);
3368 }
3369}