Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub expr: DebugExpr,
46    pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
47}
48
49impl Clone for ClosureExpr {
50    fn clone(&self) -> Self {
51        Self {
52            expr: self.expr.clone(),
53            singleton_refs: self
54                .singleton_refs
55                .iter()
56                .map(|(ident, node)| {
57                    let cloned_node = match node {
58                        HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
59                            inner: SharedNode(inner.0.clone()),
60                            metadata: metadata.clone(),
61                        },
62                        _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
63                    };
64                    (ident.clone(), cloned_node)
65                })
66                .collect(),
67        }
68    }
69}
70
71impl Hash for ClosureExpr {
72    fn hash<H: Hasher>(&self, state: &mut H) {
73        self.expr.hash(state);
74        // singleton_refs are structural children (like HydroIrMetadata), not
75        // identity-defining. Two closures with the same expr but different
76        // captured refs are the same closure text — the refs only affect codegen.
77    }
78}
79
80impl serde::Serialize for ClosureExpr {
81    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
82        use serde::ser::SerializeStruct;
83        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
84        s.serialize_field("expr", &self.expr)?;
85        s.serialize_field(
86            "singleton_refs",
87            &SerializableSingletonRefs(&self.singleton_refs),
88        )?;
89        s.end()
90    }
91}
92
93struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
94
95impl serde::Serialize for SerializableSingletonRefs<'_> {
96    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
97        use serde::ser::SerializeSeq;
98        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
99        for (ident, node) in self.0 {
100            seq.serialize_element(&(ident.to_string(), node))?;
101        }
102        seq.end()
103    }
104}
105
106impl Debug for ClosureExpr {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        Debug::fmt(&self.expr, f)
109    }
110}
111
112impl Display for ClosureExpr {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        Display::fmt(&self.expr, f)
115    }
116}
117
118impl From<syn::Expr> for ClosureExpr {
119    fn from(expr: syn::Expr) -> Self {
120        Self {
121            expr: DebugExpr(Box::new(expr)),
122            singleton_refs: Vec::new(),
123        }
124    }
125}
126
127impl From<DebugExpr> for ClosureExpr {
128    fn from(expr: DebugExpr) -> Self {
129        Self {
130            expr,
131            singleton_refs: Vec::new(),
132        }
133    }
134}
135
136impl ClosureExpr {
137    pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
138        Self {
139            expr,
140            singleton_refs,
141        }
142    }
143
144    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
145        Self {
146            expr: self.expr.clone(),
147            singleton_refs: self
148                .singleton_refs
149                .iter()
150                .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
151                .collect(),
152        }
153    }
154
155    pub fn transform_children(
156        &mut self,
157        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
158        seen_tees: &mut SeenSharedNodes,
159    ) {
160        for (_ident, ref_node) in self.singleton_refs.iter_mut() {
161            transform(ref_node, seen_tees);
162        }
163    }
164
165    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
166    /// replacing local singleton ref idents with `#dfir_ident` references.
167    #[cfg(feature = "build")]
168    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
169        if self.singleton_refs.is_empty() {
170            self.expr.0.to_token_stream()
171        } else {
172            let ref_idents = (0..self.singleton_refs.len())
173                .map(|_| ident_stack.pop().unwrap())
174                .collect::<Vec<_>>()
175                .into_iter()
176                .rev()
177                .collect::<Vec<_>>();
178            let local_idents = self
179                .singleton_refs
180                .iter()
181                .map(|(local_ident, _)| local_ident);
182            let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
183            let expr = &self.expr.0;
184            quote! {
185                {
186                    #(
187                        let #local_idents = #hash #ref_idents;
188                    )*
189                    #expr
190                }
191            }
192        }
193    }
194}
195
196/// Wrapper that displays only the tokens of a parsed expr.
197///
198/// Boxes `syn::Type` which is ~240 bytes.
199#[derive(Clone, Hash)]
200pub struct DebugExpr(pub Box<syn::Expr>);
201
202impl serde::Serialize for DebugExpr {
203    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
204        serializer.serialize_str(&self.to_string())
205    }
206}
207
208impl From<syn::Expr> for DebugExpr {
209    fn from(expr: syn::Expr) -> Self {
210        Self(Box::new(expr))
211    }
212}
213
214impl Deref for DebugExpr {
215    type Target = syn::Expr;
216
217    fn deref(&self) -> &Self::Target {
218        &self.0
219    }
220}
221
222impl ToTokens for DebugExpr {
223    fn to_tokens(&self, tokens: &mut TokenStream) {
224        self.0.to_tokens(tokens);
225    }
226}
227
228impl Debug for DebugExpr {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        write!(f, "{}", self.0.to_token_stream())
231    }
232}
233
234impl Display for DebugExpr {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        let original = self.0.as_ref().clone();
237        let simplified = simplify_q_macro(original);
238
239        // For now, just use quote formatting without trying to parse as a statement
240        // This avoids the syn::parse_quote! issues entirely
241        write!(f, "q!({})", quote::quote!(#simplified))
242    }
243}
244
245/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
246fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
247    // Try to parse the token string as a syn::Expr
248    // Use a visitor to simplify q! macro expansions
249    let mut simplifier = QMacroSimplifier::new();
250    simplifier.visit_expr_mut(&mut expr);
251
252    // If we found and simplified a q! macro, return the simplified version
253    if let Some(simplified) = simplifier.simplified_result {
254        simplified
255    } else {
256        expr
257    }
258}
259
260/// AST visitor that simplifies q! macro expansions
261#[derive(Default)]
262pub struct QMacroSimplifier {
263    pub simplified_result: Option<syn::Expr>,
264}
265
266impl QMacroSimplifier {
267    pub fn new() -> Self {
268        Self::default()
269    }
270}
271
272impl VisitMut for QMacroSimplifier {
273    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
274        // Check if we already found a result to avoid further processing
275        if self.simplified_result.is_some() {
276            return;
277        }
278
279        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
280            // Look for calls to stageleft::runtime_support::fn*
281            && self.is_stageleft_runtime_support_call(&path_expr.path)
282            // Try to extract the closure from the arguments
283            && let Some(closure) = self.extract_closure_from_args(&call.args)
284        {
285            self.simplified_result = Some(closure);
286            return;
287        }
288
289        // Continue visiting child expressions using the default implementation
290        // Use the default visitor to avoid infinite recursion
291        syn::visit_mut::visit_expr_mut(self, expr);
292    }
293}
294
295impl QMacroSimplifier {
296    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
297        // Check if this is a call to stageleft::runtime_support::fn*
298        if let Some(last_segment) = path.segments.last() {
299            let fn_name = last_segment.ident.to_string();
300            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
301            fn_name.contains("_type_hint")
302                && path.segments.len() > 2
303                && path.segments[0].ident == "stageleft"
304                && path.segments[1].ident == "runtime_support"
305        } else {
306            false
307        }
308    }
309
310    fn extract_closure_from_args(
311        &self,
312        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
313    ) -> Option<syn::Expr> {
314        // Look through the arguments for a closure expression
315        for arg in args {
316            if let syn::Expr::Closure(_) = arg {
317                return Some(arg.clone());
318            }
319            // Also check for closures nested in other expressions (like blocks)
320            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
321                return Some(closure_expr);
322            }
323        }
324        None
325    }
326
327    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
328        let mut visitor = ClosureFinder {
329            found_closure: None,
330            prefer_inner_blocks: true,
331        };
332        visitor.visit_expr(expr);
333        visitor.found_closure
334    }
335}
336
337/// Visitor that finds closures in expressions with special block handling
338struct ClosureFinder {
339    found_closure: Option<syn::Expr>,
340    prefer_inner_blocks: bool,
341}
342
343impl<'ast> Visit<'ast> for ClosureFinder {
344    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
345        // If we already found a closure, don't continue searching
346        if self.found_closure.is_some() {
347            return;
348        }
349
350        match expr {
351            syn::Expr::Closure(_) => {
352                self.found_closure = Some(expr.clone());
353            }
354            syn::Expr::Block(block) if self.prefer_inner_blocks => {
355                // Special handling for blocks - look for inner blocks that contain closures
356                for stmt in &block.block.stmts {
357                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
358                        && let syn::Expr::Block(_) = stmt_expr
359                    {
360                        // Check if this nested block contains a closure
361                        let mut inner_visitor = ClosureFinder {
362                            found_closure: None,
363                            prefer_inner_blocks: false, // Avoid infinite recursion
364                        };
365                        inner_visitor.visit_expr(stmt_expr);
366                        if inner_visitor.found_closure.is_some() {
367                            // Found a closure in an inner block, return that block
368                            self.found_closure = Some(stmt_expr.clone());
369                            return;
370                        }
371                    }
372                }
373
374                // If no inner block with closure found, continue with normal visitation
375                visit::visit_expr(self, expr);
376
377                // If we found a closure, just return the closure itself, not the whole block
378                // unless we're in the special case where we want the containing block
379                if self.found_closure.is_some() {
380                    // The closure was found during visitation, no need to wrap in block
381                }
382            }
383            _ => {
384                // Use default visitor behavior for all other expressions
385                visit::visit_expr(self, expr);
386            }
387        }
388    }
389}
390
391/// Debug displays the type's tokens.
392///
393/// Boxes `syn::Type` which is ~320 bytes.
394#[derive(Clone, PartialEq, Eq, Hash)]
395pub struct DebugType(pub Box<syn::Type>);
396
397impl From<syn::Type> for DebugType {
398    fn from(t: syn::Type) -> Self {
399        Self(Box::new(t))
400    }
401}
402
403impl Deref for DebugType {
404    type Target = syn::Type;
405
406    fn deref(&self) -> &Self::Target {
407        &self.0
408    }
409}
410
411impl ToTokens for DebugType {
412    fn to_tokens(&self, tokens: &mut TokenStream) {
413        self.0.to_tokens(tokens);
414    }
415}
416
417impl Debug for DebugType {
418    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419        write!(f, "{}", self.0.to_token_stream())
420    }
421}
422
423impl serde::Serialize for DebugType {
424    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
425        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
426    }
427}
428
429fn serialize_backtrace_as_span<S: serde::Serializer>(
430    backtrace: &Backtrace,
431    serializer: S,
432) -> Result<S::Ok, S::Error> {
433    match backtrace.format_span() {
434        Some(span) => serializer.serialize_some(&span),
435        None => serializer.serialize_none(),
436    }
437}
438
439fn serialize_ident<S: serde::Serializer>(
440    ident: &syn::Ident,
441    serializer: S,
442) -> Result<S::Ok, S::Error> {
443    serializer.serialize_str(&ident.to_string())
444}
445
446pub enum DebugInstantiate {
447    Building,
448    Finalized(Box<DebugInstantiateFinalized>),
449}
450
451impl serde::Serialize for DebugInstantiate {
452    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
453        match self {
454            DebugInstantiate::Building => {
455                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
456            }
457            DebugInstantiate::Finalized(_) => {
458                panic!(
459                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
460                )
461            }
462        }
463    }
464}
465
466#[cfg_attr(
467    not(feature = "build"),
468    expect(
469        dead_code,
470        reason = "sink, source unused without `feature = \"build\"`."
471    )
472)]
473pub struct DebugInstantiateFinalized {
474    sink: syn::Expr,
475    source: syn::Expr,
476    connect_fn: Option<Box<dyn FnOnce()>>,
477}
478
479impl From<DebugInstantiateFinalized> for DebugInstantiate {
480    fn from(f: DebugInstantiateFinalized) -> Self {
481        Self::Finalized(Box::new(f))
482    }
483}
484
485impl Debug for DebugInstantiate {
486    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487        write!(f, "<network instantiate>")
488    }
489}
490
491impl Hash for DebugInstantiate {
492    fn hash<H: Hasher>(&self, _state: &mut H) {
493        // Do nothing
494    }
495}
496
497impl Clone for DebugInstantiate {
498    fn clone(&self) -> Self {
499        match self {
500            DebugInstantiate::Building => DebugInstantiate::Building,
501            DebugInstantiate::Finalized(_) => {
502                panic!("DebugInstantiate::Finalized should not be cloned")
503            }
504        }
505    }
506}
507
508/// Tracks the instantiation state of a `ClusterMembers` source.
509///
510/// During `compile_network`, the first `ClusterMembers` node for a given
511/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
512/// receives the expression returned by `Deploy::cluster_membership_stream`.
513/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
514/// during code-gen they simply reference the tee output of the first node
515/// instead of creating a redundant `source_stream`.
516#[derive(Debug, Hash, Clone, serde::Serialize)]
517pub enum ClusterMembersState {
518    /// Not yet instantiated.
519    Uninit,
520    /// The primary instance: holds the stream expression and will emit
521    /// `source_stream(expr) -> tee()` during code-gen.
522    Stream(DebugExpr),
523    /// A secondary instance that references the tee output of the primary.
524    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
525    /// can derive the deterministic tee ident without extra state.
526    Tee(LocationId, LocationId),
527}
528
529/// A source in a Hydro graph, where data enters the graph.
530#[derive(Debug, Hash, Clone, serde::Serialize)]
531pub enum HydroSource {
532    Stream(DebugExpr),
533    ExternalNetwork(),
534    Iter(DebugExpr),
535    Spin(),
536    ClusterMembers(LocationId, ClusterMembersState),
537    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
538    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
539}
540
541#[cfg(feature = "build")]
542/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
543/// and simulations.
544///
545/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
546/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
547pub trait DfirBuilder {
548    /// Whether the representation of singletons should include intermediate states.
549    fn singleton_intermediates(&self) -> bool;
550
551    /// Gets the DFIR builder for the given location, creating it if necessary.
552    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
553
554    #[expect(clippy::too_many_arguments, reason = "TODO")]
555    fn batch(
556        &mut self,
557        in_ident: syn::Ident,
558        in_location: &LocationId,
559        in_kind: &CollectionKind,
560        out_ident: &syn::Ident,
561        out_location: &LocationId,
562        op_meta: &HydroIrOpMetadata,
563        fold_hooked_idents: &HashSet<String>,
564    );
565    fn yield_from_tick(
566        &mut self,
567        in_ident: syn::Ident,
568        in_location: &LocationId,
569        in_kind: &CollectionKind,
570        out_ident: &syn::Ident,
571        out_location: &LocationId,
572    );
573
574    fn begin_atomic(
575        &mut self,
576        in_ident: syn::Ident,
577        in_location: &LocationId,
578        in_kind: &CollectionKind,
579        out_ident: &syn::Ident,
580        out_location: &LocationId,
581        op_meta: &HydroIrOpMetadata,
582    );
583    fn end_atomic(
584        &mut self,
585        in_ident: syn::Ident,
586        in_location: &LocationId,
587        in_kind: &CollectionKind,
588        out_ident: &syn::Ident,
589    );
590
591    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
592    fn observe_nondet(
593        &mut self,
594        trusted: bool,
595        location: &LocationId,
596        in_ident: syn::Ident,
597        in_kind: &CollectionKind,
598        out_ident: &syn::Ident,
599        out_kind: &CollectionKind,
600        op_meta: &HydroIrOpMetadata,
601    );
602
603    #[expect(clippy::too_many_arguments, reason = "TODO")]
604    fn merge_ordered(
605        &mut self,
606        location: &LocationId,
607        first_ident: syn::Ident,
608        second_ident: syn::Ident,
609        out_ident: &syn::Ident,
610        in_kind: &CollectionKind,
611        op_meta: &HydroIrOpMetadata,
612        operator_tag: Option<&str>,
613    );
614
615    #[expect(clippy::too_many_arguments, reason = "TODO")]
616    fn create_network(
617        &mut self,
618        from: &LocationId,
619        to: &LocationId,
620        input_ident: syn::Ident,
621        out_ident: &syn::Ident,
622        serialize: Option<&DebugExpr>,
623        sink: syn::Expr,
624        source: syn::Expr,
625        deserialize: Option<&DebugExpr>,
626        tag_id: StmtId,
627        networking_info: &crate::networking::NetworkingInfo,
628    );
629
630    fn create_external_source(
631        &mut self,
632        on: &LocationId,
633        source_expr: syn::Expr,
634        out_ident: &syn::Ident,
635        deserialize: Option<&DebugExpr>,
636        tag_id: StmtId,
637    );
638
639    fn create_external_output(
640        &mut self,
641        on: &LocationId,
642        sink_expr: syn::Expr,
643        input_ident: &syn::Ident,
644        serialize: Option<&DebugExpr>,
645        tag_id: StmtId,
646    );
647
648    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
649    /// Returns the new input ident to use for the fold if a hook was emitted.
650    fn emit_fold_hook(
651        &mut self,
652        location: &LocationId,
653        in_ident: &syn::Ident,
654        in_kind: &CollectionKind,
655        op_meta: &HydroIrOpMetadata,
656    ) -> Option<syn::Ident>;
657
658    /// Inserts necessary code to validate a manual assertion that at this point the
659    /// input live collection is consistent. In production, this is a no-op, but in simulation
660    /// this will (not yet implemented) inject assertions that validate consistency.
661    fn assert_is_consistent(
662        &mut self,
663        trusted: bool,
664        location: &LocationId,
665        in_ident: syn::Ident,
666        out_ident: &syn::Ident,
667    );
668}
669
670#[cfg(feature = "build")]
671impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
672    fn singleton_intermediates(&self) -> bool {
673        false
674    }
675
676    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
677        self.entry(location.root().key())
678            .expect("location was removed")
679            .or_default()
680    }
681
682    fn batch(
683        &mut self,
684        in_ident: syn::Ident,
685        in_location: &LocationId,
686        in_kind: &CollectionKind,
687        out_ident: &syn::Ident,
688        _out_location: &LocationId,
689        _op_meta: &HydroIrOpMetadata,
690        _fold_hooked_idents: &HashSet<String>,
691    ) {
692        let builder = self.get_dfir_mut(in_location.root());
693        if in_kind.is_bounded()
694            && matches!(
695                in_kind,
696                CollectionKind::Singleton { .. }
697                    | CollectionKind::Optional { .. }
698                    | CollectionKind::KeyedSingleton { .. }
699            )
700        {
701            assert!(in_location.is_top_level());
702            builder.add_dfir(
703                parse_quote! {
704                    #out_ident = #in_ident -> persist::<'static>();
705                },
706                None,
707                None,
708            );
709        } else {
710            builder.add_dfir(
711                parse_quote! {
712                    #out_ident = #in_ident;
713                },
714                None,
715                None,
716            );
717        }
718    }
719
720    fn yield_from_tick(
721        &mut self,
722        in_ident: syn::Ident,
723        in_location: &LocationId,
724        _in_kind: &CollectionKind,
725        out_ident: &syn::Ident,
726        _out_location: &LocationId,
727    ) {
728        let builder = self.get_dfir_mut(in_location.root());
729        builder.add_dfir(
730            parse_quote! {
731                #out_ident = #in_ident;
732            },
733            None,
734            None,
735        );
736    }
737
738    fn begin_atomic(
739        &mut self,
740        in_ident: syn::Ident,
741        in_location: &LocationId,
742        _in_kind: &CollectionKind,
743        out_ident: &syn::Ident,
744        _out_location: &LocationId,
745        _op_meta: &HydroIrOpMetadata,
746    ) {
747        let builder = self.get_dfir_mut(in_location.root());
748        builder.add_dfir(
749            parse_quote! {
750                #out_ident = #in_ident;
751            },
752            None,
753            None,
754        );
755    }
756
757    fn end_atomic(
758        &mut self,
759        in_ident: syn::Ident,
760        in_location: &LocationId,
761        _in_kind: &CollectionKind,
762        out_ident: &syn::Ident,
763    ) {
764        let builder = self.get_dfir_mut(in_location.root());
765        builder.add_dfir(
766            parse_quote! {
767                #out_ident = #in_ident;
768            },
769            None,
770            None,
771        );
772    }
773
774    fn observe_nondet(
775        &mut self,
776        _trusted: bool,
777        location: &LocationId,
778        in_ident: syn::Ident,
779        _in_kind: &CollectionKind,
780        out_ident: &syn::Ident,
781        _out_kind: &CollectionKind,
782        _op_meta: &HydroIrOpMetadata,
783    ) {
784        let builder = self.get_dfir_mut(location);
785        builder.add_dfir(
786            parse_quote! {
787                #out_ident = #in_ident;
788            },
789            None,
790            None,
791        );
792    }
793
794    fn merge_ordered(
795        &mut self,
796        location: &LocationId,
797        first_ident: syn::Ident,
798        second_ident: syn::Ident,
799        out_ident: &syn::Ident,
800        _in_kind: &CollectionKind,
801        _op_meta: &HydroIrOpMetadata,
802        operator_tag: Option<&str>,
803    ) {
804        let builder = self.get_dfir_mut(location);
805        builder.add_dfir(
806            parse_quote! {
807                #out_ident = union();
808                #first_ident -> [0]#out_ident;
809                #second_ident -> [1]#out_ident;
810            },
811            None,
812            operator_tag,
813        );
814    }
815
816    fn create_network(
817        &mut self,
818        from: &LocationId,
819        to: &LocationId,
820        input_ident: syn::Ident,
821        out_ident: &syn::Ident,
822        serialize: Option<&DebugExpr>,
823        sink: syn::Expr,
824        source: syn::Expr,
825        deserialize: Option<&DebugExpr>,
826        tag_id: StmtId,
827        _networking_info: &crate::networking::NetworkingInfo,
828    ) {
829        let sender_builder = self.get_dfir_mut(from);
830        if let Some(serialize_pipeline) = serialize {
831            sender_builder.add_dfir(
832                parse_quote! {
833                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
834                },
835                None,
836                // operator tag separates send and receive, which otherwise have the same next_stmt_id
837                Some(&format!("send{}", tag_id)),
838            );
839        } else {
840            sender_builder.add_dfir(
841                parse_quote! {
842                    #input_ident -> dest_sink(#sink);
843                },
844                None,
845                Some(&format!("send{}", tag_id)),
846            );
847        }
848
849        let receiver_builder = self.get_dfir_mut(to);
850        if let Some(deserialize_pipeline) = deserialize {
851            receiver_builder.add_dfir(
852                parse_quote! {
853                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
854                },
855                None,
856                Some(&format!("recv{}", tag_id)),
857            );
858        } else {
859            receiver_builder.add_dfir(
860                parse_quote! {
861                    #out_ident = source_stream(#source);
862                },
863                None,
864                Some(&format!("recv{}", tag_id)),
865            );
866        }
867    }
868
869    fn create_external_source(
870        &mut self,
871        on: &LocationId,
872        source_expr: syn::Expr,
873        out_ident: &syn::Ident,
874        deserialize: Option<&DebugExpr>,
875        tag_id: StmtId,
876    ) {
877        let receiver_builder = self.get_dfir_mut(on);
878        if let Some(deserialize_pipeline) = deserialize {
879            receiver_builder.add_dfir(
880                parse_quote! {
881                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
882                },
883                None,
884                Some(&format!("recv{}", tag_id)),
885            );
886        } else {
887            receiver_builder.add_dfir(
888                parse_quote! {
889                    #out_ident = source_stream(#source_expr);
890                },
891                None,
892                Some(&format!("recv{}", tag_id)),
893            );
894        }
895    }
896
897    fn create_external_output(
898        &mut self,
899        on: &LocationId,
900        sink_expr: syn::Expr,
901        input_ident: &syn::Ident,
902        serialize: Option<&DebugExpr>,
903        tag_id: StmtId,
904    ) {
905        let sender_builder = self.get_dfir_mut(on);
906        if let Some(serialize_fn) = serialize {
907            sender_builder.add_dfir(
908                parse_quote! {
909                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
910                },
911                None,
912                // operator tag separates send and receive, which otherwise have the same next_stmt_id
913                Some(&format!("send{}", tag_id)),
914            );
915        } else {
916            sender_builder.add_dfir(
917                parse_quote! {
918                    #input_ident -> dest_sink(#sink_expr);
919                },
920                None,
921                Some(&format!("send{}", tag_id)),
922            );
923        }
924    }
925
926    fn emit_fold_hook(
927        &mut self,
928        _location: &LocationId,
929        _in_ident: &syn::Ident,
930        _in_kind: &CollectionKind,
931        _op_meta: &HydroIrOpMetadata,
932    ) -> Option<syn::Ident> {
933        None
934    }
935
936    fn assert_is_consistent(
937        &mut self,
938        _trusted: bool,
939        location: &LocationId,
940        in_ident: syn::Ident,
941        out_ident: &syn::Ident,
942    ) {
943        let builder = self.get_dfir_mut(location);
944        builder.add_dfir(
945            parse_quote! {
946                #out_ident = #in_ident;
947            },
948            None,
949            None,
950        );
951    }
952}
953
954#[cfg(feature = "build")]
955pub enum BuildersOrCallback<'a, L, N>
956where
957    L: FnMut(&mut HydroRoot, &mut StmtId),
958    N: FnMut(&mut HydroNode, &mut StmtId),
959{
960    Builders(&'a mut dyn DfirBuilder),
961    Callback(L, N),
962}
963
964/// An root in a Hydro graph, which is an pipeline that doesn't emit
965/// any downstream values. Traversals over the dataflow graph and
966/// generating DFIR IR start from roots.
967#[derive(Debug, Hash, serde::Serialize)]
968pub enum HydroRoot {
969    ForEach {
970        f: DebugExpr,
971        input: Box<HydroNode>,
972        op_metadata: HydroIrOpMetadata,
973    },
974    SendExternal {
975        to_external_key: LocationKey,
976        to_port_id: ExternalPortId,
977        to_many: bool,
978        unpaired: bool,
979        serialize_fn: Option<DebugExpr>,
980        instantiate_fn: DebugInstantiate,
981        input: Box<HydroNode>,
982        op_metadata: HydroIrOpMetadata,
983    },
984    DestSink {
985        sink: DebugExpr,
986        input: Box<HydroNode>,
987        op_metadata: HydroIrOpMetadata,
988    },
989    CycleSink {
990        cycle_id: CycleId,
991        input: Box<HydroNode>,
992        op_metadata: HydroIrOpMetadata,
993    },
994    EmbeddedOutput {
995        #[serde(serialize_with = "serialize_ident")]
996        ident: syn::Ident,
997        input: Box<HydroNode>,
998        op_metadata: HydroIrOpMetadata,
999    },
1000    Null {
1001        input: Box<HydroNode>,
1002        op_metadata: HydroIrOpMetadata,
1003    },
1004}
1005
1006impl HydroRoot {
1007    #[cfg(feature = "build")]
1008    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1009    pub fn compile_network<'a, D>(
1010        &mut self,
1011        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1012        seen_tees: &mut SeenSharedNodes,
1013        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1014        processes: &SparseSecondaryMap<LocationKey, D::Process>,
1015        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1016        externals: &SparseSecondaryMap<LocationKey, D::External>,
1017        env: &mut D::InstantiateEnv,
1018    ) where
1019        D: Deploy<'a>,
1020    {
1021        let refcell_extra_stmts = RefCell::new(extra_stmts);
1022        let refcell_env = RefCell::new(env);
1023        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1024        self.transform_bottom_up(
1025            &mut |l| {
1026                if let HydroRoot::SendExternal {
1027                    input,
1028                    to_external_key,
1029                    to_port_id,
1030                    to_many,
1031                    unpaired,
1032                    instantiate_fn,
1033                    ..
1034                } = l
1035                {
1036                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1037                        DebugInstantiate::Building => {
1038                            let to_node = externals
1039                                .get(*to_external_key)
1040                                .unwrap_or_else(|| {
1041                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
1042                                })
1043                                .clone();
1044
1045                            match input.metadata().location_id.root() {
1046                                &LocationId::Process(process_key) => {
1047                                    if *to_many {
1048                                        (
1049                                            (
1050                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1051                                                parse_quote!(DUMMY),
1052                                            ),
1053                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1054                                        )
1055                                    } else {
1056                                        let from_node = processes
1057                                            .get(process_key)
1058                                            .unwrap_or_else(|| {
1059                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1060                                            })
1061                                            .clone();
1062
1063                                        let sink_port = from_node.next_port();
1064                                        let source_port = to_node.next_port();
1065
1066                                        if *unpaired {
1067                                            use stageleft::quote_type;
1068                                            use tokio_util::codec::LengthDelimitedCodec;
1069
1070                                            to_node.register(*to_port_id, source_port.clone());
1071
1072                                            let _ = D::e2o_source(
1073                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1074                                                &to_node, &source_port,
1075                                                &from_node, &sink_port,
1076                                                &quote_type::<LengthDelimitedCodec>(),
1077                                                format!("{}_{}", *to_external_key, *to_port_id)
1078                                            );
1079                                        }
1080
1081                                        (
1082                                            (
1083                                                D::o2e_sink(
1084                                                    &from_node,
1085                                                    &sink_port,
1086                                                    &to_node,
1087                                                    &source_port,
1088                                                    format!("{}_{}", *to_external_key, *to_port_id)
1089                                                ),
1090                                                parse_quote!(DUMMY),
1091                                            ),
1092                                            if *unpaired {
1093                                                D::e2o_connect(
1094                                                    &to_node,
1095                                                    &source_port,
1096                                                    &from_node,
1097                                                    &sink_port,
1098                                                    *to_many,
1099                                                    NetworkHint::Auto,
1100                                                )
1101                                            } else {
1102                                                Box::new(|| {}) as Box<dyn FnOnce()>
1103                                            },
1104                                        )
1105                                    }
1106                                }
1107                                LocationId::Cluster(cluster_key) => {
1108                                    let from_node = clusters
1109                                        .get(*cluster_key)
1110                                        .unwrap_or_else(|| {
1111                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1112                                        })
1113                                        .clone();
1114
1115                                    let sink_port = from_node.next_port();
1116                                    let source_port = to_node.next_port();
1117
1118                                    if *unpaired {
1119                                        to_node.register(*to_port_id, source_port.clone());
1120                                    }
1121
1122                                    (
1123                                        (
1124                                            D::m2e_sink(
1125                                                &from_node,
1126                                                &sink_port,
1127                                                &to_node,
1128                                                &source_port,
1129                                                format!("{}_{}", *to_external_key, *to_port_id)
1130                                            ),
1131                                            parse_quote!(DUMMY),
1132                                        ),
1133                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1134                                    )
1135                                }
1136                                _ => panic!()
1137                            }
1138                        },
1139
1140                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1141                    };
1142
1143                    *instantiate_fn = DebugInstantiateFinalized {
1144                        sink: sink_expr,
1145                        source: source_expr,
1146                        connect_fn: Some(connect_fn),
1147                    }
1148                    .into();
1149                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1150                    let element_type = match &input.metadata().collection_kind {
1151                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1152                        _ => panic!("Embedded output must have Stream collection kind"),
1153                    };
1154                    let location_key = match input.metadata().location_id.root() {
1155                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1156                        _ => panic!("Embedded output must be on a process or cluster"),
1157                    };
1158                    D::register_embedded_output(
1159                        &mut refcell_env.borrow_mut(),
1160                        location_key,
1161                        ident,
1162                        &element_type,
1163                    );
1164                }
1165            },
1166            &mut |n| {
1167                if let HydroNode::Network {
1168                    name,
1169                    networking_info,
1170                    input,
1171                    instantiate_fn,
1172                    metadata,
1173                    ..
1174                } = n
1175                {
1176                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1177                        DebugInstantiate::Building => instantiate_network::<D>(
1178                            &mut refcell_env.borrow_mut(),
1179                            input.metadata().location_id.root(),
1180                            metadata.location_id.root(),
1181                            processes,
1182                            clusters,
1183                            name.as_deref(),
1184                            networking_info,
1185                        ),
1186
1187                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1188                    };
1189
1190                    *instantiate_fn = DebugInstantiateFinalized {
1191                        sink: sink_expr,
1192                        source: source_expr,
1193                        connect_fn: Some(connect_fn),
1194                    }
1195                    .into();
1196                } else if let HydroNode::ExternalInput {
1197                    from_external_key,
1198                    from_port_id,
1199                    from_many,
1200                    codec_type,
1201                    port_hint,
1202                    instantiate_fn,
1203                    metadata,
1204                    ..
1205                } = n
1206                {
1207                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1208                        DebugInstantiate::Building => {
1209                            let from_node = externals
1210                                .get(*from_external_key)
1211                                .unwrap_or_else(|| {
1212                                    panic!(
1213                                        "A external used in the graph was not instantiated: {}",
1214                                        from_external_key,
1215                                    )
1216                                })
1217                                .clone();
1218
1219                            match metadata.location_id.root() {
1220                                &LocationId::Process(process_key) => {
1221                                    let to_node = processes
1222                                        .get(process_key)
1223                                        .unwrap_or_else(|| {
1224                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1225                                        })
1226                                        .clone();
1227
1228                                    let sink_port = from_node.next_port();
1229                                    let source_port = to_node.next_port();
1230
1231                                    from_node.register(*from_port_id, sink_port.clone());
1232
1233                                    (
1234                                        (
1235                                            parse_quote!(DUMMY),
1236                                            if *from_many {
1237                                                D::e2o_many_source(
1238                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1239                                                    &to_node, &source_port,
1240                                                    codec_type.0.as_ref(),
1241                                                    format!("{}_{}", *from_external_key, *from_port_id)
1242                                                )
1243                                            } else {
1244                                                D::e2o_source(
1245                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1246                                                    &from_node, &sink_port,
1247                                                    &to_node, &source_port,
1248                                                    codec_type.0.as_ref(),
1249                                                    format!("{}_{}", *from_external_key, *from_port_id)
1250                                                )
1251                                            },
1252                                        ),
1253                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1254                                    )
1255                                }
1256                                LocationId::Cluster(cluster_key) => {
1257                                    let to_node = clusters
1258                                        .get(*cluster_key)
1259                                        .unwrap_or_else(|| {
1260                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1261                                        })
1262                                        .clone();
1263
1264                                    let sink_port = from_node.next_port();
1265                                    let source_port = to_node.next_port();
1266
1267                                    from_node.register(*from_port_id, sink_port.clone());
1268
1269                                    (
1270                                        (
1271                                            parse_quote!(DUMMY),
1272                                            D::e2m_source(
1273                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1274                                                &from_node, &sink_port,
1275                                                &to_node, &source_port,
1276                                                codec_type.0.as_ref(),
1277                                                format!("{}_{}", *from_external_key, *from_port_id)
1278                                            ),
1279                                        ),
1280                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1281                                    )
1282                                }
1283                                _ => panic!()
1284                            }
1285                        },
1286
1287                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1288                    };
1289
1290                    *instantiate_fn = DebugInstantiateFinalized {
1291                        sink: sink_expr,
1292                        source: source_expr,
1293                        connect_fn: Some(connect_fn),
1294                    }
1295                    .into();
1296                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1297                    let element_type = match &metadata.collection_kind {
1298                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1299                        _ => panic!("Embedded source must have Stream collection kind"),
1300                    };
1301                    let location_key = match metadata.location_id.root() {
1302                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1303                        _ => panic!("Embedded source must be on a process or cluster"),
1304                    };
1305                    D::register_embedded_stream_input(
1306                        &mut refcell_env.borrow_mut(),
1307                        location_key,
1308                        ident,
1309                        &element_type,
1310                    );
1311                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1312                    let element_type = match &metadata.collection_kind {
1313                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1314                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1315                    };
1316                    let location_key = match metadata.location_id.root() {
1317                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1318                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1319                    };
1320                    D::register_embedded_singleton_input(
1321                        &mut refcell_env.borrow_mut(),
1322                        location_key,
1323                        ident,
1324                        &element_type,
1325                    );
1326                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1327                    match state {
1328                        ClusterMembersState::Uninit => {
1329                            let at_location = metadata.location_id.root().clone();
1330                            let key = (at_location.clone(), location_id.key());
1331                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1332                                // First occurrence: call cluster_membership_stream and mark as Stream.
1333                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1334                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1335                                    &(),
1336                                );
1337                                *state = ClusterMembersState::Stream(expr.into());
1338                            } else {
1339                                // Already instantiated for this (at, target) pair: just tee.
1340                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1341                            }
1342                        }
1343                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1344                            panic!("cluster members already finalized");
1345                        }
1346                    }
1347                }
1348            },
1349            seen_tees,
1350            false,
1351        );
1352    }
1353
1354    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1355        self.transform_bottom_up(
1356            &mut |l| {
1357                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1358                    match instantiate_fn {
1359                        DebugInstantiate::Building => panic!("network not built"),
1360
1361                        DebugInstantiate::Finalized(finalized) => {
1362                            (finalized.connect_fn.take().unwrap())();
1363                        }
1364                    }
1365                }
1366            },
1367            &mut |n| {
1368                if let HydroNode::Network { instantiate_fn, .. }
1369                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1370                {
1371                    match instantiate_fn {
1372                        DebugInstantiate::Building => panic!("network not built"),
1373
1374                        DebugInstantiate::Finalized(finalized) => {
1375                            (finalized.connect_fn.take().unwrap())();
1376                        }
1377                    }
1378                }
1379            },
1380            seen_tees,
1381            false,
1382        );
1383    }
1384
1385    pub fn transform_bottom_up(
1386        &mut self,
1387        transform_root: &mut impl FnMut(&mut HydroRoot),
1388        transform_node: &mut impl FnMut(&mut HydroNode),
1389        seen_tees: &mut SeenSharedNodes,
1390        check_well_formed: bool,
1391    ) {
1392        self.transform_children(
1393            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1394            seen_tees,
1395        );
1396
1397        transform_root(self);
1398    }
1399
1400    pub fn transform_children(
1401        &mut self,
1402        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1403        seen_tees: &mut SeenSharedNodes,
1404    ) {
1405        match self {
1406            HydroRoot::ForEach { input, .. }
1407            | HydroRoot::SendExternal { input, .. }
1408            | HydroRoot::DestSink { input, .. }
1409            | HydroRoot::CycleSink { input, .. }
1410            | HydroRoot::EmbeddedOutput { input, .. }
1411            | HydroRoot::Null { input, .. } => {
1412                transform(input, seen_tees);
1413            }
1414        }
1415    }
1416
1417    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1418        match self {
1419            HydroRoot::ForEach {
1420                f,
1421                input,
1422                op_metadata,
1423            } => HydroRoot::ForEach {
1424                f: f.clone(),
1425                input: Box::new(input.deep_clone(seen_tees)),
1426                op_metadata: op_metadata.clone(),
1427            },
1428            HydroRoot::SendExternal {
1429                to_external_key,
1430                to_port_id,
1431                to_many,
1432                unpaired,
1433                serialize_fn,
1434                instantiate_fn,
1435                input,
1436                op_metadata,
1437            } => HydroRoot::SendExternal {
1438                to_external_key: *to_external_key,
1439                to_port_id: *to_port_id,
1440                to_many: *to_many,
1441                unpaired: *unpaired,
1442                serialize_fn: serialize_fn.clone(),
1443                instantiate_fn: instantiate_fn.clone(),
1444                input: Box::new(input.deep_clone(seen_tees)),
1445                op_metadata: op_metadata.clone(),
1446            },
1447            HydroRoot::DestSink {
1448                sink,
1449                input,
1450                op_metadata,
1451            } => HydroRoot::DestSink {
1452                sink: sink.clone(),
1453                input: Box::new(input.deep_clone(seen_tees)),
1454                op_metadata: op_metadata.clone(),
1455            },
1456            HydroRoot::CycleSink {
1457                cycle_id,
1458                input,
1459                op_metadata,
1460            } => HydroRoot::CycleSink {
1461                cycle_id: *cycle_id,
1462                input: Box::new(input.deep_clone(seen_tees)),
1463                op_metadata: op_metadata.clone(),
1464            },
1465            HydroRoot::EmbeddedOutput {
1466                ident,
1467                input,
1468                op_metadata,
1469            } => HydroRoot::EmbeddedOutput {
1470                ident: ident.clone(),
1471                input: Box::new(input.deep_clone(seen_tees)),
1472                op_metadata: op_metadata.clone(),
1473            },
1474            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1475                input: Box::new(input.deep_clone(seen_tees)),
1476                op_metadata: op_metadata.clone(),
1477            },
1478        }
1479    }
1480
1481    #[cfg(feature = "build")]
1482    pub fn emit(
1483        &mut self,
1484        graph_builders: &mut dyn DfirBuilder,
1485        seen_tees: &mut SeenSharedNodes,
1486        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1487        next_stmt_id: &mut StmtId,
1488        fold_hooked_idents: &mut HashSet<String>,
1489    ) {
1490        self.emit_core(
1491            &mut BuildersOrCallback::<
1492                fn(&mut HydroRoot, &mut StmtId),
1493                fn(&mut HydroNode, &mut StmtId),
1494            >::Builders(graph_builders),
1495            seen_tees,
1496            built_tees,
1497            next_stmt_id,
1498            fold_hooked_idents,
1499        );
1500    }
1501
1502    #[cfg(feature = "build")]
1503    pub fn emit_core(
1504        &mut self,
1505        builders_or_callback: &mut BuildersOrCallback<
1506            impl FnMut(&mut HydroRoot, &mut StmtId),
1507            impl FnMut(&mut HydroNode, &mut StmtId),
1508        >,
1509        seen_tees: &mut SeenSharedNodes,
1510        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1511        next_stmt_id: &mut StmtId,
1512        fold_hooked_idents: &mut HashSet<String>,
1513    ) {
1514        match self {
1515            HydroRoot::ForEach { f, input, .. } => {
1516                let input_ident = input.emit_core(
1517                    builders_or_callback,
1518                    seen_tees,
1519                    built_tees,
1520                    next_stmt_id,
1521                    fold_hooked_idents,
1522                );
1523
1524                match builders_or_callback {
1525                    BuildersOrCallback::Builders(graph_builders) => {
1526                        graph_builders
1527                            .get_dfir_mut(&input.metadata().location_id)
1528                            .add_dfir(
1529                                parse_quote! {
1530                                    #input_ident -> for_each(#f);
1531                                },
1532                                None,
1533                                Some(&next_stmt_id.to_string()),
1534                            );
1535                    }
1536                    BuildersOrCallback::Callback(leaf_callback, _) => {
1537                        leaf_callback(self, next_stmt_id);
1538                    }
1539                }
1540
1541                let _ = next_stmt_id.get_and_increment();
1542            }
1543
1544            HydroRoot::SendExternal {
1545                serialize_fn,
1546                instantiate_fn,
1547                input,
1548                ..
1549            } => {
1550                let input_ident = input.emit_core(
1551                    builders_or_callback,
1552                    seen_tees,
1553                    built_tees,
1554                    next_stmt_id,
1555                    fold_hooked_idents,
1556                );
1557
1558                match builders_or_callback {
1559                    BuildersOrCallback::Builders(graph_builders) => {
1560                        let (sink_expr, _) = match instantiate_fn {
1561                            DebugInstantiate::Building => (
1562                                syn::parse_quote!(DUMMY_SINK),
1563                                syn::parse_quote!(DUMMY_SOURCE),
1564                            ),
1565
1566                            DebugInstantiate::Finalized(finalized) => {
1567                                (finalized.sink.clone(), finalized.source.clone())
1568                            }
1569                        };
1570
1571                        graph_builders.create_external_output(
1572                            &input.metadata().location_id,
1573                            sink_expr,
1574                            &input_ident,
1575                            serialize_fn.as_ref(),
1576                            *next_stmt_id,
1577                        );
1578                    }
1579                    BuildersOrCallback::Callback(leaf_callback, _) => {
1580                        leaf_callback(self, next_stmt_id);
1581                    }
1582                }
1583
1584                let _ = next_stmt_id.get_and_increment();
1585            }
1586
1587            HydroRoot::DestSink { sink, input, .. } => {
1588                let input_ident = input.emit_core(
1589                    builders_or_callback,
1590                    seen_tees,
1591                    built_tees,
1592                    next_stmt_id,
1593                    fold_hooked_idents,
1594                );
1595
1596                match builders_or_callback {
1597                    BuildersOrCallback::Builders(graph_builders) => {
1598                        graph_builders
1599                            .get_dfir_mut(&input.metadata().location_id)
1600                            .add_dfir(
1601                                parse_quote! {
1602                                    #input_ident -> dest_sink(#sink);
1603                                },
1604                                None,
1605                                Some(&next_stmt_id.to_string()),
1606                            );
1607                    }
1608                    BuildersOrCallback::Callback(leaf_callback, _) => {
1609                        leaf_callback(self, next_stmt_id);
1610                    }
1611                }
1612
1613                let _ = next_stmt_id.get_and_increment();
1614            }
1615
1616            HydroRoot::CycleSink {
1617                cycle_id, input, ..
1618            } => {
1619                let input_ident = input.emit_core(
1620                    builders_or_callback,
1621                    seen_tees,
1622                    built_tees,
1623                    next_stmt_id,
1624                    fold_hooked_idents,
1625                );
1626
1627                match builders_or_callback {
1628                    BuildersOrCallback::Builders(graph_builders) => {
1629                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1630                            CollectionKind::KeyedSingleton {
1631                                key_type,
1632                                value_type,
1633                                ..
1634                            }
1635                            | CollectionKind::KeyedStream {
1636                                key_type,
1637                                value_type,
1638                                ..
1639                            } => {
1640                                parse_quote!((#key_type, #value_type))
1641                            }
1642                            CollectionKind::Stream { element_type, .. }
1643                            | CollectionKind::Singleton { element_type, .. }
1644                            | CollectionKind::Optional { element_type, .. } => {
1645                                parse_quote!(#element_type)
1646                            }
1647                        };
1648
1649                        let cycle_id_ident = cycle_id.as_ident();
1650                        graph_builders
1651                            .get_dfir_mut(&input.metadata().location_id)
1652                            .add_dfir(
1653                                parse_quote! {
1654                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1655                                },
1656                                None,
1657                                None,
1658                            );
1659                    }
1660                    // No ID, no callback
1661                    BuildersOrCallback::Callback(_, _) => {}
1662                }
1663            }
1664
1665            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1666                let input_ident = input.emit_core(
1667                    builders_or_callback,
1668                    seen_tees,
1669                    built_tees,
1670                    next_stmt_id,
1671                    fold_hooked_idents,
1672                );
1673
1674                match builders_or_callback {
1675                    BuildersOrCallback::Builders(graph_builders) => {
1676                        graph_builders
1677                            .get_dfir_mut(&input.metadata().location_id)
1678                            .add_dfir(
1679                                parse_quote! {
1680                                    #input_ident -> for_each(&mut #ident);
1681                                },
1682                                None,
1683                                Some(&next_stmt_id.to_string()),
1684                            );
1685                    }
1686                    BuildersOrCallback::Callback(leaf_callback, _) => {
1687                        leaf_callback(self, next_stmt_id);
1688                    }
1689                }
1690
1691                let _ = next_stmt_id.get_and_increment();
1692            }
1693
1694            HydroRoot::Null { input, .. } => {
1695                let input_ident = input.emit_core(
1696                    builders_or_callback,
1697                    seen_tees,
1698                    built_tees,
1699                    next_stmt_id,
1700                    fold_hooked_idents,
1701                );
1702
1703                match builders_or_callback {
1704                    BuildersOrCallback::Builders(graph_builders) => {
1705                        graph_builders
1706                            .get_dfir_mut(&input.metadata().location_id)
1707                            .add_dfir(
1708                                parse_quote! {
1709                                    #input_ident -> for_each(|_| {});
1710                                },
1711                                None,
1712                                Some(&next_stmt_id.to_string()),
1713                            );
1714                    }
1715                    BuildersOrCallback::Callback(leaf_callback, _) => {
1716                        leaf_callback(self, next_stmt_id);
1717                    }
1718                }
1719
1720                let _ = next_stmt_id.get_and_increment();
1721            }
1722        }
1723    }
1724
1725    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1726        match self {
1727            HydroRoot::ForEach { op_metadata, .. }
1728            | HydroRoot::SendExternal { op_metadata, .. }
1729            | HydroRoot::DestSink { op_metadata, .. }
1730            | HydroRoot::CycleSink { op_metadata, .. }
1731            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1732            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1733        }
1734    }
1735
1736    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1737        match self {
1738            HydroRoot::ForEach { op_metadata, .. }
1739            | HydroRoot::SendExternal { op_metadata, .. }
1740            | HydroRoot::DestSink { op_metadata, .. }
1741            | HydroRoot::CycleSink { op_metadata, .. }
1742            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1743            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1744        }
1745    }
1746
1747    pub fn input(&self) -> &HydroNode {
1748        match self {
1749            HydroRoot::ForEach { input, .. }
1750            | HydroRoot::SendExternal { input, .. }
1751            | HydroRoot::DestSink { input, .. }
1752            | HydroRoot::CycleSink { input, .. }
1753            | HydroRoot::EmbeddedOutput { input, .. }
1754            | HydroRoot::Null { input, .. } => input,
1755        }
1756    }
1757
1758    pub fn input_metadata(&self) -> &HydroIrMetadata {
1759        self.input().metadata()
1760    }
1761
1762    pub fn print_root(&self) -> String {
1763        match self {
1764            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1765            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1766            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1767            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1768            HydroRoot::EmbeddedOutput { ident, .. } => {
1769                format!("EmbeddedOutput({})", ident)
1770            }
1771            HydroRoot::Null { .. } => "Null".to_owned(),
1772        }
1773    }
1774
1775    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1776        match self {
1777            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1778                transform(f);
1779            }
1780            HydroRoot::SendExternal { .. }
1781            | HydroRoot::CycleSink { .. }
1782            | HydroRoot::EmbeddedOutput { .. }
1783            | HydroRoot::Null { .. } => {}
1784        }
1785    }
1786}
1787
1788#[cfg(feature = "build")]
1789fn tick_of(loc: &LocationId) -> Option<ClockId> {
1790    match loc {
1791        LocationId::Tick(id, _) => Some(*id),
1792        LocationId::Atomic(inner) => tick_of(inner),
1793        _ => None,
1794    }
1795}
1796
1797#[cfg(feature = "build")]
1798fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1799    match loc {
1800        LocationId::Tick(id, inner) => {
1801            *id = uf_find(uf, *id);
1802            remap_location(inner, uf);
1803        }
1804        LocationId::Atomic(inner) => {
1805            remap_location(inner, uf);
1806        }
1807        LocationId::Process(_) | LocationId::Cluster(_) => {}
1808    }
1809}
1810
1811#[cfg(feature = "build")]
1812fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1813    let p = *parent.get(&x).unwrap_or(&x);
1814    if p == x {
1815        return x;
1816    }
1817    let root = uf_find(parent, p);
1818    parent.insert(x, root);
1819    root
1820}
1821
1822#[cfg(feature = "build")]
1823fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1824    let ra = uf_find(parent, a);
1825    let rb = uf_find(parent, b);
1826    if ra != rb {
1827        parent.insert(ra, rb);
1828    }
1829}
1830
1831/// Traverse the IR to build a union-find that unifies tick IDs connected
1832/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1833/// rewrite all `LocationId`s to use the representative tick ID.
1834#[cfg(feature = "build")]
1835pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1836    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1837
1838    // Pass 1: collect unifications.
1839    transform_bottom_up(
1840        ir,
1841        &mut |_| {},
1842        &mut |node: &mut HydroNode| match node {
1843            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1844                if let (Some(a), Some(b)) = (
1845                    tick_of(&inner.metadata().location_id),
1846                    tick_of(&metadata.location_id),
1847                ) {
1848                    uf_union(&mut uf, a, b);
1849                }
1850            }
1851            HydroNode::Chain {
1852                first,
1853                second,
1854                metadata,
1855            }
1856            | HydroNode::ChainFirst {
1857                first,
1858                second,
1859                metadata,
1860            }
1861            | HydroNode::MergeOrdered {
1862                first,
1863                second,
1864                metadata,
1865            } => {
1866                if let (Some(a), Some(b)) = (
1867                    tick_of(&first.metadata().location_id),
1868                    tick_of(&metadata.location_id),
1869                ) {
1870                    uf_union(&mut uf, a, b);
1871                }
1872                if let (Some(a), Some(b)) = (
1873                    tick_of(&second.metadata().location_id),
1874                    tick_of(&metadata.location_id),
1875                ) {
1876                    uf_union(&mut uf, a, b);
1877                }
1878            }
1879            _ => {}
1880        },
1881        false,
1882    );
1883
1884    // Pass 2: rewrite all LocationIds.
1885    transform_bottom_up(
1886        ir,
1887        &mut |_| {},
1888        &mut |node: &mut HydroNode| {
1889            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1890        },
1891        false,
1892    );
1893}
1894
1895#[cfg(feature = "build")]
1896pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1897    let mut builders = SecondaryMap::new();
1898    let mut seen_tees = HashMap::new();
1899    let mut built_tees = HashMap::new();
1900    let mut next_stmt_id = StmtId::default();
1901    let mut fold_hooked_idents = HashSet::new();
1902    for leaf in ir {
1903        leaf.emit(
1904            &mut builders,
1905            &mut seen_tees,
1906            &mut built_tees,
1907            &mut next_stmt_id,
1908            &mut fold_hooked_idents,
1909        );
1910    }
1911    builders
1912}
1913
1914#[cfg(feature = "build")]
1915pub fn traverse_dfir(
1916    ir: &mut [HydroRoot],
1917    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
1918    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
1919) {
1920    let mut seen_tees = HashMap::new();
1921    let mut built_tees = HashMap::new();
1922    let mut next_stmt_id = StmtId::default();
1923    let mut fold_hooked_idents = HashSet::new();
1924    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1925    ir.iter_mut().for_each(|leaf| {
1926        leaf.emit_core(
1927            &mut callback,
1928            &mut seen_tees,
1929            &mut built_tees,
1930            &mut next_stmt_id,
1931            &mut fold_hooked_idents,
1932        );
1933    });
1934}
1935
1936pub fn transform_bottom_up(
1937    ir: &mut [HydroRoot],
1938    transform_root: &mut impl FnMut(&mut HydroRoot),
1939    transform_node: &mut impl FnMut(&mut HydroNode),
1940    check_well_formed: bool,
1941) {
1942    let mut seen_tees = HashMap::new();
1943    ir.iter_mut().for_each(|leaf| {
1944        leaf.transform_bottom_up(
1945            transform_root,
1946            transform_node,
1947            &mut seen_tees,
1948            check_well_formed,
1949        );
1950    });
1951}
1952
1953pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1954    let mut seen_tees = HashMap::new();
1955    ir.iter()
1956        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1957        .collect()
1958}
1959
1960type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1961thread_local! {
1962    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1963    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1964    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1965    /// on subsequent encounters, preventing infinite loops.
1966    static SERIALIZED_SHARED: PrintedTees
1967        = const { RefCell::new(None) };
1968}
1969
1970pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1971    PRINTED_TEES.with(|printed_tees| {
1972        let mut printed_tees_mut = printed_tees.borrow_mut();
1973        *printed_tees_mut = Some((0, HashMap::new()));
1974        drop(printed_tees_mut);
1975
1976        let ret = f();
1977
1978        let mut printed_tees_mut = printed_tees.borrow_mut();
1979        *printed_tees_mut = None;
1980
1981        ret
1982    })
1983}
1984
1985/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1986/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1987/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1988/// back-reference.  The tracking state is restored when `f` returns or panics.
1989pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1990    let _guard = SerializedSharedGuard::enter();
1991    f()
1992}
1993
1994/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1995/// making `serialize_dedup_shared` re-entrant and panic-safe.
1996struct SerializedSharedGuard {
1997    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1998}
1999
2000impl SerializedSharedGuard {
2001    fn enter() -> Self {
2002        let previous = SERIALIZED_SHARED.with(|cell| {
2003            let mut guard = cell.borrow_mut();
2004            guard.replace((0, HashMap::new()))
2005        });
2006        Self { previous }
2007    }
2008}
2009
2010impl Drop for SerializedSharedGuard {
2011    fn drop(&mut self) {
2012        SERIALIZED_SHARED.with(|cell| {
2013            *cell.borrow_mut() = self.previous.take();
2014        });
2015    }
2016}
2017
2018pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2019
2020impl serde::Serialize for SharedNode {
2021    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2022    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2023    /// same subtree every time and, if the graph ever contains a cycle, loop
2024    /// forever.
2025    ///
2026    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2027    /// integer id.  The first time we see a pointer we assign it the next id and
2028    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2029    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2030    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2031    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2032        SERIALIZED_SHARED.with(|cell| {
2033            let mut guard = cell.borrow_mut();
2034            // (next_id, pointer → assigned_id)
2035            let state = guard.as_mut().ok_or_else(|| {
2036                serde::ser::Error::custom(
2037                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2038                )
2039            })?;
2040            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2041
2042            if let Some(&id) = state.1.get(&ptr) {
2043                drop(guard);
2044                use serde::ser::SerializeMap;
2045                let mut map = serializer.serialize_map(Some(1))?;
2046                map.serialize_entry("$shared_ref", &id)?;
2047                map.end()
2048            } else {
2049                let id = state.0;
2050                state.0 += 1;
2051                state.1.insert(ptr, id);
2052                drop(guard);
2053
2054                use serde::ser::SerializeMap;
2055                let mut map = serializer.serialize_map(Some(2))?;
2056                map.serialize_entry("$shared", &id)?;
2057                map.serialize_entry("node", &*self.0.borrow())?;
2058                map.end()
2059            }
2060        })
2061    }
2062}
2063
2064impl SharedNode {
2065    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2066        Rc::as_ptr(&self.0)
2067    }
2068}
2069
2070impl Debug for SharedNode {
2071    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2072        PRINTED_TEES.with(|printed_tees| {
2073            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2074            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2075
2076            if let Some(printed_tees_mut) = printed_tees_mut {
2077                if let Some(existing) = printed_tees_mut
2078                    .1
2079                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2080                {
2081                    write!(f, "<shared {}>", existing)
2082                } else {
2083                    let next_id = printed_tees_mut.0;
2084                    printed_tees_mut.0 += 1;
2085                    printed_tees_mut
2086                        .1
2087                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2088                    drop(printed_tees_mut_borrow);
2089                    write!(f, "<shared {}>: ", next_id)?;
2090                    Debug::fmt(&self.0.borrow(), f)
2091                }
2092            } else {
2093                drop(printed_tees_mut_borrow);
2094                write!(f, "<shared>: ")?;
2095                Debug::fmt(&self.0.borrow(), f)
2096            }
2097        })
2098    }
2099}
2100
2101impl Hash for SharedNode {
2102    fn hash<H: Hasher>(&self, state: &mut H) {
2103        self.0.borrow_mut().hash(state);
2104    }
2105}
2106
2107#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2108pub enum BoundKind {
2109    Unbounded,
2110    Bounded,
2111}
2112
2113#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2114pub enum StreamOrder {
2115    NoOrder,
2116    TotalOrder,
2117}
2118
2119#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2120pub enum StreamRetry {
2121    AtLeastOnce,
2122    ExactlyOnce,
2123}
2124
2125#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2126pub enum KeyedSingletonBoundKind {
2127    Unbounded,
2128    MonotonicValue,
2129    BoundedValue,
2130    Bounded,
2131}
2132
2133#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2134pub enum SingletonBoundKind {
2135    Unbounded,
2136    Monotonic,
2137    Bounded,
2138}
2139
2140#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2141pub enum CollectionKind {
2142    Stream {
2143        bound: BoundKind,
2144        order: StreamOrder,
2145        retry: StreamRetry,
2146        element_type: DebugType,
2147    },
2148    Singleton {
2149        bound: SingletonBoundKind,
2150        element_type: DebugType,
2151    },
2152    Optional {
2153        bound: BoundKind,
2154        element_type: DebugType,
2155    },
2156    KeyedStream {
2157        bound: BoundKind,
2158        value_order: StreamOrder,
2159        value_retry: StreamRetry,
2160        key_type: DebugType,
2161        value_type: DebugType,
2162    },
2163    KeyedSingleton {
2164        bound: KeyedSingletonBoundKind,
2165        key_type: DebugType,
2166        value_type: DebugType,
2167    },
2168}
2169
2170impl CollectionKind {
2171    pub fn is_bounded(&self) -> bool {
2172        matches!(
2173            self,
2174            CollectionKind::Stream {
2175                bound: BoundKind::Bounded,
2176                ..
2177            } | CollectionKind::Singleton {
2178                bound: SingletonBoundKind::Bounded,
2179                ..
2180            } | CollectionKind::Optional {
2181                bound: BoundKind::Bounded,
2182                ..
2183            } | CollectionKind::KeyedStream {
2184                bound: BoundKind::Bounded,
2185                ..
2186            } | CollectionKind::KeyedSingleton {
2187                bound: KeyedSingletonBoundKind::Bounded,
2188                ..
2189            }
2190        )
2191    }
2192}
2193
2194#[derive(Clone, serde::Serialize)]
2195pub struct HydroIrMetadata {
2196    pub location_id: LocationId,
2197    pub collection_kind: CollectionKind,
2198    pub consistency: Option<ClusterConsistency>,
2199    pub cardinality: Option<usize>,
2200    pub tag: Option<String>,
2201    pub op: HydroIrOpMetadata,
2202}
2203
2204// HydroIrMetadata shouldn't be used to hash or compare
2205impl Hash for HydroIrMetadata {
2206    fn hash<H: Hasher>(&self, _: &mut H) {}
2207}
2208
2209impl PartialEq for HydroIrMetadata {
2210    fn eq(&self, _: &Self) -> bool {
2211        true
2212    }
2213}
2214
2215impl Eq for HydroIrMetadata {}
2216
2217impl Debug for HydroIrMetadata {
2218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2219        f.debug_struct("HydroIrMetadata")
2220            .field("location_id", &self.location_id)
2221            .field("collection_kind", &self.collection_kind)
2222            .finish()
2223    }
2224}
2225
2226/// Metadata that is specific to the operator itself, rather than its outputs.
2227/// This is available on _both_ inner nodes and roots.
2228#[derive(Clone, serde::Serialize)]
2229pub struct HydroIrOpMetadata {
2230    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2231    pub backtrace: Backtrace,
2232    pub cpu_usage: Option<f64>,
2233    pub network_recv_cpu_usage: Option<f64>,
2234    pub id: Option<usize>,
2235}
2236
2237impl HydroIrOpMetadata {
2238    #[expect(
2239        clippy::new_without_default,
2240        reason = "explicit calls to new ensure correct backtrace bounds"
2241    )]
2242    pub fn new() -> HydroIrOpMetadata {
2243        Self::new_with_skip(1)
2244    }
2245
2246    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2247        HydroIrOpMetadata {
2248            backtrace: Backtrace::get_backtrace(2 + skip_count),
2249            cpu_usage: None,
2250            network_recv_cpu_usage: None,
2251            id: None,
2252        }
2253    }
2254}
2255
2256impl Debug for HydroIrOpMetadata {
2257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2258        f.debug_struct("HydroIrOpMetadata").finish()
2259    }
2260}
2261
2262impl Hash for HydroIrOpMetadata {
2263    fn hash<H: Hasher>(&self, _: &mut H) {}
2264}
2265
2266/// An intermediate node in a Hydro graph, which consumes data
2267/// from upstream nodes and emits data to downstream nodes.
2268#[derive(Debug, Hash, serde::Serialize)]
2269pub enum HydroNode {
2270    Placeholder,
2271
2272    /// Manually "casts" between two different collection kinds.
2273    ///
2274    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2275    /// correctness checks. In particular, the user must ensure that every possible
2276    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2277    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2278    /// collection. This ensures that the simulator does not miss any possible outputs.
2279    Cast {
2280        inner: Box<HydroNode>,
2281        metadata: HydroIrMetadata,
2282    },
2283
2284    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2285    /// interpretation of the input stream.
2286    ///
2287    /// In production, this simply passes through the input, but in simulation, this operator
2288    /// explicitly selects a randomized interpretation.
2289    ObserveNonDet {
2290        inner: Box<HydroNode>,
2291        trusted: bool, // if true, we do not need to simulate non-determinism
2292        metadata: HydroIrMetadata,
2293    },
2294
2295    Source {
2296        source: HydroSource,
2297        metadata: HydroIrMetadata,
2298    },
2299
2300    SingletonSource {
2301        value: DebugExpr,
2302        first_tick_only: bool,
2303        metadata: HydroIrMetadata,
2304    },
2305
2306    CycleSource {
2307        cycle_id: CycleId,
2308        metadata: HydroIrMetadata,
2309    },
2310
2311    Tee {
2312        inner: SharedNode,
2313        metadata: HydroIrMetadata,
2314    },
2315
2316    /// A singleton materialization point. Wraps a SharedNode so that:
2317    /// - The pipe output delivers the single item to one consumer
2318    /// - `#var` references can borrow the value from the singleton slot
2319    ///
2320    /// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
2321    ///
2322    /// Uses the same `built_tees` dedup pattern as `Tee`.
2323    Singleton {
2324        inner: SharedNode,
2325        metadata: HydroIrMetadata,
2326    },
2327
2328    Partition {
2329        inner: SharedNode,
2330        f: ClosureExpr,
2331        is_true: bool,
2332        metadata: HydroIrMetadata,
2333    },
2334
2335    BeginAtomic {
2336        inner: Box<HydroNode>,
2337        metadata: HydroIrMetadata,
2338    },
2339
2340    EndAtomic {
2341        inner: Box<HydroNode>,
2342        metadata: HydroIrMetadata,
2343    },
2344
2345    Batch {
2346        inner: Box<HydroNode>,
2347        metadata: HydroIrMetadata,
2348    },
2349
2350    YieldConcat {
2351        inner: Box<HydroNode>,
2352        metadata: HydroIrMetadata,
2353    },
2354
2355    Chain {
2356        first: Box<HydroNode>,
2357        second: Box<HydroNode>,
2358        metadata: HydroIrMetadata,
2359    },
2360
2361    MergeOrdered {
2362        first: Box<HydroNode>,
2363        second: Box<HydroNode>,
2364        metadata: HydroIrMetadata,
2365    },
2366
2367    ChainFirst {
2368        first: Box<HydroNode>,
2369        second: Box<HydroNode>,
2370        metadata: HydroIrMetadata,
2371    },
2372
2373    CrossProduct {
2374        left: Box<HydroNode>,
2375        right: Box<HydroNode>,
2376        metadata: HydroIrMetadata,
2377    },
2378
2379    CrossSingleton {
2380        left: Box<HydroNode>,
2381        right: Box<HydroNode>,
2382        metadata: HydroIrMetadata,
2383    },
2384
2385    Join {
2386        left: Box<HydroNode>,
2387        right: Box<HydroNode>,
2388        metadata: HydroIrMetadata,
2389    },
2390
2391    /// Asymmetric join where the right (build) side is bounded.
2392    /// The build side is accumulated (stratum-delayed) into a hash table,
2393    /// then the left (probe) side streams through preserving its ordering.
2394    JoinHalf {
2395        left: Box<HydroNode>,
2396        right: Box<HydroNode>,
2397        metadata: HydroIrMetadata,
2398    },
2399
2400    Difference {
2401        pos: Box<HydroNode>,
2402        neg: Box<HydroNode>,
2403        metadata: HydroIrMetadata,
2404    },
2405
2406    AntiJoin {
2407        pos: Box<HydroNode>,
2408        neg: Box<HydroNode>,
2409        metadata: HydroIrMetadata,
2410    },
2411
2412    ResolveFutures {
2413        input: Box<HydroNode>,
2414        metadata: HydroIrMetadata,
2415    },
2416    ResolveFuturesBlocking {
2417        input: Box<HydroNode>,
2418        metadata: HydroIrMetadata,
2419    },
2420    ResolveFuturesOrdered {
2421        input: Box<HydroNode>,
2422        metadata: HydroIrMetadata,
2423    },
2424
2425    Map {
2426        f: ClosureExpr,
2427        input: Box<HydroNode>,
2428        metadata: HydroIrMetadata,
2429    },
2430    FlatMap {
2431        f: ClosureExpr,
2432        input: Box<HydroNode>,
2433        metadata: HydroIrMetadata,
2434    },
2435    FlatMapStreamBlocking {
2436        f: ClosureExpr,
2437        input: Box<HydroNode>,
2438        metadata: HydroIrMetadata,
2439    },
2440    Filter {
2441        f: ClosureExpr,
2442        input: Box<HydroNode>,
2443        metadata: HydroIrMetadata,
2444    },
2445    FilterMap {
2446        f: ClosureExpr,
2447        input: Box<HydroNode>,
2448        metadata: HydroIrMetadata,
2449    },
2450
2451    DeferTick {
2452        input: Box<HydroNode>,
2453        metadata: HydroIrMetadata,
2454    },
2455    Enumerate {
2456        input: Box<HydroNode>,
2457        metadata: HydroIrMetadata,
2458    },
2459    Inspect {
2460        f: ClosureExpr,
2461        input: Box<HydroNode>,
2462        metadata: HydroIrMetadata,
2463    },
2464
2465    Unique {
2466        input: Box<HydroNode>,
2467        metadata: HydroIrMetadata,
2468    },
2469
2470    Sort {
2471        input: Box<HydroNode>,
2472        metadata: HydroIrMetadata,
2473    },
2474    Fold {
2475        init: ClosureExpr,
2476        acc: ClosureExpr,
2477        input: Box<HydroNode>,
2478        /// Whether the accumulator was proven commutative.
2479        is_commutative: bool,
2480        /// Whether the accumulator was proven idempotent.
2481        is_idempotent: bool,
2482        metadata: HydroIrMetadata,
2483    },
2484
2485    Scan {
2486        init: ClosureExpr,
2487        acc: ClosureExpr,
2488        input: Box<HydroNode>,
2489        metadata: HydroIrMetadata,
2490    },
2491    ScanAsyncBlocking {
2492        init: ClosureExpr,
2493        acc: ClosureExpr,
2494        input: Box<HydroNode>,
2495        metadata: HydroIrMetadata,
2496    },
2497    FoldKeyed {
2498        init: ClosureExpr,
2499        acc: ClosureExpr,
2500        input: Box<HydroNode>,
2501        /// Whether the accumulator was proven commutative.
2502        is_commutative: bool,
2503        /// Whether the accumulator was proven idempotent.
2504        is_idempotent: bool,
2505        metadata: HydroIrMetadata,
2506    },
2507
2508    Reduce {
2509        f: ClosureExpr,
2510        input: Box<HydroNode>,
2511        /// Whether the reducer was proven commutative.
2512        is_commutative: bool,
2513        /// Whether the reducer was proven idempotent.
2514        is_idempotent: bool,
2515        metadata: HydroIrMetadata,
2516    },
2517    ReduceKeyed {
2518        f: ClosureExpr,
2519        input: Box<HydroNode>,
2520        /// Whether the reducer was proven commutative.
2521        is_commutative: bool,
2522        /// Whether the reducer was proven idempotent.
2523        is_idempotent: bool,
2524        metadata: HydroIrMetadata,
2525    },
2526    ReduceKeyedWatermark {
2527        f: ClosureExpr,
2528        input: Box<HydroNode>,
2529        watermark: Box<HydroNode>,
2530        /// Whether the reducer was proven commutative.
2531        is_commutative: bool,
2532        /// Whether the reducer was proven idempotent.
2533        is_idempotent: bool,
2534        metadata: HydroIrMetadata,
2535    },
2536
2537    Network {
2538        name: Option<String>,
2539        networking_info: crate::networking::NetworkingInfo,
2540        serialize_fn: Option<DebugExpr>,
2541        instantiate_fn: DebugInstantiate,
2542        deserialize_fn: Option<DebugExpr>,
2543        input: Box<HydroNode>,
2544        metadata: HydroIrMetadata,
2545    },
2546
2547    ExternalInput {
2548        from_external_key: LocationKey,
2549        from_port_id: ExternalPortId,
2550        from_many: bool,
2551        codec_type: DebugType,
2552        #[serde(skip)]
2553        port_hint: NetworkHint,
2554        instantiate_fn: DebugInstantiate,
2555        deserialize_fn: Option<DebugExpr>,
2556        metadata: HydroIrMetadata,
2557    },
2558
2559    Counter {
2560        tag: String,
2561        duration: DebugExpr,
2562        prefix: String,
2563        input: Box<HydroNode>,
2564        metadata: HydroIrMetadata,
2565    },
2566
2567    AssertIsConsistent {
2568        inner: Box<HydroNode>,
2569        trusted: bool,
2570        metadata: HydroIrMetadata,
2571    },
2572
2573    UnboundSingleton {
2574        inner: Box<HydroNode>,
2575        metadata: HydroIrMetadata,
2576    },
2577}
2578
2579pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2580pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2581
2582impl HydroNode {
2583    pub fn transform_bottom_up(
2584        &mut self,
2585        transform: &mut impl FnMut(&mut HydroNode),
2586        seen_tees: &mut SeenSharedNodes,
2587        check_well_formed: bool,
2588    ) {
2589        self.transform_children(
2590            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2591            seen_tees,
2592        );
2593
2594        transform(self);
2595
2596        let self_location = self.metadata().location_id.root();
2597
2598        if check_well_formed {
2599            match &*self {
2600                HydroNode::Network { .. } => {}
2601                _ => {
2602                    self.input_metadata().iter().for_each(|i| {
2603                        if i.location_id.root() != self_location {
2604                            panic!(
2605                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2606                                i,
2607                                i.location_id.root(),
2608                                self,
2609                                self_location
2610                            )
2611                        }
2612                    });
2613                }
2614            }
2615        }
2616    }
2617
2618    #[inline(always)]
2619    pub fn transform_children(
2620        &mut self,
2621        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2622        seen_tees: &mut SeenSharedNodes,
2623    ) {
2624        match self {
2625            HydroNode::Placeholder => {
2626                panic!();
2627            }
2628
2629            HydroNode::Source { .. }
2630            | HydroNode::SingletonSource { .. }
2631            | HydroNode::CycleSource { .. }
2632            | HydroNode::ExternalInput { .. } => {}
2633
2634            HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2635                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2636                    *inner = SharedNode(transformed.clone());
2637                } else {
2638                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2639                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2640                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2641                    transform(&mut orig, seen_tees);
2642                    *transformed_cell.borrow_mut() = orig;
2643                    *inner = SharedNode(transformed_cell);
2644                }
2645            }
2646
2647            HydroNode::Partition { inner, f, .. } => {
2648                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2649                    *inner = SharedNode(transformed.clone());
2650                } else {
2651                    f.transform_children(&mut transform, seen_tees);
2652                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2653                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2654                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2655                    transform(&mut orig, seen_tees);
2656                    *transformed_cell.borrow_mut() = orig;
2657                    *inner = SharedNode(transformed_cell);
2658                }
2659            }
2660
2661            HydroNode::Cast { inner, .. }
2662            | HydroNode::ObserveNonDet { inner, .. }
2663            | HydroNode::BeginAtomic { inner, .. }
2664            | HydroNode::EndAtomic { inner, .. }
2665            | HydroNode::Batch { inner, .. }
2666            | HydroNode::YieldConcat { inner, .. }
2667            | HydroNode::UnboundSingleton { inner, .. }
2668            | HydroNode::AssertIsConsistent { inner, .. } => {
2669                transform(inner.as_mut(), seen_tees);
2670            }
2671
2672            HydroNode::Chain { first, second, .. } => {
2673                transform(first.as_mut(), seen_tees);
2674                transform(second.as_mut(), seen_tees);
2675            }
2676
2677            HydroNode::MergeOrdered { first, second, .. } => {
2678                transform(first.as_mut(), seen_tees);
2679                transform(second.as_mut(), seen_tees);
2680            }
2681
2682            HydroNode::ChainFirst { first, second, .. } => {
2683                transform(first.as_mut(), seen_tees);
2684                transform(second.as_mut(), seen_tees);
2685            }
2686
2687            HydroNode::CrossSingleton { left, right, .. }
2688            | HydroNode::CrossProduct { left, right, .. }
2689            | HydroNode::Join { left, right, .. }
2690            | HydroNode::JoinHalf { left, right, .. } => {
2691                transform(left.as_mut(), seen_tees);
2692                transform(right.as_mut(), seen_tees);
2693            }
2694
2695            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2696                transform(pos.as_mut(), seen_tees);
2697                transform(neg.as_mut(), seen_tees);
2698            }
2699
2700            HydroNode::Map { f, input, .. } => {
2701                f.transform_children(&mut transform, seen_tees);
2702                transform(input.as_mut(), seen_tees);
2703            }
2704            HydroNode::FlatMap { f, input, .. }
2705            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2706            | HydroNode::Filter { f, input, .. }
2707            | HydroNode::FilterMap { f, input, .. }
2708            | HydroNode::Inspect { f, input, .. }
2709            | HydroNode::Reduce { f, input, .. }
2710            | HydroNode::ReduceKeyed { f, input, .. } => {
2711                f.transform_children(&mut transform, seen_tees);
2712                transform(input.as_mut(), seen_tees);
2713            }
2714            HydroNode::ReduceKeyedWatermark {
2715                f,
2716                input,
2717                watermark,
2718                ..
2719            } => {
2720                f.transform_children(&mut transform, seen_tees);
2721                transform(input.as_mut(), seen_tees);
2722                transform(watermark.as_mut(), seen_tees);
2723            }
2724            HydroNode::Fold {
2725                init, acc, input, ..
2726            }
2727            | HydroNode::Scan {
2728                init, acc, input, ..
2729            }
2730            | HydroNode::ScanAsyncBlocking {
2731                init, acc, input, ..
2732            }
2733            | HydroNode::FoldKeyed {
2734                init, acc, input, ..
2735            } => {
2736                init.transform_children(&mut transform, seen_tees);
2737                acc.transform_children(&mut transform, seen_tees);
2738                transform(input.as_mut(), seen_tees);
2739            }
2740            HydroNode::ResolveFutures { input, .. }
2741            | HydroNode::ResolveFuturesBlocking { input, .. }
2742            | HydroNode::ResolveFuturesOrdered { input, .. }
2743            | HydroNode::Sort { input, .. }
2744            | HydroNode::DeferTick { input, .. }
2745            | HydroNode::Enumerate { input, .. }
2746            | HydroNode::Unique { input, .. }
2747            | HydroNode::Network { input, .. }
2748            | HydroNode::Counter { input, .. } => {
2749                transform(input.as_mut(), seen_tees);
2750            }
2751        }
2752    }
2753
2754    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2755        match self {
2756            HydroNode::Placeholder => HydroNode::Placeholder,
2757            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2758                inner: Box::new(inner.deep_clone(seen_tees)),
2759                metadata: metadata.clone(),
2760            },
2761            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2762                inner: Box::new(inner.deep_clone(seen_tees)),
2763                metadata: metadata.clone(),
2764            },
2765            HydroNode::ObserveNonDet {
2766                inner,
2767                trusted,
2768                metadata,
2769            } => HydroNode::ObserveNonDet {
2770                inner: Box::new(inner.deep_clone(seen_tees)),
2771                trusted: *trusted,
2772                metadata: metadata.clone(),
2773            },
2774            HydroNode::AssertIsConsistent {
2775                inner,
2776                trusted,
2777                metadata,
2778            } => HydroNode::AssertIsConsistent {
2779                inner: Box::new(inner.deep_clone(seen_tees)),
2780                trusted: *trusted,
2781                metadata: metadata.clone(),
2782            },
2783            HydroNode::Source { source, metadata } => HydroNode::Source {
2784                source: source.clone(),
2785                metadata: metadata.clone(),
2786            },
2787            HydroNode::SingletonSource {
2788                value,
2789                first_tick_only,
2790                metadata,
2791            } => HydroNode::SingletonSource {
2792                value: value.clone(),
2793                first_tick_only: *first_tick_only,
2794                metadata: metadata.clone(),
2795            },
2796            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2797                cycle_id: *cycle_id,
2798                metadata: metadata.clone(),
2799            },
2800            HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2801                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2802                    SharedNode(transformed.clone())
2803                } else {
2804                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2805                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2806                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2807                    *new_rc.borrow_mut() = cloned;
2808                    SharedNode(new_rc)
2809                };
2810                if matches!(self, HydroNode::Singleton { .. }) {
2811                    HydroNode::Singleton {
2812                        inner: cloned_inner,
2813                        metadata: metadata.clone(),
2814                    }
2815                } else {
2816                    HydroNode::Tee {
2817                        inner: cloned_inner,
2818                        metadata: metadata.clone(),
2819                    }
2820                }
2821            }
2822            HydroNode::Partition {
2823                inner,
2824                f,
2825                is_true,
2826                metadata,
2827            } => {
2828                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2829                    HydroNode::Partition {
2830                        inner: SharedNode(transformed.clone()),
2831                        f: f.deep_clone(seen_tees),
2832                        is_true: *is_true,
2833                        metadata: metadata.clone(),
2834                    }
2835                } else {
2836                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2837                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2838                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2839                    *new_rc.borrow_mut() = cloned;
2840                    HydroNode::Partition {
2841                        inner: SharedNode(new_rc),
2842                        f: f.deep_clone(seen_tees),
2843                        is_true: *is_true,
2844                        metadata: metadata.clone(),
2845                    }
2846                }
2847            }
2848            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2849                inner: Box::new(inner.deep_clone(seen_tees)),
2850                metadata: metadata.clone(),
2851            },
2852            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2853                inner: Box::new(inner.deep_clone(seen_tees)),
2854                metadata: metadata.clone(),
2855            },
2856            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2857                inner: Box::new(inner.deep_clone(seen_tees)),
2858                metadata: metadata.clone(),
2859            },
2860            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2861                inner: Box::new(inner.deep_clone(seen_tees)),
2862                metadata: metadata.clone(),
2863            },
2864            HydroNode::Chain {
2865                first,
2866                second,
2867                metadata,
2868            } => HydroNode::Chain {
2869                first: Box::new(first.deep_clone(seen_tees)),
2870                second: Box::new(second.deep_clone(seen_tees)),
2871                metadata: metadata.clone(),
2872            },
2873            HydroNode::MergeOrdered {
2874                first,
2875                second,
2876                metadata,
2877            } => HydroNode::MergeOrdered {
2878                first: Box::new(first.deep_clone(seen_tees)),
2879                second: Box::new(second.deep_clone(seen_tees)),
2880                metadata: metadata.clone(),
2881            },
2882            HydroNode::ChainFirst {
2883                first,
2884                second,
2885                metadata,
2886            } => HydroNode::ChainFirst {
2887                first: Box::new(first.deep_clone(seen_tees)),
2888                second: Box::new(second.deep_clone(seen_tees)),
2889                metadata: metadata.clone(),
2890            },
2891            HydroNode::CrossProduct {
2892                left,
2893                right,
2894                metadata,
2895            } => HydroNode::CrossProduct {
2896                left: Box::new(left.deep_clone(seen_tees)),
2897                right: Box::new(right.deep_clone(seen_tees)),
2898                metadata: metadata.clone(),
2899            },
2900            HydroNode::CrossSingleton {
2901                left,
2902                right,
2903                metadata,
2904            } => HydroNode::CrossSingleton {
2905                left: Box::new(left.deep_clone(seen_tees)),
2906                right: Box::new(right.deep_clone(seen_tees)),
2907                metadata: metadata.clone(),
2908            },
2909            HydroNode::Join {
2910                left,
2911                right,
2912                metadata,
2913            } => HydroNode::Join {
2914                left: Box::new(left.deep_clone(seen_tees)),
2915                right: Box::new(right.deep_clone(seen_tees)),
2916                metadata: metadata.clone(),
2917            },
2918            HydroNode::JoinHalf {
2919                left,
2920                right,
2921                metadata,
2922            } => HydroNode::JoinHalf {
2923                left: Box::new(left.deep_clone(seen_tees)),
2924                right: Box::new(right.deep_clone(seen_tees)),
2925                metadata: metadata.clone(),
2926            },
2927            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2928                pos: Box::new(pos.deep_clone(seen_tees)),
2929                neg: Box::new(neg.deep_clone(seen_tees)),
2930                metadata: metadata.clone(),
2931            },
2932            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2933                pos: Box::new(pos.deep_clone(seen_tees)),
2934                neg: Box::new(neg.deep_clone(seen_tees)),
2935                metadata: metadata.clone(),
2936            },
2937            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2938                input: Box::new(input.deep_clone(seen_tees)),
2939                metadata: metadata.clone(),
2940            },
2941            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2942                HydroNode::ResolveFuturesBlocking {
2943                    input: Box::new(input.deep_clone(seen_tees)),
2944                    metadata: metadata.clone(),
2945                }
2946            }
2947            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2948                HydroNode::ResolveFuturesOrdered {
2949                    input: Box::new(input.deep_clone(seen_tees)),
2950                    metadata: metadata.clone(),
2951                }
2952            }
2953            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2954                f: f.deep_clone(seen_tees),
2955                input: Box::new(input.deep_clone(seen_tees)),
2956                metadata: metadata.clone(),
2957            },
2958            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2959                f: f.deep_clone(seen_tees),
2960                input: Box::new(input.deep_clone(seen_tees)),
2961                metadata: metadata.clone(),
2962            },
2963            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2964                HydroNode::FlatMapStreamBlocking {
2965                    f: f.deep_clone(seen_tees),
2966                    input: Box::new(input.deep_clone(seen_tees)),
2967                    metadata: metadata.clone(),
2968                }
2969            }
2970            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2971                f: f.deep_clone(seen_tees),
2972                input: Box::new(input.deep_clone(seen_tees)),
2973                metadata: metadata.clone(),
2974            },
2975            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2976                f: f.deep_clone(seen_tees),
2977                input: Box::new(input.deep_clone(seen_tees)),
2978                metadata: metadata.clone(),
2979            },
2980            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2981                input: Box::new(input.deep_clone(seen_tees)),
2982                metadata: metadata.clone(),
2983            },
2984            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2985                input: Box::new(input.deep_clone(seen_tees)),
2986                metadata: metadata.clone(),
2987            },
2988            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2989                f: f.deep_clone(seen_tees),
2990                input: Box::new(input.deep_clone(seen_tees)),
2991                metadata: metadata.clone(),
2992            },
2993            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2994                input: Box::new(input.deep_clone(seen_tees)),
2995                metadata: metadata.clone(),
2996            },
2997            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2998                input: Box::new(input.deep_clone(seen_tees)),
2999                metadata: metadata.clone(),
3000            },
3001            HydroNode::Fold {
3002                init,
3003                acc,
3004                input,
3005                is_commutative,
3006                is_idempotent,
3007                metadata,
3008            } => HydroNode::Fold {
3009                init: init.deep_clone(seen_tees),
3010                acc: acc.deep_clone(seen_tees),
3011                input: Box::new(input.deep_clone(seen_tees)),
3012                is_commutative: *is_commutative,
3013                is_idempotent: *is_idempotent,
3014                metadata: metadata.clone(),
3015            },
3016            HydroNode::Scan {
3017                init,
3018                acc,
3019                input,
3020                metadata,
3021            } => HydroNode::Scan {
3022                init: init.deep_clone(seen_tees),
3023                acc: acc.deep_clone(seen_tees),
3024                input: Box::new(input.deep_clone(seen_tees)),
3025                metadata: metadata.clone(),
3026            },
3027            HydroNode::ScanAsyncBlocking {
3028                init,
3029                acc,
3030                input,
3031                metadata,
3032            } => HydroNode::ScanAsyncBlocking {
3033                init: init.deep_clone(seen_tees),
3034                acc: acc.deep_clone(seen_tees),
3035                input: Box::new(input.deep_clone(seen_tees)),
3036                metadata: metadata.clone(),
3037            },
3038            HydroNode::FoldKeyed {
3039                init,
3040                acc,
3041                input,
3042                is_commutative,
3043                is_idempotent,
3044                metadata,
3045            } => HydroNode::FoldKeyed {
3046                init: init.deep_clone(seen_tees),
3047                acc: acc.deep_clone(seen_tees),
3048                input: Box::new(input.deep_clone(seen_tees)),
3049                is_commutative: *is_commutative,
3050                is_idempotent: *is_idempotent,
3051                metadata: metadata.clone(),
3052            },
3053            HydroNode::ReduceKeyedWatermark {
3054                f,
3055                input,
3056                watermark,
3057                is_commutative,
3058                is_idempotent,
3059                metadata,
3060            } => HydroNode::ReduceKeyedWatermark {
3061                f: f.deep_clone(seen_tees),
3062                input: Box::new(input.deep_clone(seen_tees)),
3063                watermark: Box::new(watermark.deep_clone(seen_tees)),
3064                is_commutative: *is_commutative,
3065                is_idempotent: *is_idempotent,
3066                metadata: metadata.clone(),
3067            },
3068            HydroNode::Reduce {
3069                f,
3070                input,
3071                is_commutative,
3072                is_idempotent,
3073                metadata,
3074            } => HydroNode::Reduce {
3075                f: f.deep_clone(seen_tees),
3076                input: Box::new(input.deep_clone(seen_tees)),
3077                is_commutative: *is_commutative,
3078                is_idempotent: *is_idempotent,
3079                metadata: metadata.clone(),
3080            },
3081            HydroNode::ReduceKeyed {
3082                f,
3083                input,
3084                is_commutative,
3085                is_idempotent,
3086                metadata,
3087            } => HydroNode::ReduceKeyed {
3088                f: f.deep_clone(seen_tees),
3089                input: Box::new(input.deep_clone(seen_tees)),
3090                is_commutative: *is_commutative,
3091                is_idempotent: *is_idempotent,
3092                metadata: metadata.clone(),
3093            },
3094            HydroNode::Network {
3095                name,
3096                networking_info,
3097                serialize_fn,
3098                instantiate_fn,
3099                deserialize_fn,
3100                input,
3101                metadata,
3102            } => HydroNode::Network {
3103                name: name.clone(),
3104                networking_info: networking_info.clone(),
3105                serialize_fn: serialize_fn.clone(),
3106                instantiate_fn: instantiate_fn.clone(),
3107                deserialize_fn: deserialize_fn.clone(),
3108                input: Box::new(input.deep_clone(seen_tees)),
3109                metadata: metadata.clone(),
3110            },
3111            HydroNode::ExternalInput {
3112                from_external_key,
3113                from_port_id,
3114                from_many,
3115                codec_type,
3116                port_hint,
3117                instantiate_fn,
3118                deserialize_fn,
3119                metadata,
3120            } => HydroNode::ExternalInput {
3121                from_external_key: *from_external_key,
3122                from_port_id: *from_port_id,
3123                from_many: *from_many,
3124                codec_type: codec_type.clone(),
3125                port_hint: *port_hint,
3126                instantiate_fn: instantiate_fn.clone(),
3127                deserialize_fn: deserialize_fn.clone(),
3128                metadata: metadata.clone(),
3129            },
3130            HydroNode::Counter {
3131                tag,
3132                duration,
3133                prefix,
3134                input,
3135                metadata,
3136            } => HydroNode::Counter {
3137                tag: tag.clone(),
3138                duration: duration.clone(),
3139                prefix: prefix.clone(),
3140                input: Box::new(input.deep_clone(seen_tees)),
3141                metadata: metadata.clone(),
3142            },
3143        }
3144    }
3145
3146    #[cfg(feature = "build")]
3147    pub fn emit_core(
3148        &mut self,
3149        builders_or_callback: &mut BuildersOrCallback<
3150            impl FnMut(&mut HydroRoot, &mut StmtId),
3151            impl FnMut(&mut HydroNode, &mut StmtId),
3152        >,
3153        seen_tees: &mut SeenSharedNodes,
3154        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3155        next_stmt_id: &mut StmtId,
3156        fold_hooked_idents: &mut HashSet<String>,
3157    ) -> syn::Ident {
3158        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3159
3160        self.transform_bottom_up(
3161            &mut |node: &mut HydroNode| {
3162                let out_location = node.metadata().location_id.clone();
3163                match node {
3164                    HydroNode::Placeholder => {
3165                        panic!()
3166                    }
3167
3168                    HydroNode::Cast { .. } => {
3169                        // Cast passes through the input ident unchanged
3170                        // The input ident is already on the stack from processing the child
3171                        match builders_or_callback {
3172                            BuildersOrCallback::Builders(_) => {}
3173                            BuildersOrCallback::Callback(_, node_callback) => {
3174                                node_callback(node, next_stmt_id);
3175                            }
3176                        }
3177
3178                        let _ = next_stmt_id.get_and_increment();
3179                        // input_ident stays on stack as output
3180                    }
3181
3182                    HydroNode::UnboundSingleton { .. } => {
3183                        let inner_ident = ident_stack.pop().unwrap();
3184
3185                        let out_ident =
3186                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3187
3188                        match builders_or_callback {
3189                            BuildersOrCallback::Builders(graph_builders) => {
3190                                if graph_builders.singleton_intermediates() {
3191                                    let builder = graph_builders.get_dfir_mut(&out_location);
3192                                    builder.add_dfir(
3193                                        parse_quote! {
3194                                            #out_ident = #inner_ident;
3195                                        },
3196                                        None,
3197                                        None,
3198                                    );
3199                                } else {
3200                                    let builder = graph_builders.get_dfir_mut(&out_location);
3201                                    builder.add_dfir(
3202                                        parse_quote! {
3203                                            #out_ident = #inner_ident -> persist::<'static>();
3204                                        },
3205                                        None,
3206                                        None,
3207                                    );
3208                                }
3209                            }
3210                            BuildersOrCallback::Callback(_, node_callback) => {
3211                                node_callback(node, next_stmt_id);
3212                            }
3213                        }
3214
3215                        let _ = next_stmt_id.get_and_increment();
3216
3217                        ident_stack.push(out_ident);
3218                    }
3219
3220                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3221                        let inner_ident = ident_stack.pop().unwrap();
3222
3223                        let out_ident =
3224                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3225
3226                        match builders_or_callback {
3227                            BuildersOrCallback::Builders(graph_builders) => {
3228                                graph_builders.assert_is_consistent(
3229                                    *trusted,
3230                                    &inner.metadata().location_id,
3231                                    inner_ident,
3232                                    &out_ident,
3233                                );
3234                            }
3235                            BuildersOrCallback::Callback(_, node_callback) => {
3236                                node_callback(node, next_stmt_id);
3237                            }
3238                        }
3239
3240                        let _ = next_stmt_id.get_and_increment();
3241
3242                        ident_stack.push(out_ident);
3243                    }
3244
3245                    HydroNode::ObserveNonDet {
3246                        inner,
3247                        trusted,
3248                        metadata,
3249                        ..
3250                    } => {
3251                        let inner_ident = ident_stack.pop().unwrap();
3252
3253                        let observe_ident =
3254                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3255
3256                        match builders_or_callback {
3257                            BuildersOrCallback::Builders(graph_builders) => {
3258                                graph_builders.observe_nondet(
3259                                    *trusted,
3260                                    &inner.metadata().location_id,
3261                                    inner_ident,
3262                                    &inner.metadata().collection_kind,
3263                                    &observe_ident,
3264                                    &metadata.collection_kind,
3265                                    &metadata.op,
3266                                );
3267                            }
3268                            BuildersOrCallback::Callback(_, node_callback) => {
3269                                node_callback(node, next_stmt_id);
3270                            }
3271                        }
3272
3273                        let _ = next_stmt_id.get_and_increment();
3274
3275                        ident_stack.push(observe_ident);
3276                    }
3277
3278                    HydroNode::Batch {
3279                        inner, metadata, ..
3280                    } => {
3281                        let inner_ident = ident_stack.pop().unwrap();
3282
3283                        let batch_ident =
3284                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3285
3286                        match builders_or_callback {
3287                            BuildersOrCallback::Builders(graph_builders) => {
3288                                graph_builders.batch(
3289                                    inner_ident,
3290                                    &inner.metadata().location_id,
3291                                    &inner.metadata().collection_kind,
3292                                    &batch_ident,
3293                                    &out_location,
3294                                    &metadata.op,
3295                                    fold_hooked_idents,
3296                                );
3297                            }
3298                            BuildersOrCallback::Callback(_, node_callback) => {
3299                                node_callback(node, next_stmt_id);
3300                            }
3301                        }
3302
3303                        let _ = next_stmt_id.get_and_increment();
3304
3305                        ident_stack.push(batch_ident);
3306                    }
3307
3308                    HydroNode::YieldConcat { inner, .. } => {
3309                        let inner_ident = ident_stack.pop().unwrap();
3310
3311                        let yield_ident =
3312                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3313
3314                        match builders_or_callback {
3315                            BuildersOrCallback::Builders(graph_builders) => {
3316                                graph_builders.yield_from_tick(
3317                                    inner_ident,
3318                                    &inner.metadata().location_id,
3319                                    &inner.metadata().collection_kind,
3320                                    &yield_ident,
3321                                    &out_location,
3322                                );
3323                            }
3324                            BuildersOrCallback::Callback(_, node_callback) => {
3325                                node_callback(node, next_stmt_id);
3326                            }
3327                        }
3328
3329                        let _ = next_stmt_id.get_and_increment();
3330
3331                        ident_stack.push(yield_ident);
3332                    }
3333
3334                    HydroNode::BeginAtomic { inner, metadata } => {
3335                        let inner_ident = ident_stack.pop().unwrap();
3336
3337                        let begin_ident =
3338                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3339
3340                        match builders_or_callback {
3341                            BuildersOrCallback::Builders(graph_builders) => {
3342                                graph_builders.begin_atomic(
3343                                    inner_ident,
3344                                    &inner.metadata().location_id,
3345                                    &inner.metadata().collection_kind,
3346                                    &begin_ident,
3347                                    &out_location,
3348                                    &metadata.op,
3349                                );
3350                            }
3351                            BuildersOrCallback::Callback(_, node_callback) => {
3352                                node_callback(node, next_stmt_id);
3353                            }
3354                        }
3355
3356                        let _ = next_stmt_id.get_and_increment();
3357
3358                        ident_stack.push(begin_ident);
3359                    }
3360
3361                    HydroNode::EndAtomic { inner, .. } => {
3362                        let inner_ident = ident_stack.pop().unwrap();
3363
3364                        let end_ident =
3365                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3366
3367                        match builders_or_callback {
3368                            BuildersOrCallback::Builders(graph_builders) => {
3369                                graph_builders.end_atomic(
3370                                    inner_ident,
3371                                    &inner.metadata().location_id,
3372                                    &inner.metadata().collection_kind,
3373                                    &end_ident,
3374                                );
3375                            }
3376                            BuildersOrCallback::Callback(_, node_callback) => {
3377                                node_callback(node, next_stmt_id);
3378                            }
3379                        }
3380
3381                        let _ = next_stmt_id.get_and_increment();
3382
3383                        ident_stack.push(end_ident);
3384                    }
3385
3386                    HydroNode::Source {
3387                        source, metadata, ..
3388                    } => {
3389                        if let HydroSource::ExternalNetwork() = source {
3390                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3391                        } else {
3392                            let source_ident =
3393                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3394
3395                            let source_stmt = match source {
3396                                HydroSource::Stream(expr) => {
3397                                    debug_assert!(metadata.location_id.is_top_level());
3398                                    parse_quote! {
3399                                        #source_ident = source_stream(#expr);
3400                                    }
3401                                }
3402
3403                                HydroSource::ExternalNetwork() => {
3404                                    unreachable!()
3405                                }
3406
3407                                HydroSource::Iter(expr) => {
3408                                    if metadata.location_id.is_top_level() {
3409                                        parse_quote! {
3410                                            #source_ident = source_iter(#expr);
3411                                        }
3412                                    } else {
3413                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3414                                        parse_quote! {
3415                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3416                                        }
3417                                    }
3418                                }
3419
3420                                HydroSource::Spin() => {
3421                                    debug_assert!(metadata.location_id.is_top_level());
3422                                    parse_quote! {
3423                                        #source_ident = spin();
3424                                    }
3425                                }
3426
3427                                HydroSource::ClusterMembers(target_loc, state) => {
3428                                    debug_assert!(metadata.location_id.is_top_level());
3429
3430                                    let members_tee_ident = syn::Ident::new(
3431                                        &format!(
3432                                            "__cluster_members_tee_{}_{}",
3433                                            metadata.location_id.root().key(),
3434                                            target_loc.key(),
3435                                        ),
3436                                        Span::call_site(),
3437                                    );
3438
3439                                    match state {
3440                                        ClusterMembersState::Stream(d) => {
3441                                            parse_quote! {
3442                                                #members_tee_ident = source_stream(#d) -> tee();
3443                                                #source_ident = #members_tee_ident;
3444                                            }
3445                                        },
3446                                        ClusterMembersState::Uninit => syn::parse_quote! {
3447                                            #source_ident = source_stream(DUMMY);
3448                                        },
3449                                        ClusterMembersState::Tee(..) => parse_quote! {
3450                                            #source_ident = #members_tee_ident;
3451                                        },
3452                                    }
3453                                }
3454
3455                                HydroSource::Embedded(ident) => {
3456                                    parse_quote! {
3457                                        #source_ident = source_stream(#ident);
3458                                    }
3459                                }
3460
3461                                HydroSource::EmbeddedSingleton(ident) => {
3462                                    parse_quote! {
3463                                        #source_ident = source_iter([#ident]);
3464                                    }
3465                                }
3466                            };
3467
3468                            match builders_or_callback {
3469                                BuildersOrCallback::Builders(graph_builders) => {
3470                                    let builder = graph_builders.get_dfir_mut(&out_location);
3471                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3472                                }
3473                                BuildersOrCallback::Callback(_, node_callback) => {
3474                                    node_callback(node, next_stmt_id);
3475                                }
3476                            }
3477
3478                            let _ = next_stmt_id.get_and_increment();
3479
3480                            ident_stack.push(source_ident);
3481                        }
3482                    }
3483
3484                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3485                        let source_ident =
3486                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3487
3488                        match builders_or_callback {
3489                            BuildersOrCallback::Builders(graph_builders) => {
3490                                let builder = graph_builders.get_dfir_mut(&out_location);
3491
3492                                if *first_tick_only {
3493                                    assert!(
3494                                        !metadata.location_id.is_top_level(),
3495                                        "first_tick_only SingletonSource must be inside a tick"
3496                                    );
3497                                }
3498
3499                                if *first_tick_only
3500                                    || (metadata.location_id.is_top_level()
3501                                        && metadata.collection_kind.is_bounded())
3502                                {
3503                                    builder.add_dfir(
3504                                        parse_quote! {
3505                                            #source_ident = source_iter([#value]);
3506                                        },
3507                                        None,
3508                                        Some(&next_stmt_id.to_string()),
3509                                    );
3510                                } else {
3511                                    builder.add_dfir(
3512                                        parse_quote! {
3513                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3514                                        },
3515                                        None,
3516                                        Some(&next_stmt_id.to_string()),
3517                                    );
3518                                }
3519                            }
3520                            BuildersOrCallback::Callback(_, node_callback) => {
3521                                node_callback(node, next_stmt_id);
3522                            }
3523                        }
3524
3525                        let _ = next_stmt_id.get_and_increment();
3526
3527                        ident_stack.push(source_ident);
3528                    }
3529
3530                    HydroNode::CycleSource { cycle_id, .. } => {
3531                        let ident = cycle_id.as_ident();
3532
3533                        match builders_or_callback {
3534                            BuildersOrCallback::Builders(_) => {}
3535                            BuildersOrCallback::Callback(_, node_callback) => {
3536                                node_callback(node, next_stmt_id);
3537                            }
3538                        }
3539
3540                        // consume a stmt id even though we did not emit anything so that we can instrument this
3541                        let _ = next_stmt_id.get_and_increment();
3542
3543                        ident_stack.push(ident);
3544                    }
3545
3546                    HydroNode::Tee { inner, .. } => {
3547                        let ret_ident = if let Some(built_idents) =
3548                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3549                        {
3550                            match builders_or_callback {
3551                                BuildersOrCallback::Builders(_) => {}
3552                                BuildersOrCallback::Callback(_, node_callback) => {
3553                                    node_callback(node, next_stmt_id);
3554                                }
3555                            }
3556
3557                            built_idents[0].clone()
3558                        } else {
3559                            // The inner node was already processed by transform_bottom_up,
3560                            // so its ident is on the stack
3561                            let inner_ident = ident_stack.pop().unwrap();
3562
3563                            let tee_ident =
3564                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3565
3566                            built_tees.insert(
3567                                inner.0.as_ref() as *const RefCell<HydroNode>,
3568                                vec![tee_ident.clone()],
3569                            );
3570
3571                            match builders_or_callback {
3572                                BuildersOrCallback::Builders(graph_builders) => {
3573                                    // NOTE: With `forward_ref`, the fold codegen may not have
3574                                    // run yet when we reach this tee, so `fold_hooked_idents`
3575                                    // might not contain the inner ident. In that case we won't
3576                                    // propagate the "hooked" status to the tee and the
3577                                    // downstream singleton batch will use the normal
3578                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3579                                    // This is not a soundness issue: the fallback hook still
3580                                    // produces correct behavior, just with a redundant decision
3581                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3582                                    // fix ordering so forward_ref folds are always processed
3583                                    // before their downstream tees.
3584                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3585                                        fold_hooked_idents.insert(tee_ident.to_string());
3586                                    }
3587                                    let builder = graph_builders.get_dfir_mut(&out_location);
3588                                    builder.add_dfir(
3589                                        parse_quote! {
3590                                            #tee_ident = #inner_ident -> tee();
3591                                        },
3592                                        None,
3593                                        Some(&next_stmt_id.to_string()),
3594                                    );
3595                                }
3596                                BuildersOrCallback::Callback(_, node_callback) => {
3597                                    node_callback(node, next_stmt_id);
3598                                }
3599                            }
3600
3601                            tee_ident
3602                        };
3603
3604                        // we consume a stmt id regardless of if we emit the tee() operator,
3605                        // so that during rewrites we touch all recipients of the tee()
3606
3607                        let _ = next_stmt_id.get_and_increment();
3608                        ident_stack.push(ret_ident);
3609                    }
3610
3611                    HydroNode::Singleton { inner, .. } => {
3612                        let ret_ident = if let Some(built_idents) =
3613                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3614                        {
3615                            built_idents[0].clone()
3616                        } else {
3617                            let inner_ident = ident_stack.pop().unwrap();
3618
3619                            let singleton_ident =
3620                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3621
3622                            built_tees.insert(
3623                                inner.0.as_ref() as *const RefCell<HydroNode>,
3624                                vec![singleton_ident.clone()],
3625                            );
3626
3627                            match builders_or_callback {
3628                                BuildersOrCallback::Builders(graph_builders) => {
3629                                    let builder = graph_builders.get_dfir_mut(&out_location);
3630                                    builder.add_dfir(
3631                                        parse_quote! {
3632                                            #singleton_ident = #inner_ident -> singleton();
3633                                        },
3634                                        None,
3635                                        Some(&next_stmt_id.to_string()),
3636                                    );
3637                                }
3638                                BuildersOrCallback::Callback(_, node_callback) => {
3639                                    node_callback(node, next_stmt_id);
3640                                }
3641                            }
3642
3643                            singleton_ident
3644                        };
3645
3646                        // we consume a stmt id regardless of if we emit the singleton() operator,
3647                        // so that during rewrites we touch all recipients of the singleton()
3648                        let _ = next_stmt_id.get_and_increment();
3649                        ident_stack.push(ret_ident);
3650                    }
3651
3652                    HydroNode::Partition {
3653                        inner, f, is_true, ..
3654                    } => {
3655                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3656                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3657                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3658                            match builders_or_callback {
3659                                BuildersOrCallback::Builders(_) => {}
3660                                BuildersOrCallback::Callback(_, node_callback) => {
3661                                    node_callback(node, next_stmt_id);
3662                                }
3663                            }
3664
3665                            let idx = if is_true { 0 } else { 1 };
3666                            built_idents[idx].clone()
3667                        } else {
3668                            // The inner node was already processed by transform_bottom_up,
3669                            // so its ident is on the stack
3670                            let inner_ident = ident_stack.pop().unwrap();
3671                            let f_tokens = f.emit_tokens(&mut ident_stack);
3672
3673                            let partition_ident = syn::Ident::new(
3674                                &format!("stream_{}_partition", *next_stmt_id),
3675                                Span::call_site(),
3676                            );
3677                            let true_ident = syn::Ident::new(
3678                                &format!("stream_{}_true", *next_stmt_id),
3679                                Span::call_site(),
3680                            );
3681                            let false_ident = syn::Ident::new(
3682                                &format!("stream_{}_false", *next_stmt_id),
3683                                Span::call_site(),
3684                            );
3685
3686                            built_tees.insert(
3687                                ptr,
3688                                vec![true_ident.clone(), false_ident.clone()],
3689                            );
3690
3691                            match builders_or_callback {
3692                                BuildersOrCallback::Builders(graph_builders) => {
3693                                    let builder = graph_builders.get_dfir_mut(&out_location);
3694                                    builder.add_dfir(
3695                                        parse_quote! {
3696                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3697                                            #true_ident = #partition_ident[0];
3698                                            #false_ident = #partition_ident[1];
3699                                        },
3700                                        None,
3701                                        Some(&next_stmt_id.to_string()),
3702                                    );
3703                                }
3704                                BuildersOrCallback::Callback(_, node_callback) => {
3705                                    node_callback(node, next_stmt_id);
3706                                }
3707                            }
3708
3709                            if is_true { true_ident } else { false_ident }
3710                        };
3711
3712                        let _ = next_stmt_id.get_and_increment();
3713                        ident_stack.push(ret_ident);
3714                    }
3715
3716                    HydroNode::Chain { .. } => {
3717                        // Children are processed left-to-right, so second is on top
3718                        let second_ident = ident_stack.pop().unwrap();
3719                        let first_ident = ident_stack.pop().unwrap();
3720
3721                        let chain_ident =
3722                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3723
3724                        match builders_or_callback {
3725                            BuildersOrCallback::Builders(graph_builders) => {
3726                                let builder = graph_builders.get_dfir_mut(&out_location);
3727                                builder.add_dfir(
3728                                    parse_quote! {
3729                                        #chain_ident = chain();
3730                                        #first_ident -> [0]#chain_ident;
3731                                        #second_ident -> [1]#chain_ident;
3732                                    },
3733                                    None,
3734                                    Some(&next_stmt_id.to_string()),
3735                                );
3736                            }
3737                            BuildersOrCallback::Callback(_, node_callback) => {
3738                                node_callback(node, next_stmt_id);
3739                            }
3740                        }
3741
3742                        let _ = next_stmt_id.get_and_increment();
3743
3744                        ident_stack.push(chain_ident);
3745                    }
3746
3747                    HydroNode::MergeOrdered { first, metadata, .. } => {
3748                        let second_ident = ident_stack.pop().unwrap();
3749                        let first_ident = ident_stack.pop().unwrap();
3750
3751                        let merge_ident =
3752                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3753
3754                        match builders_or_callback {
3755                            BuildersOrCallback::Builders(graph_builders) => {
3756                                graph_builders.merge_ordered(
3757                                    &first.metadata().location_id,
3758                                    first_ident,
3759                                    second_ident,
3760                                    &merge_ident,
3761                                    &first.metadata().collection_kind,
3762                                    &metadata.op,
3763                                    Some(&next_stmt_id.to_string()),
3764                                );
3765                            }
3766                            BuildersOrCallback::Callback(_, node_callback) => {
3767                                node_callback(node, next_stmt_id);
3768                            }
3769                        }
3770
3771                        let _ = next_stmt_id.get_and_increment();
3772
3773                        ident_stack.push(merge_ident);
3774                    }
3775
3776                    HydroNode::ChainFirst { .. } => {
3777                        let second_ident = ident_stack.pop().unwrap();
3778                        let first_ident = ident_stack.pop().unwrap();
3779
3780                        let chain_ident =
3781                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3782
3783                        match builders_or_callback {
3784                            BuildersOrCallback::Builders(graph_builders) => {
3785                                let builder = graph_builders.get_dfir_mut(&out_location);
3786                                builder.add_dfir(
3787                                    parse_quote! {
3788                                        #chain_ident = chain_first_n(1);
3789                                        #first_ident -> [0]#chain_ident;
3790                                        #second_ident -> [1]#chain_ident;
3791                                    },
3792                                    None,
3793                                    Some(&next_stmt_id.to_string()),
3794                                );
3795                            }
3796                            BuildersOrCallback::Callback(_, node_callback) => {
3797                                node_callback(node, next_stmt_id);
3798                            }
3799                        }
3800
3801                        let _ = next_stmt_id.get_and_increment();
3802
3803                        ident_stack.push(chain_ident);
3804                    }
3805
3806                    HydroNode::CrossSingleton { right, .. } => {
3807                        let right_ident = ident_stack.pop().unwrap();
3808                        let left_ident = ident_stack.pop().unwrap();
3809
3810                        let cross_ident =
3811                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3812
3813                        match builders_or_callback {
3814                            BuildersOrCallback::Builders(graph_builders) => {
3815                                let builder = graph_builders.get_dfir_mut(&out_location);
3816
3817                                if right.metadata().location_id.is_top_level()
3818                                    && right.metadata().collection_kind.is_bounded()
3819                                {
3820                                    builder.add_dfir(
3821                                        parse_quote! {
3822                                            #cross_ident = cross_singleton::<'static>();
3823                                            #left_ident -> [input]#cross_ident;
3824                                            #right_ident -> [single]#cross_ident;
3825                                        },
3826                                        None,
3827                                        Some(&next_stmt_id.to_string()),
3828                                    );
3829                                } else {
3830                                    builder.add_dfir(
3831                                        parse_quote! {
3832                                            #cross_ident = cross_singleton();
3833                                            #left_ident -> [input]#cross_ident;
3834                                            #right_ident -> [single]#cross_ident;
3835                                        },
3836                                        None,
3837                                        Some(&next_stmt_id.to_string()),
3838                                    );
3839                                }
3840                            }
3841                            BuildersOrCallback::Callback(_, node_callback) => {
3842                                node_callback(node, next_stmt_id);
3843                            }
3844                        }
3845
3846                        let _ = next_stmt_id.get_and_increment();
3847
3848                        ident_stack.push(cross_ident);
3849                    }
3850
3851                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3852                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3853                            parse_quote!(cross_join_multiset)
3854                        } else {
3855                            parse_quote!(join_multiset)
3856                        };
3857
3858                        let (HydroNode::CrossProduct { left, right, .. }
3859                        | HydroNode::Join { left, right, .. }) = node
3860                        else {
3861                            unreachable!()
3862                        };
3863
3864                        let is_top_level = left.metadata().location_id.is_top_level()
3865                            && right.metadata().location_id.is_top_level();
3866                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3867                            quote!('static)
3868                        } else {
3869                            quote!('tick)
3870                        };
3871
3872                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3873                            quote!('static)
3874                        } else {
3875                            quote!('tick)
3876                        };
3877
3878                        let right_ident = ident_stack.pop().unwrap();
3879                        let left_ident = ident_stack.pop().unwrap();
3880
3881                        let stream_ident =
3882                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3883
3884                        match builders_or_callback {
3885                            BuildersOrCallback::Builders(graph_builders) => {
3886                                let builder = graph_builders.get_dfir_mut(&out_location);
3887                                builder.add_dfir(
3888                                    if is_top_level {
3889                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3890                                        // a multiset_delta() to negate the replay behavior
3891                                        parse_quote! {
3892                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3893                                            #left_ident -> [0]#stream_ident;
3894                                            #right_ident -> [1]#stream_ident;
3895                                        }
3896                                    } else {
3897                                        parse_quote! {
3898                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3899                                            #left_ident -> [0]#stream_ident;
3900                                            #right_ident -> [1]#stream_ident;
3901                                        }
3902                                    }
3903                                    ,
3904                                    None,
3905                                    Some(&next_stmt_id.to_string()),
3906                                );
3907                            }
3908                            BuildersOrCallback::Callback(_, node_callback) => {
3909                                node_callback(node, next_stmt_id);
3910                            }
3911                        }
3912
3913                        let _ = next_stmt_id.get_and_increment();
3914
3915                        ident_stack.push(stream_ident);
3916                    }
3917
3918                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3919                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3920                            parse_quote!(difference)
3921                        } else {
3922                            parse_quote!(anti_join)
3923                        };
3924
3925                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3926                            node
3927                        else {
3928                            unreachable!()
3929                        };
3930
3931                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3932                            quote!('static)
3933                        } else {
3934                            quote!('tick)
3935                        };
3936
3937                        let neg_ident = ident_stack.pop().unwrap();
3938                        let pos_ident = ident_stack.pop().unwrap();
3939
3940                        let stream_ident =
3941                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3942
3943                        match builders_or_callback {
3944                            BuildersOrCallback::Builders(graph_builders) => {
3945                                let builder = graph_builders.get_dfir_mut(&out_location);
3946                                builder.add_dfir(
3947                                    parse_quote! {
3948                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3949                                        #pos_ident -> [pos]#stream_ident;
3950                                        #neg_ident -> [neg]#stream_ident;
3951                                    },
3952                                    None,
3953                                    Some(&next_stmt_id.to_string()),
3954                                );
3955                            }
3956                            BuildersOrCallback::Callback(_, node_callback) => {
3957                                node_callback(node, next_stmt_id);
3958                            }
3959                        }
3960
3961                        let _ = next_stmt_id.get_and_increment();
3962
3963                        ident_stack.push(stream_ident);
3964                    }
3965
3966                    HydroNode::JoinHalf { .. } => {
3967                        let HydroNode::JoinHalf { right, .. } = node else {
3968                            unreachable!()
3969                        };
3970
3971                        assert!(
3972                            right.metadata().collection_kind.is_bounded(),
3973                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3974                            right.metadata().collection_kind
3975                        );
3976
3977                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3978                            quote!('static)
3979                        } else {
3980                            quote!('tick)
3981                        };
3982
3983                        let build_ident = ident_stack.pop().unwrap();
3984                        let probe_ident = ident_stack.pop().unwrap();
3985
3986                        let stream_ident =
3987                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3988
3989                        match builders_or_callback {
3990                            BuildersOrCallback::Builders(graph_builders) => {
3991                                let builder = graph_builders.get_dfir_mut(&out_location);
3992                                builder.add_dfir(
3993                                    parse_quote! {
3994                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3995                                        #probe_ident -> [probe]#stream_ident;
3996                                        #build_ident -> [build]#stream_ident;
3997                                    },
3998                                    None,
3999                                    Some(&next_stmt_id.to_string()),
4000                                );
4001                            }
4002                            BuildersOrCallback::Callback(_, node_callback) => {
4003                                node_callback(node, next_stmt_id);
4004                            }
4005                        }
4006
4007                        let _ = next_stmt_id.get_and_increment();
4008
4009                        ident_stack.push(stream_ident);
4010                    }
4011
4012                    HydroNode::ResolveFutures { .. } => {
4013                        let input_ident = ident_stack.pop().unwrap();
4014
4015                        let futures_ident =
4016                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4017
4018                        match builders_or_callback {
4019                            BuildersOrCallback::Builders(graph_builders) => {
4020                                let builder = graph_builders.get_dfir_mut(&out_location);
4021                                builder.add_dfir(
4022                                    parse_quote! {
4023                                        #futures_ident = #input_ident -> resolve_futures();
4024                                    },
4025                                    None,
4026                                    Some(&next_stmt_id.to_string()),
4027                                );
4028                            }
4029                            BuildersOrCallback::Callback(_, node_callback) => {
4030                                node_callback(node, next_stmt_id);
4031                            }
4032                        }
4033
4034                        let _ = next_stmt_id.get_and_increment();
4035
4036                        ident_stack.push(futures_ident);
4037                    }
4038
4039                    HydroNode::ResolveFuturesBlocking { .. } => {
4040                        let input_ident = ident_stack.pop().unwrap();
4041
4042                        let futures_ident =
4043                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4044
4045                        match builders_or_callback {
4046                            BuildersOrCallback::Builders(graph_builders) => {
4047                                let builder = graph_builders.get_dfir_mut(&out_location);
4048                                builder.add_dfir(
4049                                    parse_quote! {
4050                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4051                                    },
4052                                    None,
4053                                    Some(&next_stmt_id.to_string()),
4054                                );
4055                            }
4056                            BuildersOrCallback::Callback(_, node_callback) => {
4057                                node_callback(node, next_stmt_id);
4058                            }
4059                        }
4060
4061                        let _ = next_stmt_id.get_and_increment();
4062
4063                        ident_stack.push(futures_ident);
4064                    }
4065
4066                    HydroNode::ResolveFuturesOrdered { .. } => {
4067                        let input_ident = ident_stack.pop().unwrap();
4068
4069                        let futures_ident =
4070                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4071
4072                        match builders_or_callback {
4073                            BuildersOrCallback::Builders(graph_builders) => {
4074                                let builder = graph_builders.get_dfir_mut(&out_location);
4075                                builder.add_dfir(
4076                                    parse_quote! {
4077                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4078                                    },
4079                                    None,
4080                                    Some(&next_stmt_id.to_string()),
4081                                );
4082                            }
4083                            BuildersOrCallback::Callback(_, node_callback) => {
4084                                node_callback(node, next_stmt_id);
4085                            }
4086                        }
4087
4088                        let _ = next_stmt_id.get_and_increment();
4089
4090                        ident_stack.push(futures_ident);
4091                    }
4092
4093                    HydroNode::Map { f, .. } => {
4094                        // Pop input ident (pushed last by transform_children).
4095                        let input_ident = ident_stack.pop().unwrap();
4096                        let f_tokens = f.emit_tokens(&mut ident_stack);
4097
4098                        let map_ident =
4099                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4100
4101                        match builders_or_callback {
4102                            BuildersOrCallback::Builders(graph_builders) => {
4103                                let builder = graph_builders.get_dfir_mut(&out_location);
4104                                builder.add_dfir(
4105                                    parse_quote! {
4106                                        #map_ident = #input_ident -> map(#f_tokens);
4107                                    },
4108                                    None,
4109                                    Some(&next_stmt_id.to_string()),
4110                                );
4111                            }
4112                            BuildersOrCallback::Callback(_, node_callback) => {
4113                                node_callback(node, next_stmt_id);
4114                            }
4115                        }
4116
4117                        let _ = next_stmt_id.get_and_increment();
4118
4119                        ident_stack.push(map_ident);
4120                    }
4121
4122                    HydroNode::FlatMap { f, .. } => {
4123                        let input_ident = ident_stack.pop().unwrap();
4124                        let f_tokens = f.emit_tokens(&mut ident_stack);
4125
4126                        let flat_map_ident =
4127                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4128
4129                        match builders_or_callback {
4130                            BuildersOrCallback::Builders(graph_builders) => {
4131                                let builder = graph_builders.get_dfir_mut(&out_location);
4132                                builder.add_dfir(
4133                                    parse_quote! {
4134                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4135                                    },
4136                                    None,
4137                                    Some(&next_stmt_id.to_string()),
4138                                );
4139                            }
4140                            BuildersOrCallback::Callback(_, node_callback) => {
4141                                node_callback(node, next_stmt_id);
4142                            }
4143                        }
4144
4145                        let _ = next_stmt_id.get_and_increment();
4146
4147                        ident_stack.push(flat_map_ident);
4148                    }
4149
4150                    HydroNode::FlatMapStreamBlocking { f, .. } => {
4151                        let input_ident = ident_stack.pop().unwrap();
4152                        let f_tokens = f.emit_tokens(&mut ident_stack);
4153
4154                        let flat_map_stream_blocking_ident =
4155                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4156
4157                        match builders_or_callback {
4158                            BuildersOrCallback::Builders(graph_builders) => {
4159                                let builder = graph_builders.get_dfir_mut(&out_location);
4160                                builder.add_dfir(
4161                                    parse_quote! {
4162                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4163                                    },
4164                                    None,
4165                                    Some(&next_stmt_id.to_string()),
4166                                );
4167                            }
4168                            BuildersOrCallback::Callback(_, node_callback) => {
4169                                node_callback(node, next_stmt_id);
4170                            }
4171                        }
4172
4173                        let _ = next_stmt_id.get_and_increment();
4174
4175                        ident_stack.push(flat_map_stream_blocking_ident);
4176                    }
4177
4178                    HydroNode::Filter { f, .. } => {
4179                        let input_ident = ident_stack.pop().unwrap();
4180                        let f_tokens = f.emit_tokens(&mut ident_stack);
4181
4182                        let filter_ident =
4183                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4184
4185                        match builders_or_callback {
4186                            BuildersOrCallback::Builders(graph_builders) => {
4187                                let builder = graph_builders.get_dfir_mut(&out_location);
4188                                builder.add_dfir(
4189                                    parse_quote! {
4190                                        #filter_ident = #input_ident -> filter(#f_tokens);
4191                                    },
4192                                    None,
4193                                    Some(&next_stmt_id.to_string()),
4194                                );
4195                            }
4196                            BuildersOrCallback::Callback(_, node_callback) => {
4197                                node_callback(node, next_stmt_id);
4198                            }
4199                        }
4200
4201                        let _ = next_stmt_id.get_and_increment();
4202
4203                        ident_stack.push(filter_ident);
4204                    }
4205
4206                    HydroNode::FilterMap { f, .. } => {
4207                        let input_ident = ident_stack.pop().unwrap();
4208                        let f_tokens = f.emit_tokens(&mut ident_stack);
4209
4210                        let filter_map_ident =
4211                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4212
4213                        match builders_or_callback {
4214                            BuildersOrCallback::Builders(graph_builders) => {
4215                                let builder = graph_builders.get_dfir_mut(&out_location);
4216                                builder.add_dfir(
4217                                    parse_quote! {
4218                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4219                                    },
4220                                    None,
4221                                    Some(&next_stmt_id.to_string()),
4222                                );
4223                            }
4224                            BuildersOrCallback::Callback(_, node_callback) => {
4225                                node_callback(node, next_stmt_id);
4226                            }
4227                        }
4228
4229                        let _ = next_stmt_id.get_and_increment();
4230
4231                        ident_stack.push(filter_map_ident);
4232                    }
4233
4234                    HydroNode::Sort { .. } => {
4235                        let input_ident = ident_stack.pop().unwrap();
4236
4237                        let sort_ident =
4238                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4239
4240                        match builders_or_callback {
4241                            BuildersOrCallback::Builders(graph_builders) => {
4242                                let builder = graph_builders.get_dfir_mut(&out_location);
4243                                builder.add_dfir(
4244                                    parse_quote! {
4245                                        #sort_ident = #input_ident -> sort();
4246                                    },
4247                                    None,
4248                                    Some(&next_stmt_id.to_string()),
4249                                );
4250                            }
4251                            BuildersOrCallback::Callback(_, node_callback) => {
4252                                node_callback(node, next_stmt_id);
4253                            }
4254                        }
4255
4256                        let _ = next_stmt_id.get_and_increment();
4257
4258                        ident_stack.push(sort_ident);
4259                    }
4260
4261                    HydroNode::DeferTick { .. } => {
4262                        let input_ident = ident_stack.pop().unwrap();
4263
4264                        let defer_tick_ident =
4265                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4266
4267                        match builders_or_callback {
4268                            BuildersOrCallback::Builders(graph_builders) => {
4269                                let builder = graph_builders.get_dfir_mut(&out_location);
4270                                builder.add_dfir(
4271                                    parse_quote! {
4272                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4273                                    },
4274                                    None,
4275                                    Some(&next_stmt_id.to_string()),
4276                                );
4277                            }
4278                            BuildersOrCallback::Callback(_, node_callback) => {
4279                                node_callback(node, next_stmt_id);
4280                            }
4281                        }
4282
4283                        let _ = next_stmt_id.get_and_increment();
4284
4285                        ident_stack.push(defer_tick_ident);
4286                    }
4287
4288                    HydroNode::Enumerate { input, .. } => {
4289                        let input_ident = ident_stack.pop().unwrap();
4290
4291                        let enumerate_ident =
4292                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4293
4294                        match builders_or_callback {
4295                            BuildersOrCallback::Builders(graph_builders) => {
4296                                let builder = graph_builders.get_dfir_mut(&out_location);
4297                                let lifetime = if input.metadata().location_id.is_top_level() {
4298                                    quote!('static)
4299                                } else {
4300                                    quote!('tick)
4301                                };
4302                                builder.add_dfir(
4303                                    parse_quote! {
4304                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4305                                    },
4306                                    None,
4307                                    Some(&next_stmt_id.to_string()),
4308                                );
4309                            }
4310                            BuildersOrCallback::Callback(_, node_callback) => {
4311                                node_callback(node, next_stmt_id);
4312                            }
4313                        }
4314
4315                        let _ = next_stmt_id.get_and_increment();
4316
4317                        ident_stack.push(enumerate_ident);
4318                    }
4319
4320                    HydroNode::Inspect { f, .. } => {
4321                        let input_ident = ident_stack.pop().unwrap();
4322                        let f_tokens = f.emit_tokens(&mut ident_stack);
4323
4324                        let inspect_ident =
4325                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4326
4327                        match builders_or_callback {
4328                            BuildersOrCallback::Builders(graph_builders) => {
4329                                let builder = graph_builders.get_dfir_mut(&out_location);
4330                                builder.add_dfir(
4331                                    parse_quote! {
4332                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4333                                    },
4334                                    None,
4335                                    Some(&next_stmt_id.to_string()),
4336                                );
4337                            }
4338                            BuildersOrCallback::Callback(_, node_callback) => {
4339                                node_callback(node, next_stmt_id);
4340                            }
4341                        }
4342
4343                        let _ = next_stmt_id.get_and_increment();
4344
4345                        ident_stack.push(inspect_ident);
4346                    }
4347
4348                    HydroNode::Unique { input, .. } => {
4349                        let input_ident = ident_stack.pop().unwrap();
4350
4351                        let unique_ident =
4352                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4353
4354                        match builders_or_callback {
4355                            BuildersOrCallback::Builders(graph_builders) => {
4356                                let builder = graph_builders.get_dfir_mut(&out_location);
4357                                let lifetime = if input.metadata().location_id.is_top_level() {
4358                                    quote!('static)
4359                                } else {
4360                                    quote!('tick)
4361                                };
4362
4363                                builder.add_dfir(
4364                                    parse_quote! {
4365                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4366                                    },
4367                                    None,
4368                                    Some(&next_stmt_id.to_string()),
4369                                );
4370                            }
4371                            BuildersOrCallback::Callback(_, node_callback) => {
4372                                node_callback(node, next_stmt_id);
4373                            }
4374                        }
4375
4376                        let _ = next_stmt_id.get_and_increment();
4377
4378                        ident_stack.push(unique_ident);
4379                    }
4380
4381                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4382                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4383                            if input.metadata().location_id.is_top_level()
4384                                && input.metadata().collection_kind.is_bounded()
4385                            {
4386                                parse_quote!(fold_no_replay)
4387                            } else {
4388                                parse_quote!(fold)
4389                            }
4390                        } else if matches!(node, HydroNode::Scan { .. }) {
4391                            parse_quote!(scan)
4392                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4393                            parse_quote!(scan_async_blocking)
4394                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4395                            if input.metadata().location_id.is_top_level()
4396                                && input.metadata().collection_kind.is_bounded()
4397                            {
4398                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4399                            } else {
4400                                parse_quote!(fold_keyed)
4401                            }
4402                        } else {
4403                            unreachable!()
4404                        };
4405
4406                        let (HydroNode::Fold { input, .. }
4407                        | HydroNode::FoldKeyed { input, .. }
4408                        | HydroNode::Scan { input, .. }
4409                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4410                        else {
4411                            unreachable!()
4412                        };
4413
4414                        let lifetime = if input.metadata().location_id.is_top_level() {
4415                            quote!('static)
4416                        } else {
4417                            quote!('tick)
4418                        };
4419
4420                        let input_ident = ident_stack.pop().unwrap();
4421
4422                        let (HydroNode::Fold { init, acc, .. }
4423                        | HydroNode::FoldKeyed { init, acc, .. }
4424                        | HydroNode::Scan { init, acc, .. }
4425                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4426                        else {
4427                            unreachable!()
4428                        };
4429
4430                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4431                        let init_tokens = init.emit_tokens(&mut ident_stack);
4432
4433                        let fold_ident =
4434                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4435
4436                        match builders_or_callback {
4437                            BuildersOrCallback::Builders(graph_builders) => {
4438                                if matches!(node, HydroNode::Fold { .. })
4439                                    && node.metadata().location_id.is_top_level()
4440                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4441                                    && graph_builders.singleton_intermediates()
4442                                    && !node.metadata().collection_kind.is_bounded()
4443                                {
4444                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4445                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4446                                        &input.metadata().location_id,
4447                                        &input_ident,
4448                                        &input.metadata().collection_kind,
4449                                        &node.metadata().op,
4450                                    );
4451
4452                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4453                                        let acc: syn::Expr = parse_quote!({
4454                                            let mut __inner = #acc_tokens;
4455                                            move |__state, __batch: Vec<_>| {
4456                                                if __batch.is_empty() {
4457                                                    return None;
4458                                                }
4459                                                for __value in __batch {
4460                                                    __inner(__state, __value);
4461                                                }
4462                                                Some(__state.clone())
4463                                            }
4464                                        });
4465                                        (hooked, acc)
4466                                    } else {
4467                                        let acc: syn::Expr = parse_quote!({
4468                                            let mut __inner = #acc_tokens;
4469                                            move |__state, __value| {
4470                                                __inner(__state, __value);
4471                                                Some(__state.clone())
4472                                            }
4473                                        });
4474                                        (&input_ident, acc)
4475                                    };
4476
4477                                    let builder = graph_builders.get_dfir_mut(&out_location);
4478                                    builder.add_dfir(
4479                                        parse_quote! {
4480                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4481                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4482                                            #fold_ident = chain();
4483                                        },
4484                                        None,
4485                                        Some(&next_stmt_id.to_string()),
4486                                    );
4487
4488                                    if hooked_input_ident.is_some() {
4489                                        fold_hooked_idents.insert(fold_ident.to_string());
4490                                    }
4491                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4492                                    && node.metadata().location_id.is_top_level()
4493                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4494                                    && graph_builders.singleton_intermediates()
4495                                    && !node.metadata().collection_kind.is_bounded()
4496                                {
4497                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4498                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4499                                        &input.metadata().location_id,
4500                                        &input_ident,
4501                                        &input.metadata().collection_kind,
4502                                        &node.metadata().op,
4503                                    );
4504                                    let builder = graph_builders.get_dfir_mut(&out_location);
4505
4506                                    let wrapped_acc: syn::Expr = parse_quote!({
4507                                        let mut __init = #init_tokens;
4508                                        let mut __inner = #acc_tokens;
4509                                        move |__state, __kv: (_, _)| {
4510                                            // TODO(shadaj): we can avoid the clone when the entry exists
4511                                            let __state = __state
4512                                                .entry(::std::clone::Clone::clone(&__kv.0))
4513                                                .or_insert_with(|| (__init)());
4514                                            __inner(__state, __kv.1);
4515                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4516                                        }
4517                                    });
4518
4519                                    if let Some(hooked_input_ident) = hooked_input_ident {
4520                                        builder.add_dfir(
4521                                            parse_quote! {
4522                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4523                                            },
4524                                            None,
4525                                            Some(&next_stmt_id.to_string()),
4526                                        );
4527
4528                                        fold_hooked_idents.insert(fold_ident.to_string());
4529                                    } else {
4530                                        builder.add_dfir(
4531                                            parse_quote! {
4532                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4533                                            },
4534                                            None,
4535                                            Some(&next_stmt_id.to_string()),
4536                                        );
4537                                    }
4538                                } else if (matches!(node, HydroNode::Fold { .. })
4539                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4540                                    && !node.metadata().location_id.is_top_level()
4541                                    && graph_builders.singleton_intermediates()
4542                                {
4543                                    let input_ref = match &*node {
4544                                        HydroNode::Fold { input, .. } => input,
4545                                        HydroNode::FoldKeyed { input, .. } => input,
4546                                        _ => unreachable!(),
4547                                    };
4548                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4549                                        &input_ref.metadata().location_id,
4550                                        &input_ident,
4551                                        &input_ref.metadata().collection_kind,
4552                                        &node.metadata().op,
4553                                    );
4554
4555                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4556                                    let builder = graph_builders.get_dfir_mut(&out_location);
4557                                    builder.add_dfir(
4558                                        parse_quote! {
4559                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4560                                        },
4561                                        None,
4562                                        Some(&next_stmt_id.to_string()),
4563                                    );
4564                                } else {
4565                                    let builder = graph_builders.get_dfir_mut(&out_location);
4566                                    builder.add_dfir(
4567                                        parse_quote! {
4568                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4569                                        },
4570                                        None,
4571                                        Some(&next_stmt_id.to_string()),
4572                                    );
4573                                }
4574                            }
4575                            BuildersOrCallback::Callback(_, node_callback) => {
4576                                node_callback(node, next_stmt_id);
4577                            }
4578                        }
4579
4580                        let _ = next_stmt_id.get_and_increment();
4581
4582                        ident_stack.push(fold_ident);
4583                    }
4584
4585                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4586                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4587                            if input.metadata().location_id.is_top_level()
4588                                && input.metadata().collection_kind.is_bounded()
4589                            {
4590                                parse_quote!(reduce_no_replay)
4591                            } else {
4592                                parse_quote!(reduce)
4593                            }
4594                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4595                            if input.metadata().location_id.is_top_level()
4596                                && input.metadata().collection_kind.is_bounded()
4597                            {
4598                                todo!(
4599                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4600                                )
4601                            } else {
4602                                parse_quote!(reduce_keyed)
4603                            }
4604                        } else {
4605                            unreachable!()
4606                        };
4607
4608                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4609                        else {
4610                            unreachable!()
4611                        };
4612
4613                        let lifetime = if input.metadata().location_id.is_top_level() {
4614                            quote!('static)
4615                        } else {
4616                            quote!('tick)
4617                        };
4618
4619                        let input_ident = ident_stack.pop().unwrap();
4620
4621                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4622                        else {
4623                            unreachable!()
4624                        };
4625
4626                        let f_tokens = f.emit_tokens(&mut ident_stack);
4627
4628                        let reduce_ident =
4629                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4630
4631                        match builders_or_callback {
4632                            BuildersOrCallback::Builders(graph_builders) => {
4633                                if matches!(node, HydroNode::Reduce { .. })
4634                                    && node.metadata().location_id.is_top_level()
4635                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4636                                    && graph_builders.singleton_intermediates()
4637                                    && !node.metadata().collection_kind.is_bounded()
4638                                {
4639                                    todo!(
4640                                        "Reduce with optional intermediates is not yet supported in simulator"
4641                                    );
4642                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4643                                    && node.metadata().location_id.is_top_level()
4644                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4645                                    && graph_builders.singleton_intermediates()
4646                                    && !node.metadata().collection_kind.is_bounded()
4647                                {
4648                                    todo!(
4649                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4650                                    );
4651                                } else {
4652                                    let builder = graph_builders.get_dfir_mut(&out_location);
4653                                    builder.add_dfir(
4654                                        parse_quote! {
4655                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4656                                        },
4657                                        None,
4658                                        Some(&next_stmt_id.to_string()),
4659                                    );
4660                                }
4661                            }
4662                            BuildersOrCallback::Callback(_, node_callback) => {
4663                                node_callback(node, next_stmt_id);
4664                            }
4665                        }
4666
4667                        let _ = next_stmt_id.get_and_increment();
4668
4669                        ident_stack.push(reduce_ident);
4670                    }
4671
4672                    HydroNode::ReduceKeyedWatermark {
4673                        f,
4674                        input,
4675                        metadata,
4676                        ..
4677                    } => {
4678                        let lifetime = if input.metadata().location_id.is_top_level() {
4679                            quote!('static)
4680                        } else {
4681                            quote!('tick)
4682                        };
4683
4684                        // watermark is processed second, so it's on top
4685                        let watermark_ident = ident_stack.pop().unwrap();
4686                        let input_ident = ident_stack.pop().unwrap();
4687                        let f_tokens = f.emit_tokens(&mut ident_stack);
4688
4689                        let chain_ident = syn::Ident::new(
4690                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4691                            Span::call_site(),
4692                        );
4693
4694                        let fold_ident =
4695                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4696
4697                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4698                            && input.metadata().collection_kind.is_bounded()
4699                        {
4700                            parse_quote!(fold_no_replay)
4701                        } else {
4702                            parse_quote!(fold)
4703                        };
4704
4705                        match builders_or_callback {
4706                            BuildersOrCallback::Builders(graph_builders) => {
4707                                if metadata.location_id.is_top_level()
4708                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4709                                    && graph_builders.singleton_intermediates()
4710                                    && !metadata.collection_kind.is_bounded()
4711                                {
4712                                    todo!(
4713                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4714                                    )
4715                                } else {
4716                                    let builder = graph_builders.get_dfir_mut(&out_location);
4717                                    builder.add_dfir(
4718                                        parse_quote! {
4719                                            #chain_ident = chain();
4720                                            #input_ident
4721                                                -> map(|x| (Some(x), None))
4722                                                -> [0]#chain_ident;
4723                                            #watermark_ident
4724                                                -> map(|watermark| (None, Some(watermark)))
4725                                                -> [1]#chain_ident;
4726
4727                                            #fold_ident = #chain_ident
4728                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4729                                                    let __reduce_keyed_fn = #f_tokens;
4730                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4731                                                        if let Some((k, v)) = opt_payload {
4732                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4733                                                                if k < curr_watermark {
4734                                                                    return;
4735                                                                }
4736                                                            }
4737                                                            match map.entry(k) {
4738                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4739                                                                    e.insert(v);
4740                                                                }
4741                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4742                                                                    __reduce_keyed_fn(e.get_mut(), v);
4743                                                                }
4744                                                            }
4745                                                        } else {
4746                                                            let watermark = opt_watermark.unwrap();
4747                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4748                                                                if watermark <= curr_watermark {
4749                                                                    return;
4750                                                                }
4751                                                            }
4752                                                            map.retain(|k, _| *k >= watermark);
4753                                                            *opt_curr_watermark = Some(watermark);
4754                                                        }
4755                                                    }
4756                                                })
4757                                                -> flat_map(|(map, _curr_watermark)| map);
4758                                        },
4759                                        None,
4760                                        Some(&next_stmt_id.to_string()),
4761                                    );
4762                                }
4763                            }
4764                            BuildersOrCallback::Callback(_, node_callback) => {
4765                                node_callback(node, next_stmt_id);
4766                            }
4767                        }
4768
4769                        let _ = next_stmt_id.get_and_increment();
4770
4771                        ident_stack.push(fold_ident);
4772                    }
4773
4774                    HydroNode::Network {
4775                        networking_info,
4776                        serialize_fn: serialize_pipeline,
4777                        instantiate_fn,
4778                        deserialize_fn: deserialize_pipeline,
4779                        input,
4780                        ..
4781                    } => {
4782                        let input_ident = ident_stack.pop().unwrap();
4783
4784                        let receiver_stream_ident =
4785                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4786
4787                        match builders_or_callback {
4788                            BuildersOrCallback::Builders(graph_builders) => {
4789                                let (sink_expr, source_expr) = match instantiate_fn {
4790                                    DebugInstantiate::Building => (
4791                                        syn::parse_quote!(DUMMY_SINK),
4792                                        syn::parse_quote!(DUMMY_SOURCE),
4793                                    ),
4794
4795                                    DebugInstantiate::Finalized(finalized) => {
4796                                        (finalized.sink.clone(), finalized.source.clone())
4797                                    }
4798                                };
4799
4800                                graph_builders.create_network(
4801                                    &input.metadata().location_id,
4802                                    &out_location,
4803                                    input_ident,
4804                                    &receiver_stream_ident,
4805                                    serialize_pipeline.as_ref(),
4806                                    sink_expr,
4807                                    source_expr,
4808                                    deserialize_pipeline.as_ref(),
4809                                    *next_stmt_id,
4810                                    networking_info,
4811                                );
4812                            }
4813                            BuildersOrCallback::Callback(_, node_callback) => {
4814                                node_callback(node, next_stmt_id);
4815                            }
4816                        }
4817
4818                        let _ = next_stmt_id.get_and_increment();
4819
4820                        ident_stack.push(receiver_stream_ident);
4821                    }
4822
4823                    HydroNode::ExternalInput {
4824                        instantiate_fn,
4825                        deserialize_fn: deserialize_pipeline,
4826                        ..
4827                    } => {
4828                        let receiver_stream_ident =
4829                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4830
4831                        match builders_or_callback {
4832                            BuildersOrCallback::Builders(graph_builders) => {
4833                                let (_, source_expr) = match instantiate_fn {
4834                                    DebugInstantiate::Building => (
4835                                        syn::parse_quote!(DUMMY_SINK),
4836                                        syn::parse_quote!(DUMMY_SOURCE),
4837                                    ),
4838
4839                                    DebugInstantiate::Finalized(finalized) => {
4840                                        (finalized.sink.clone(), finalized.source.clone())
4841                                    }
4842                                };
4843
4844                                graph_builders.create_external_source(
4845                                    &out_location,
4846                                    source_expr,
4847                                    &receiver_stream_ident,
4848                                    deserialize_pipeline.as_ref(),
4849                                    *next_stmt_id,
4850                                );
4851                            }
4852                            BuildersOrCallback::Callback(_, node_callback) => {
4853                                node_callback(node, next_stmt_id);
4854                            }
4855                        }
4856
4857                        let _ = next_stmt_id.get_and_increment();
4858
4859                        ident_stack.push(receiver_stream_ident);
4860                    }
4861
4862                    HydroNode::Counter {
4863                        tag,
4864                        duration,
4865                        prefix,
4866                        ..
4867                    } => {
4868                        let input_ident = ident_stack.pop().unwrap();
4869
4870                        let counter_ident =
4871                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4872
4873                        match builders_or_callback {
4874                            BuildersOrCallback::Builders(graph_builders) => {
4875                                let arg = format!("{}({})", prefix, tag);
4876                                let builder = graph_builders.get_dfir_mut(&out_location);
4877                                builder.add_dfir(
4878                                    parse_quote! {
4879                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4880                                    },
4881                                    None,
4882                                    Some(&next_stmt_id.to_string()),
4883                                );
4884                            }
4885                            BuildersOrCallback::Callback(_, node_callback) => {
4886                                node_callback(node, next_stmt_id);
4887                            }
4888                        }
4889
4890                        let _ = next_stmt_id.get_and_increment();
4891
4892                        ident_stack.push(counter_ident);
4893                    }
4894                }
4895            },
4896            seen_tees,
4897            false,
4898        );
4899
4900        let ret = ident_stack
4901            .pop()
4902            .expect("ident_stack should have exactly one element after traversal");
4903        assert!(
4904            ident_stack.is_empty(),
4905            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4906             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4907            ident_stack.len()
4908        );
4909        ret
4910    }
4911
4912    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4913        match self {
4914            HydroNode::Placeholder => {
4915                panic!()
4916            }
4917            HydroNode::Cast { .. }
4918            | HydroNode::ObserveNonDet { .. }
4919            | HydroNode::UnboundSingleton { .. }
4920            | HydroNode::AssertIsConsistent { .. } => {}
4921            HydroNode::Source { source, .. } => match source {
4922                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4923                HydroSource::ExternalNetwork()
4924                | HydroSource::Spin()
4925                | HydroSource::ClusterMembers(_, _)
4926                | HydroSource::Embedded(_)
4927                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4928            },
4929            HydroNode::SingletonSource { value, .. } => {
4930                transform(value);
4931            }
4932            HydroNode::CycleSource { .. }
4933            | HydroNode::Tee { .. }
4934            | HydroNode::Singleton { .. }
4935            | HydroNode::YieldConcat { .. }
4936            | HydroNode::BeginAtomic { .. }
4937            | HydroNode::EndAtomic { .. }
4938            | HydroNode::Batch { .. }
4939            | HydroNode::Chain { .. }
4940            | HydroNode::MergeOrdered { .. }
4941            | HydroNode::ChainFirst { .. }
4942            | HydroNode::CrossProduct { .. }
4943            | HydroNode::CrossSingleton { .. }
4944            | HydroNode::ResolveFutures { .. }
4945            | HydroNode::ResolveFuturesBlocking { .. }
4946            | HydroNode::ResolveFuturesOrdered { .. }
4947            | HydroNode::Join { .. }
4948            | HydroNode::JoinHalf { .. }
4949            | HydroNode::Difference { .. }
4950            | HydroNode::AntiJoin { .. }
4951            | HydroNode::DeferTick { .. }
4952            | HydroNode::Enumerate { .. }
4953            | HydroNode::Unique { .. }
4954            | HydroNode::Sort { .. } => {}
4955            HydroNode::Map { f, .. }
4956            | HydroNode::FlatMap { f, .. }
4957            | HydroNode::FlatMapStreamBlocking { f, .. }
4958            | HydroNode::Filter { f, .. }
4959            | HydroNode::FilterMap { f, .. }
4960            | HydroNode::Inspect { f, .. }
4961            | HydroNode::Partition { f, .. }
4962            | HydroNode::Reduce { f, .. }
4963            | HydroNode::ReduceKeyed { f, .. }
4964            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4965                transform(&mut f.expr);
4966            }
4967            HydroNode::Fold { init, acc, .. }
4968            | HydroNode::Scan { init, acc, .. }
4969            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4970            | HydroNode::FoldKeyed { init, acc, .. } => {
4971                transform(&mut init.expr);
4972                transform(&mut acc.expr);
4973            }
4974            HydroNode::Network {
4975                serialize_fn,
4976                deserialize_fn,
4977                ..
4978            } => {
4979                if let Some(serialize_fn) = serialize_fn {
4980                    transform(serialize_fn);
4981                }
4982                if let Some(deserialize_fn) = deserialize_fn {
4983                    transform(deserialize_fn);
4984                }
4985            }
4986            HydroNode::ExternalInput { deserialize_fn, .. } => {
4987                if let Some(deserialize_fn) = deserialize_fn {
4988                    transform(deserialize_fn);
4989                }
4990            }
4991            HydroNode::Counter { duration, .. } => {
4992                transform(duration);
4993            }
4994        }
4995    }
4996
4997    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4998        &self.metadata().op
4999    }
5000
5001    pub fn metadata(&self) -> &HydroIrMetadata {
5002        match self {
5003            HydroNode::Placeholder => {
5004                panic!()
5005            }
5006            HydroNode::Cast { metadata, .. }
5007            | HydroNode::ObserveNonDet { metadata, .. }
5008            | HydroNode::AssertIsConsistent { metadata, .. }
5009            | HydroNode::UnboundSingleton { metadata, .. }
5010            | HydroNode::Source { metadata, .. }
5011            | HydroNode::SingletonSource { metadata, .. }
5012            | HydroNode::CycleSource { metadata, .. }
5013            | HydroNode::Tee { metadata, .. }
5014            | HydroNode::Singleton { metadata, .. }
5015            | HydroNode::Partition { metadata, .. }
5016            | HydroNode::YieldConcat { metadata, .. }
5017            | HydroNode::BeginAtomic { metadata, .. }
5018            | HydroNode::EndAtomic { metadata, .. }
5019            | HydroNode::Batch { metadata, .. }
5020            | HydroNode::Chain { metadata, .. }
5021            | HydroNode::MergeOrdered { metadata, .. }
5022            | HydroNode::ChainFirst { metadata, .. }
5023            | HydroNode::CrossProduct { metadata, .. }
5024            | HydroNode::CrossSingleton { metadata, .. }
5025            | HydroNode::Join { metadata, .. }
5026            | HydroNode::JoinHalf { metadata, .. }
5027            | HydroNode::Difference { metadata, .. }
5028            | HydroNode::AntiJoin { metadata, .. }
5029            | HydroNode::ResolveFutures { metadata, .. }
5030            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5031            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5032            | HydroNode::Map { metadata, .. }
5033            | HydroNode::FlatMap { metadata, .. }
5034            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5035            | HydroNode::Filter { metadata, .. }
5036            | HydroNode::FilterMap { metadata, .. }
5037            | HydroNode::DeferTick { metadata, .. }
5038            | HydroNode::Enumerate { metadata, .. }
5039            | HydroNode::Inspect { metadata, .. }
5040            | HydroNode::Unique { metadata, .. }
5041            | HydroNode::Sort { metadata, .. }
5042            | HydroNode::Scan { metadata, .. }
5043            | HydroNode::ScanAsyncBlocking { metadata, .. }
5044            | HydroNode::Fold { metadata, .. }
5045            | HydroNode::FoldKeyed { metadata, .. }
5046            | HydroNode::Reduce { metadata, .. }
5047            | HydroNode::ReduceKeyed { metadata, .. }
5048            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5049            | HydroNode::ExternalInput { metadata, .. }
5050            | HydroNode::Network { metadata, .. }
5051            | HydroNode::Counter { metadata, .. } => metadata,
5052        }
5053    }
5054
5055    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5056        &mut self.metadata_mut().op
5057    }
5058
5059    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5060        match self {
5061            HydroNode::Placeholder => {
5062                panic!()
5063            }
5064            HydroNode::Cast { metadata, .. }
5065            | HydroNode::ObserveNonDet { metadata, .. }
5066            | HydroNode::AssertIsConsistent { metadata, .. }
5067            | HydroNode::UnboundSingleton { metadata, .. }
5068            | HydroNode::Source { metadata, .. }
5069            | HydroNode::SingletonSource { metadata, .. }
5070            | HydroNode::CycleSource { metadata, .. }
5071            | HydroNode::Tee { metadata, .. }
5072            | HydroNode::Singleton { metadata, .. }
5073            | HydroNode::Partition { metadata, .. }
5074            | HydroNode::YieldConcat { metadata, .. }
5075            | HydroNode::BeginAtomic { metadata, .. }
5076            | HydroNode::EndAtomic { metadata, .. }
5077            | HydroNode::Batch { metadata, .. }
5078            | HydroNode::Chain { metadata, .. }
5079            | HydroNode::MergeOrdered { metadata, .. }
5080            | HydroNode::ChainFirst { metadata, .. }
5081            | HydroNode::CrossProduct { metadata, .. }
5082            | HydroNode::CrossSingleton { metadata, .. }
5083            | HydroNode::Join { metadata, .. }
5084            | HydroNode::JoinHalf { metadata, .. }
5085            | HydroNode::Difference { metadata, .. }
5086            | HydroNode::AntiJoin { metadata, .. }
5087            | HydroNode::ResolveFutures { metadata, .. }
5088            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5089            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5090            | HydroNode::Map { metadata, .. }
5091            | HydroNode::FlatMap { metadata, .. }
5092            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5093            | HydroNode::Filter { metadata, .. }
5094            | HydroNode::FilterMap { metadata, .. }
5095            | HydroNode::DeferTick { metadata, .. }
5096            | HydroNode::Enumerate { metadata, .. }
5097            | HydroNode::Inspect { metadata, .. }
5098            | HydroNode::Unique { metadata, .. }
5099            | HydroNode::Sort { metadata, .. }
5100            | HydroNode::Scan { metadata, .. }
5101            | HydroNode::ScanAsyncBlocking { metadata, .. }
5102            | HydroNode::Fold { metadata, .. }
5103            | HydroNode::FoldKeyed { metadata, .. }
5104            | HydroNode::Reduce { metadata, .. }
5105            | HydroNode::ReduceKeyed { metadata, .. }
5106            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5107            | HydroNode::ExternalInput { metadata, .. }
5108            | HydroNode::Network { metadata, .. }
5109            | HydroNode::Counter { metadata, .. } => metadata,
5110        }
5111    }
5112
5113    pub fn input(&self) -> Vec<&HydroNode> {
5114        match self {
5115            HydroNode::Placeholder => {
5116                panic!()
5117            }
5118            HydroNode::Source { .. }
5119            | HydroNode::SingletonSource { .. }
5120            | HydroNode::ExternalInput { .. }
5121            | HydroNode::CycleSource { .. }
5122            | HydroNode::Tee { .. }
5123            | HydroNode::Singleton { .. }
5124            | HydroNode::Partition { .. } => {
5125                // Tee/Partition should find their input in separate special ways
5126                vec![]
5127            }
5128            HydroNode::Cast { inner, .. }
5129            | HydroNode::ObserveNonDet { inner, .. }
5130            | HydroNode::YieldConcat { inner, .. }
5131            | HydroNode::BeginAtomic { inner, .. }
5132            | HydroNode::EndAtomic { inner, .. }
5133            | HydroNode::Batch { inner, .. }
5134            | HydroNode::UnboundSingleton { inner, .. }
5135            | HydroNode::AssertIsConsistent { inner, .. } => {
5136                vec![inner]
5137            }
5138            HydroNode::Chain { first, second, .. } => {
5139                vec![first, second]
5140            }
5141            HydroNode::MergeOrdered { first, second, .. } => {
5142                vec![first, second]
5143            }
5144            HydroNode::ChainFirst { first, second, .. } => {
5145                vec![first, second]
5146            }
5147            HydroNode::CrossProduct { left, right, .. }
5148            | HydroNode::CrossSingleton { left, right, .. }
5149            | HydroNode::Join { left, right, .. }
5150            | HydroNode::JoinHalf { left, right, .. } => {
5151                vec![left, right]
5152            }
5153            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5154                vec![pos, neg]
5155            }
5156            HydroNode::Map { input, .. }
5157            | HydroNode::FlatMap { input, .. }
5158            | HydroNode::FlatMapStreamBlocking { input, .. }
5159            | HydroNode::Filter { input, .. }
5160            | HydroNode::FilterMap { input, .. }
5161            | HydroNode::Sort { input, .. }
5162            | HydroNode::DeferTick { input, .. }
5163            | HydroNode::Enumerate { input, .. }
5164            | HydroNode::Inspect { input, .. }
5165            | HydroNode::Unique { input, .. }
5166            | HydroNode::Network { input, .. }
5167            | HydroNode::Counter { input, .. }
5168            | HydroNode::ResolveFutures { input, .. }
5169            | HydroNode::ResolveFuturesBlocking { input, .. }
5170            | HydroNode::ResolveFuturesOrdered { input, .. }
5171            | HydroNode::Fold { input, .. }
5172            | HydroNode::FoldKeyed { input, .. }
5173            | HydroNode::Reduce { input, .. }
5174            | HydroNode::ReduceKeyed { input, .. }
5175            | HydroNode::Scan { input, .. }
5176            | HydroNode::ScanAsyncBlocking { input, .. } => {
5177                vec![input]
5178            }
5179            HydroNode::ReduceKeyedWatermark {
5180                input, watermark, ..
5181            } => {
5182                vec![input, watermark]
5183            }
5184        }
5185    }
5186
5187    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5188        self.input()
5189            .iter()
5190            .map(|input_node| input_node.metadata())
5191            .collect()
5192    }
5193
5194    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5195    /// has other live references, meaning the upstream is already driven
5196    /// by another consumer and does not need a Null sink.
5197    pub fn is_shared_with_others(&self) -> bool {
5198        match self {
5199            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5200                Rc::strong_count(&inner.0) > 1
5201            }
5202            // A zero-output singleton() is valid in DFIR (it drains itself at
5203            // end of tick), so it doesn't need to be driven by another consumer.
5204            HydroNode::Singleton { .. } => false,
5205            _ => false,
5206        }
5207    }
5208
5209    pub fn print_root(&self) -> String {
5210        match self {
5211            HydroNode::Placeholder => {
5212                panic!()
5213            }
5214            HydroNode::Cast { .. } => "Cast()".to_owned(),
5215            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5216            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5217            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5218            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5219            HydroNode::SingletonSource {
5220                value,
5221                first_tick_only,
5222                ..
5223            } => format!(
5224                "SingletonSource({:?}, first_tick_only={})",
5225                value, first_tick_only
5226            ),
5227            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5228            HydroNode::Tee { inner, .. } => {
5229                format!("Tee({})", inner.0.borrow().print_root())
5230            }
5231            HydroNode::Singleton { inner, .. } => {
5232                format!("Singleton({})", inner.0.borrow().print_root())
5233            }
5234            HydroNode::Partition { f, is_true, .. } => {
5235                format!("Partition({:?}, is_true={})", f, is_true)
5236            }
5237            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5238            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5239            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5240            HydroNode::Batch { .. } => "Batch()".to_owned(),
5241            HydroNode::Chain { first, second, .. } => {
5242                format!("Chain({}, {})", first.print_root(), second.print_root())
5243            }
5244            HydroNode::MergeOrdered { first, second, .. } => {
5245                format!(
5246                    "MergeOrdered({}, {})",
5247                    first.print_root(),
5248                    second.print_root()
5249                )
5250            }
5251            HydroNode::ChainFirst { first, second, .. } => {
5252                format!(
5253                    "ChainFirst({}, {})",
5254                    first.print_root(),
5255                    second.print_root()
5256                )
5257            }
5258            HydroNode::CrossProduct { left, right, .. } => {
5259                format!(
5260                    "CrossProduct({}, {})",
5261                    left.print_root(),
5262                    right.print_root()
5263                )
5264            }
5265            HydroNode::CrossSingleton { left, right, .. } => {
5266                format!(
5267                    "CrossSingleton({}, {})",
5268                    left.print_root(),
5269                    right.print_root()
5270                )
5271            }
5272            HydroNode::Join { left, right, .. } => {
5273                format!("Join({}, {})", left.print_root(), right.print_root())
5274            }
5275            HydroNode::JoinHalf { left, right, .. } => {
5276                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5277            }
5278            HydroNode::Difference { pos, neg, .. } => {
5279                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5280            }
5281            HydroNode::AntiJoin { pos, neg, .. } => {
5282                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5283            }
5284            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5285            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5286            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5287            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5288            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5289            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5290            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5291            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5292            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5293            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5294            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5295            HydroNode::Unique { .. } => "Unique()".to_owned(),
5296            HydroNode::Sort { .. } => "Sort()".to_owned(),
5297            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5298            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5299            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5300                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5301            }
5302            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5303            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5304            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5305            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5306            HydroNode::Network { .. } => "Network()".to_owned(),
5307            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5308            HydroNode::Counter { tag, duration, .. } => {
5309                format!("Counter({:?}, {:?})", tag, duration)
5310            }
5311        }
5312    }
5313}
5314
5315#[cfg(feature = "build")]
5316fn instantiate_network<'a, D>(
5317    env: &mut D::InstantiateEnv,
5318    from_location: &LocationId,
5319    to_location: &LocationId,
5320    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5321    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5322    name: Option<&str>,
5323    networking_info: &crate::networking::NetworkingInfo,
5324) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5325where
5326    D: Deploy<'a>,
5327{
5328    let ((sink, source), connect_fn) = match (from_location, to_location) {
5329        (&LocationId::Process(from), &LocationId::Process(to)) => {
5330            let from_node = processes
5331                .get(from)
5332                .unwrap_or_else(|| {
5333                    panic!("A process used in the graph was not instantiated: {}", from)
5334                })
5335                .clone();
5336            let to_node = processes
5337                .get(to)
5338                .unwrap_or_else(|| {
5339                    panic!("A process used in the graph was not instantiated: {}", to)
5340                })
5341                .clone();
5342
5343            let sink_port = from_node.next_port();
5344            let source_port = to_node.next_port();
5345
5346            (
5347                D::o2o_sink_source(
5348                    env,
5349                    &from_node,
5350                    &sink_port,
5351                    &to_node,
5352                    &source_port,
5353                    name,
5354                    networking_info,
5355                ),
5356                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5357            )
5358        }
5359        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5360            let from_node = processes
5361                .get(from)
5362                .unwrap_or_else(|| {
5363                    panic!("A process used in the graph was not instantiated: {}", from)
5364                })
5365                .clone();
5366            let to_node = clusters
5367                .get(to)
5368                .unwrap_or_else(|| {
5369                    panic!("A cluster used in the graph was not instantiated: {}", to)
5370                })
5371                .clone();
5372
5373            let sink_port = from_node.next_port();
5374            let source_port = to_node.next_port();
5375
5376            (
5377                D::o2m_sink_source(
5378                    env,
5379                    &from_node,
5380                    &sink_port,
5381                    &to_node,
5382                    &source_port,
5383                    name,
5384                    networking_info,
5385                ),
5386                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5387            )
5388        }
5389        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5390            let from_node = clusters
5391                .get(from)
5392                .unwrap_or_else(|| {
5393                    panic!("A cluster used in the graph was not instantiated: {}", from)
5394                })
5395                .clone();
5396            let to_node = processes
5397                .get(to)
5398                .unwrap_or_else(|| {
5399                    panic!("A process used in the graph was not instantiated: {}", to)
5400                })
5401                .clone();
5402
5403            let sink_port = from_node.next_port();
5404            let source_port = to_node.next_port();
5405
5406            (
5407                D::m2o_sink_source(
5408                    env,
5409                    &from_node,
5410                    &sink_port,
5411                    &to_node,
5412                    &source_port,
5413                    name,
5414                    networking_info,
5415                ),
5416                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5417            )
5418        }
5419        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5420            let from_node = clusters
5421                .get(from)
5422                .unwrap_or_else(|| {
5423                    panic!("A cluster used in the graph was not instantiated: {}", from)
5424                })
5425                .clone();
5426            let to_node = clusters
5427                .get(to)
5428                .unwrap_or_else(|| {
5429                    panic!("A cluster used in the graph was not instantiated: {}", to)
5430                })
5431                .clone();
5432
5433            let sink_port = from_node.next_port();
5434            let source_port = to_node.next_port();
5435
5436            (
5437                D::m2m_sink_source(
5438                    env,
5439                    &from_node,
5440                    &sink_port,
5441                    &to_node,
5442                    &source_port,
5443                    name,
5444                    networking_info,
5445                ),
5446                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5447            )
5448        }
5449        (LocationId::Tick(_, _), _) => panic!(),
5450        (_, LocationId::Tick(_, _)) => panic!(),
5451        (LocationId::Atomic(_), _) => panic!(),
5452        (_, LocationId::Atomic(_)) => panic!(),
5453    };
5454    (sink, source, connect_fn)
5455}
5456
5457#[cfg(test)]
5458mod serde_test;
5459
5460#[cfg(test)]
5461mod test {
5462    use std::mem::size_of;
5463
5464    use stageleft::{QuotedWithContext, q};
5465
5466    use super::*;
5467
5468    #[test]
5469    #[cfg_attr(
5470        not(feature = "build"),
5471        ignore = "expects inclusion of feature-gated fields"
5472    )]
5473    fn hydro_node_size() {
5474        assert_eq!(size_of::<HydroNode>(), 264);
5475    }
5476
5477    #[test]
5478    #[cfg_attr(
5479        not(feature = "build"),
5480        ignore = "expects inclusion of feature-gated fields"
5481    )]
5482    fn hydro_root_size() {
5483        assert_eq!(size_of::<HydroRoot>(), 136);
5484    }
5485
5486    #[test]
5487    fn test_simplify_q_macro_basic() {
5488        // Test basic non-q! expression
5489        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5490        let result = simplify_q_macro(simple_expr.clone());
5491        assert_eq!(result, simple_expr);
5492    }
5493
5494    #[test]
5495    fn test_simplify_q_macro_actual_stageleft_call() {
5496        // Test a simplified version of what a real stageleft call might look like
5497        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5498        let result = simplify_q_macro(stageleft_call);
5499        // This should be processed by our visitor and simplified to q!(...)
5500        // since we detect the stageleft::runtime_support::fn_* pattern
5501        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5502    }
5503
5504    #[test]
5505    fn test_closure_no_pipe_at_start() {
5506        // Test a closure that does not start with a pipe
5507        let stageleft_call = q!({
5508            let foo = 123;
5509            move |b: usize| b + foo
5510        })
5511        .splice_fn1_ctx(&());
5512        let result = simplify_q_macro(stageleft_call);
5513        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5514    }
5515}