Skip to main content

hydro_lang/deploy/
deploy_graph_containerized_ecs.rs

1//! Deployment backend for Hydro that can generate manifests that can be consumed by CDK to deploy cloud formation stacks to aws.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use dfir_lang::graph::DfirGraph;
10use futures::{Sink, Stream};
11use proc_macro2::Span;
12use serde::{Deserialize, Serialize};
13use stageleft::QuotedWithContext;
14use syn::parse_quote;
15use tracing::{instrument, trace};
16
17/// Manifest for CDK deployment - describes all processes, clusters, and their configuration
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct HydroManifest {
20    /// Process definitions (single-instance services)
21    pub processes: HashMap<String, ProcessManifest>,
22    /// Cluster definitions (multi-instance services)
23    pub clusters: HashMap<String, ClusterManifest>,
24}
25
26/// Build configuration for a Hydro binary
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct BuildConfig {
29    /// Path to the trybuild project directory
30    pub project_dir: String,
31    /// Path to the target directory
32    pub target_dir: String,
33    /// Example/binary name to build
34    pub bin_name: String,
35    /// Package name containing the example (for -p flag)
36    pub package_name: String,
37    /// Features to enable
38    pub features: Vec<String>,
39}
40
41/// Information about an exposed port
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct PortInfo {
44    /// The port number
45    pub port: u16,
46}
47
48/// Manifest entry for a single process
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct ProcessManifest {
51    /// Build configuration for this process
52    pub build: BuildConfig,
53    /// Internal location ID used for service discovery
54    pub location_key: LocationKey,
55    /// Ports that need to be exposed, keyed by external port identifier
56    pub ports: HashMap<String, PortInfo>,
57    /// Task family name (used for ECS service discovery)
58    pub task_family: String,
59}
60
61/// Manifest entry for a cluster (multiple instances of the same service)
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct ClusterManifest {
64    /// Build configuration for this cluster (same binary for all instances)
65    pub build: BuildConfig,
66    /// Internal location ID used for service discovery
67    pub location_key: LocationKey,
68    /// Ports that need to be exposed
69    pub ports: Vec<u16>,
70    /// Default number of instances
71    pub default_count: usize,
72    /// Task family prefix (instances will be named {prefix}-0, {prefix}-1, etc.)
73    pub task_family_prefix: String,
74}
75
76use super::deploy_runtime_containerized_ecs::*;
77use crate::compile::builder::ExternalPortId;
78use crate::compile::deploy::DeployResult;
79use crate::compile::deploy_provider::{
80    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
81};
82use crate::compile::trybuild::generate::create_graph_trybuild;
83use crate::location::dynamic::LocationId;
84use crate::location::member_id::TaglessMemberId;
85use crate::location::{LocationKey, MembershipEvent, NetworkHint};
86
87/// Represents a process running in an ecs deployment
88#[derive(Clone)]
89pub struct EcsDeployProcess {
90    id: LocationKey,
91    name: String,
92    next_port: Rc<RefCell<u16>>,
93
94    exposed_ports: Rc<RefCell<HashMap<String, PortInfo>>>,
95
96    trybuild_config:
97        Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
98}
99
100impl Node for EcsDeployProcess {
101    type Port = u16;
102    type Meta = ();
103    type InstantiateEnv = EcsDeploy;
104
105    #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
106    fn next_port(&self) -> Self::Port {
107        let port = {
108            let mut borrow = self.next_port.borrow_mut();
109            let port = *borrow;
110            *borrow += 1;
111            port
112        };
113
114        port
115    }
116
117    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
118    fn update_meta(&self, _meta: &Self::Meta) {}
119
120    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
121    fn instantiate(
122        &self,
123        _env: &mut Self::InstantiateEnv,
124        meta: &mut Self::Meta,
125        graph: DfirGraph,
126        extra_stmts: &[syn::Stmt],
127        sidecars: &[syn::Expr],
128    ) {
129        let (bin_name, config) = create_graph_trybuild(
130            graph,
131            extra_stmts,
132            sidecars,
133            Some(&self.name),
134            crate::compile::trybuild::generate::DeployMode::Containerized,
135            crate::compile::trybuild::generate::LinkingMode::Static,
136        );
137
138        // Store the trybuild config for CDK export
139        *self.trybuild_config.borrow_mut() = Some((bin_name, config));
140    }
141}
142
143/// Represents a logical cluster, which can be a variable amount of individual containers.
144#[derive(Clone)]
145pub struct EcsDeployCluster {
146    id: LocationKey,
147    name: String,
148    next_port: Rc<RefCell<u16>>,
149
150    count: usize,
151
152    /// Stored trybuild config for CDK export
153    trybuild_config:
154        Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
155}
156
157impl Node for EcsDeployCluster {
158    type Port = u16;
159    type Meta = ();
160    type InstantiateEnv = EcsDeploy;
161
162    #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
163    fn next_port(&self) -> Self::Port {
164        let port = {
165            let mut borrow = self.next_port.borrow_mut();
166            let port = *borrow;
167            *borrow += 1;
168            port
169        };
170
171        port
172    }
173
174    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
175    fn update_meta(&self, _meta: &Self::Meta) {}
176
177    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, extra_stmts = extra_stmts.len()))]
178    fn instantiate(
179        &self,
180        _env: &mut Self::InstantiateEnv,
181        _meta: &mut Self::Meta,
182        graph: DfirGraph,
183        extra_stmts: &[syn::Stmt],
184        sidecars: &[syn::Expr],
185    ) {
186        let (bin_name, config) = create_graph_trybuild(
187            graph,
188            extra_stmts,
189            sidecars,
190            Some(&self.name),
191            crate::compile::trybuild::generate::DeployMode::Containerized,
192            crate::compile::trybuild::generate::LinkingMode::Static,
193        );
194
195        // Store the trybuild config for CDK export
196        *self.trybuild_config.borrow_mut() = Some((bin_name, config));
197    }
198}
199
200/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
201#[derive(Clone, Debug)]
202pub struct EcsDeployExternal {
203    name: String,
204    next_port: Rc<RefCell<u16>>,
205}
206
207impl Node for EcsDeployExternal {
208    type Port = u16;
209    type Meta = ();
210    type InstantiateEnv = EcsDeploy;
211
212    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
213    fn next_port(&self) -> Self::Port {
214        let port = {
215            let mut borrow = self.next_port.borrow_mut();
216            let port = *borrow;
217            *borrow += 1;
218            port
219        };
220
221        port
222    }
223
224    #[instrument(level = "trace", skip_all, fields(name = self.name))]
225    fn update_meta(&self, _meta: &Self::Meta) {}
226
227    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
228    fn instantiate(
229        &self,
230        _env: &mut Self::InstantiateEnv,
231        meta: &mut Self::Meta,
232        graph: DfirGraph,
233        extra_stmts: &[syn::Stmt],
234        sidecars: &[syn::Expr],
235    ) {
236        trace!(name: "surface", surface = graph.surface_syntax_string());
237    }
238}
239
240type DynSourceSink<Out, In, InErr> = (
241    Pin<Box<dyn Stream<Item = Out>>>,
242    Pin<Box<dyn Sink<In, Error = InErr>>>,
243);
244
245impl<'a> RegisterPort<'a, EcsDeploy> for EcsDeployExternal {
246    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
247    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {}
248
249    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
250    fn as_bytes_bidi(
251        &self,
252        _external_port_id: ExternalPortId,
253    ) -> impl Future<
254        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
255    > + 'a {
256        async { unimplemented!() }
257    }
258
259    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
260    fn as_bincode_bidi<InT, OutT>(
261        &self,
262        _external_port_id: ExternalPortId,
263    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
264    where
265        InT: Serialize + 'static,
266        OutT: serde::de::DeserializeOwned + 'static,
267    {
268        async { unimplemented!() }
269    }
270
271    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
272    fn as_bincode_sink<T>(
273        &self,
274        _external_port_id: ExternalPortId,
275    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
276    where
277        T: Serialize + 'static,
278    {
279        async { unimplemented!() }
280    }
281
282    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
283    fn as_bincode_source<T>(
284        &self,
285        _external_port_id: ExternalPortId,
286    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
287    where
288        T: serde::de::DeserializeOwned + 'static,
289    {
290        async { unimplemented!() }
291    }
292}
293
294/// Represents an aws ecs deployment.
295pub struct EcsDeploy;
296
297impl Default for EcsDeploy {
298    fn default() -> Self {
299        Self::new()
300    }
301}
302
303impl EcsDeploy {
304    /// Creates a new ecs deployment.
305    pub fn new() -> Self {
306        Self
307    }
308
309    /// Add an internal ecs process to the deployment.
310    pub fn add_ecs_process(&mut self) -> EcsDeployProcessSpec {
311        EcsDeployProcessSpec
312    }
313
314    /// Add an internal ecs cluster to the deployment.
315    pub fn add_ecs_cluster(&mut self, count: usize) -> EcsDeployClusterSpec {
316        EcsDeployClusterSpec { count }
317    }
318
319    /// Add an external process to the deployment.
320    pub fn add_external(&self, name: String) -> EcsDeployExternalSpec {
321        EcsDeployExternalSpec { name }
322    }
323
324    /// Export deployment configuration for CDK consumption.
325    ///
326    /// This generates a manifest with build instructions that can be consumed
327    /// by CDK constructs. CDK will handle building the binaries and Docker images.
328    #[instrument(level = "trace", skip_all)]
329    pub fn export_for_cdk(&self, nodes: &DeployResult<'_, Self>) -> HydroManifest {
330        let mut manifest = HydroManifest {
331            processes: HashMap::new(),
332            clusters: HashMap::new(),
333        };
334
335        for (location_id, name_hint, process) in nodes.get_all_processes() {
336            let LocationId::Process(raw_id) = location_id else {
337                unreachable!();
338            };
339            let task_family = get_ecs_container_name(&process.name, None);
340            let ports = process.exposed_ports.borrow().clone();
341
342            let (bin_name, trybuild_config) = process
343                .trybuild_config
344                .borrow()
345                .clone()
346                .expect("trybuild_config should be set after instantiate");
347
348            let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
349            if let Some(extra_features) = trybuild_config.features {
350                features.extend(extra_features);
351            }
352
353            let crate_name = trybuild_config
354                .project_dir
355                .file_name()
356                .and_then(|n| n.to_str())
357                .unwrap_or("unknown")
358                .replace("_", "-");
359            let package_name = format!("{}-hydro-trybuild", crate_name);
360
361            manifest.processes.insert(
362                name_hint.to_owned(),
363                ProcessManifest {
364                    build: BuildConfig {
365                        project_dir: trybuild_config.project_dir.to_string_lossy().into_owned(),
366                        target_dir: trybuild_config.target_dir.to_string_lossy().into_owned(),
367                        bin_name,
368                        package_name,
369                        features,
370                    },
371                    location_key: raw_id,
372                    ports,
373                    task_family,
374                },
375            );
376        }
377
378        for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
379            let LocationId::Cluster(raw_id) = location_id else {
380                unreachable!();
381            };
382            let task_family_prefix = cluster.name.clone();
383
384            let (bin_name, trybuild_config) = cluster
385                .trybuild_config
386                .borrow()
387                .clone()
388                .expect("trybuild_config should be set after instantiate");
389
390            let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
391            if let Some(extra_features) = trybuild_config.features {
392                features.extend(extra_features);
393            }
394
395            let crate_name = trybuild_config
396                .project_dir
397                .file_name()
398                .and_then(|n| n.to_str())
399                .unwrap_or("unknown")
400                .replace("_", "-");
401            let package_name = format!("{}-hydro-trybuild", crate_name);
402
403            manifest.clusters.insert(
404                name_hint.to_owned(),
405                ClusterManifest {
406                    build: BuildConfig {
407                        project_dir: trybuild_config.project_dir.to_string_lossy().into_owned(),
408                        target_dir: trybuild_config.target_dir.to_string_lossy().into_owned(),
409                        bin_name,
410                        package_name,
411                        features,
412                    },
413                    location_key: raw_id,
414                    ports: vec![],
415                    default_count: cluster.count,
416                    task_family_prefix,
417                },
418            );
419        }
420
421        manifest
422    }
423}
424
425impl<'a> Deploy<'a> for EcsDeploy {
426    type InstantiateEnv = Self;
427    type Process = EcsDeployProcess;
428    type Cluster = EcsDeployCluster;
429    type External = EcsDeployExternal;
430    type Meta = ();
431
432    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
433    fn o2o_sink_source(
434        _env: &mut Self::InstantiateEnv,
435        p1: &Self::Process,
436        p1_port: &<Self::Process as Node>::Port,
437        p2: &Self::Process,
438        p2_port: &<Self::Process as Node>::Port,
439        name: Option<&str>,
440        networking_info: &crate::networking::NetworkingInfo,
441    ) -> (syn::Expr, syn::Expr) {
442        match networking_info {
443            crate::networking::NetworkingInfo::Tcp {
444                fault: crate::networking::TcpFault::FailStop,
445            } => {}
446            _ => panic!("Unsupported networking info: {:?}", networking_info),
447        }
448
449        deploy_containerized_o2o(
450            &p2.name,
451            name.expect("channel name is required for containerized deployment"),
452        )
453    }
454
455    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
456    fn o2o_connect(
457        p1: &Self::Process,
458        p1_port: &<Self::Process as Node>::Port,
459        p2: &Self::Process,
460        p2_port: &<Self::Process as Node>::Port,
461    ) -> Box<dyn FnOnce()> {
462        let serialized = format!(
463            "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
464            p1.name, p2.name
465        );
466
467        Box::new(move || {
468            trace!(name: "o2o_connect thunk", %serialized);
469        })
470    }
471
472    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
473    fn o2m_sink_source(
474        _env: &mut Self::InstantiateEnv,
475        p1: &Self::Process,
476        p1_port: &<Self::Process as Node>::Port,
477        c2: &Self::Cluster,
478        c2_port: &<Self::Cluster as Node>::Port,
479        name: Option<&str>,
480        networking_info: &crate::networking::NetworkingInfo,
481    ) -> (syn::Expr, syn::Expr) {
482        match networking_info {
483            crate::networking::NetworkingInfo::Tcp {
484                fault: crate::networking::TcpFault::FailStop,
485            } => {}
486            _ => panic!("Unsupported networking info: {:?}", networking_info),
487        }
488
489        deploy_containerized_o2m(
490            name.expect("channel name is required for containerized deployment"),
491        )
492    }
493
494    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
495    fn o2m_connect(
496        p1: &Self::Process,
497        p1_port: &<Self::Process as Node>::Port,
498        c2: &Self::Cluster,
499        c2_port: &<Self::Cluster as Node>::Port,
500    ) -> Box<dyn FnOnce()> {
501        let serialized = format!(
502            "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
503            p1.name, c2.name
504        );
505
506        Box::new(move || {
507            trace!(name: "o2m_connect thunk", %serialized);
508        })
509    }
510
511    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
512    fn m2o_sink_source(
513        _env: &mut Self::InstantiateEnv,
514        c1: &Self::Cluster,
515        c1_port: &<Self::Cluster as Node>::Port,
516        p2: &Self::Process,
517        p2_port: &<Self::Process as Node>::Port,
518        name: Option<&str>,
519        networking_info: &crate::networking::NetworkingInfo,
520    ) -> (syn::Expr, syn::Expr) {
521        match networking_info {
522            crate::networking::NetworkingInfo::Tcp {
523                fault: crate::networking::TcpFault::FailStop,
524            } => {}
525            _ => panic!("Unsupported networking info: {:?}", networking_info),
526        }
527
528        deploy_containerized_m2o(
529            &p2.name,
530            name.expect("channel name is required for containerized deployment"),
531        )
532    }
533
534    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
535    fn m2o_connect(
536        c1: &Self::Cluster,
537        c1_port: &<Self::Cluster as Node>::Port,
538        p2: &Self::Process,
539        p2_port: &<Self::Process as Node>::Port,
540    ) -> Box<dyn FnOnce()> {
541        let serialized = format!(
542            "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
543            c1.name, p2.name
544        );
545
546        Box::new(move || {
547            trace!(name: "m2o_connect thunk", %serialized);
548        })
549    }
550
551    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
552    fn m2m_sink_source(
553        _env: &mut Self::InstantiateEnv,
554        c1: &Self::Cluster,
555        c1_port: &<Self::Cluster as Node>::Port,
556        c2: &Self::Cluster,
557        c2_port: &<Self::Cluster as Node>::Port,
558        name: Option<&str>,
559        networking_info: &crate::networking::NetworkingInfo,
560    ) -> (syn::Expr, syn::Expr) {
561        match networking_info {
562            crate::networking::NetworkingInfo::Tcp {
563                fault: crate::networking::TcpFault::FailStop,
564            } => {}
565            _ => panic!("Unsupported networking info: {:?}", networking_info),
566        }
567
568        deploy_containerized_m2m(
569            name.expect("channel name is required for containerized deployment"),
570        )
571    }
572
573    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
574    fn m2m_connect(
575        c1: &Self::Cluster,
576        c1_port: &<Self::Cluster as Node>::Port,
577        c2: &Self::Cluster,
578        c2_port: &<Self::Cluster as Node>::Port,
579    ) -> Box<dyn FnOnce()> {
580        let serialized = format!(
581            "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
582            c1.name, c2.name
583        );
584
585        Box::new(move || {
586            trace!(name: "m2m_connect thunk", %serialized);
587        })
588    }
589
590    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
591    fn e2o_many_source(
592        extra_stmts: &mut Vec<syn::Stmt>,
593        p2: &Self::Process,
594        p2_port: &<Self::Process as Node>::Port,
595        codec_type: &syn::Type,
596        shared_handle: String,
597    ) -> syn::Expr {
598        p2.exposed_ports
599            .borrow_mut()
600            .insert(shared_handle.clone(), PortInfo { port: *p2_port });
601
602        let socket_ident = syn::Ident::new(
603            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
604            Span::call_site(),
605        );
606
607        let source_ident = syn::Ident::new(
608            &format!("__hydro_deploy_many_{}_source", &shared_handle),
609            Span::call_site(),
610        );
611
612        let sink_ident = syn::Ident::new(
613            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
614            Span::call_site(),
615        );
616
617        let membership_ident = syn::Ident::new(
618            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
619            Span::call_site(),
620        );
621
622        let bind_addr = format!("0.0.0.0:{}", p2_port);
623
624        extra_stmts.push(syn::parse_quote! {
625            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
626        });
627
628        let root = crate::staging_util::get_this_crate();
629
630        extra_stmts.push(syn::parse_quote! {
631            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
632        });
633
634        parse_quote!(#source_ident)
635    }
636
637    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
638    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
639        let sink_ident = syn::Ident::new(
640            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
641            Span::call_site(),
642        );
643        parse_quote!(#sink_ident)
644    }
645
646    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
647    fn e2o_source(
648        extra_stmts: &mut Vec<syn::Stmt>,
649        p1: &Self::External,
650        p1_port: &<Self::External as Node>::Port,
651        p2: &Self::Process,
652        p2_port: &<Self::Process as Node>::Port,
653        codec_type: &syn::Type,
654        shared_handle: String,
655    ) -> syn::Expr {
656        // Record the port for manifest export
657        p2.exposed_ports
658            .borrow_mut()
659            .insert(shared_handle.clone(), PortInfo { port: *p2_port });
660
661        let source_ident = syn::Ident::new(
662            &format!("__hydro_deploy_{}_source", &shared_handle),
663            Span::call_site(),
664        );
665
666        let bind_addr = format!("0.0.0.0:{}", p2_port);
667
668        // Always use LazySinkSource for external connections - it creates both sink and source
669        // which is needed for bidirectional connections (unpaired: false)
670        let socket_ident = syn::Ident::new(
671            &format!("__hydro_deploy_{}_socket", &shared_handle),
672            Span::call_site(),
673        );
674
675        let sink_ident = syn::Ident::new(
676            &format!("__hydro_deploy_{}_sink", &shared_handle),
677            Span::call_site(),
678        );
679
680        extra_stmts.push(syn::parse_quote! {
681            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
682        });
683
684        let create_expr = deploy_containerized_external_sink_source_ident(bind_addr, socket_ident);
685
686        extra_stmts.push(syn::parse_quote! {
687            let (#sink_ident, #source_ident) = (#create_expr).split();
688        });
689
690        parse_quote!(#source_ident)
691    }
692
693    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
694    fn e2o_connect(
695        p1: &Self::External,
696        p1_port: &<Self::External as Node>::Port,
697        p2: &Self::Process,
698        p2_port: &<Self::Process as Node>::Port,
699        many: bool,
700        server_hint: NetworkHint,
701    ) -> Box<dyn FnOnce()> {
702        let serialized = format!(
703            "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
704            p1.name, p2.name
705        );
706
707        Box::new(move || {
708            trace!(name: "e2o_connect thunk", %serialized);
709        })
710    }
711
712    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
713    fn o2e_sink(
714        p1: &Self::Process,
715        p1_port: &<Self::Process as Node>::Port,
716        p2: &Self::External,
717        p2_port: &<Self::External as Node>::Port,
718        shared_handle: String,
719    ) -> syn::Expr {
720        let sink_ident = syn::Ident::new(
721            &format!("__hydro_deploy_{}_sink", &shared_handle),
722            Span::call_site(),
723        );
724        parse_quote!(#sink_ident)
725    }
726
727    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
728    fn cluster_ids(
729        of_cluster: LocationKey,
730    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
731        cluster_ids()
732    }
733
734    #[instrument(level = "trace", skip_all)]
735    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
736        cluster_self_id()
737    }
738
739    #[instrument(level = "trace", skip_all, fields(?location_id))]
740    fn cluster_membership_stream(
741        _env: &mut Self::InstantiateEnv,
742        _at_location: &LocationId,
743        location_id: &LocationId,
744    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
745    {
746        cluster_membership_stream(location_id)
747    }
748}
749
750#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location))]
751fn get_ecs_image_name(name_hint: &str, location: LocationKey) -> String {
752    let name_hint = name_hint
753        .split("::")
754        .last()
755        .unwrap()
756        .to_ascii_lowercase()
757        .replace(".", "-")
758        .replace("_", "-")
759        .replace("::", "-");
760
761    format!("hy-{name_hint}-{location}")
762}
763
764#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
765fn get_ecs_container_name(image_name: &str, instance: Option<usize>) -> String {
766    if let Some(instance) = instance {
767        format!("{image_name}-{instance}")
768    } else {
769        image_name.to_owned()
770    }
771}
772/// Represents a Process running in an ecs deployment
773#[derive(Clone)]
774pub struct EcsDeployProcessSpec;
775
776impl<'a> ProcessSpec<'a, EcsDeploy> for EcsDeployProcessSpec {
777    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
778    fn build(self, id: LocationKey, name_hint: &'_ str) -> <EcsDeploy as Deploy<'a>>::Process {
779        EcsDeployProcess {
780            id,
781            name: get_ecs_image_name(name_hint, id),
782            next_port: Rc::new(RefCell::new(1000)),
783            exposed_ports: Rc::new(RefCell::new(HashMap::new())),
784            trybuild_config: Rc::new(RefCell::new(None)),
785        }
786    }
787}
788
789/// Represents a Cluster running across `count` ecs tasks.
790#[derive(Clone)]
791pub struct EcsDeployClusterSpec {
792    count: usize,
793}
794
795impl<'a> ClusterSpec<'a, EcsDeploy> for EcsDeployClusterSpec {
796    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
797    fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::Cluster {
798        EcsDeployCluster {
799            id,
800            name: get_ecs_image_name(name_hint, id),
801            next_port: Rc::new(RefCell::new(1000)),
802            count: self.count,
803            trybuild_config: Rc::new(RefCell::new(None)),
804        }
805    }
806}
807
808/// Represents an external process outside of the management of hydro deploy.
809pub struct EcsDeployExternalSpec {
810    name: String,
811}
812
813impl<'a> ExternalSpec<'a, EcsDeploy> for EcsDeployExternalSpec {
814    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
815    fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::External {
816        EcsDeployExternal {
817            name: self.name,
818            next_port: Rc::new(RefCell::new(10000)),
819        }
820    }
821}