33
44use  anyhow:: Result ; 
55use  base64:: { Engine  as  _,  engine:: general_purpose} ; 
6- use  ndarray:: { ArrayBase ,  Dimension ,  OwnedRepr } ; 
7- use  serde:: { Deserialize ,  Serialize } ; 
8- use  std:: sync:: Arc ; 
9- 
10- use  dynamo_async_openai:: types:: ChatCompletionRequestUserMessageContentPart ; 
11- 
12- use  crate :: block_manager:: storage:: { 
13-     StorageError ,  SystemStorage ,  nixl:: NixlRegisterableStorage ,  nixl:: NixlStorage , 
14- } ; 
15- use  crate :: preprocessor:: media:: { ImageDecoder ,  VideoDecoder } ; 
16- use  nixl_sys:: Agent  as  NixlAgent ; 
176
187// Raw encoded media data (.png, .mp4, ...), optionally b64-encoded 
198#[ derive( Debug ) ]  
@@ -22,29 +11,6 @@ pub struct EncodedMediaData {
2211    pub ( crate )  b64_encoded :  bool , 
2312} 
2413
25- // Decoded media data (image RGB, video frames pixels, ...) 
26- #[ derive( Debug ) ]  
27- pub  struct  DecodedMediaData  { 
28-     pub ( crate )  data :  SystemStorage , 
29-     pub ( crate )  shape :  Vec < usize > , 
30-     pub ( crate )  dtype :  String , 
31- } 
32- 
33- // Decoded media data NIXL descriptor (sent to the next step in the pipeline / NATS) 
34- #[ derive( Serialize ,  Deserialize ,  Clone ,  Debug ) ]  
35- pub  struct  RdmaMediaDataDescriptor  { 
36-     // b64 agent metadata 
37-     nixl_metadata :  String , 
38-     // tensor descriptor 
39-     nixl_descriptor :  NixlStorage , 
40-     shape :  Vec < usize > , 
41-     dtype :  String , 
42-     // reference to the actual data, kept alive while the rdma descriptor is alive 
43-     #[ serde( skip,  default ) ]  
44-     #[ allow( dead_code) ]  
45-     source_storage :  Option < Arc < SystemStorage > > , 
46- } 
47- 
4814impl  EncodedMediaData  { 
4915    // Handles both web URLs (will download the bytes) and data URLs (will keep b64-encoded) 
5016    // This function is kept in tokio runtime so we do not want any expensive operations 
@@ -86,120 +52,6 @@ impl EncodedMediaData {
8652    } 
8753} 
8854
89- impl  DecodedMediaData  { 
90-     pub  fn  into_rdma_descriptor ( self ,  nixl_agent :  & NixlAgent )  -> Result < RdmaMediaDataDescriptor >  { 
91-         // get NIXL metadata and descriptor 
92-         let  mut  source_storage = self . data ; 
93-         source_storage. nixl_register ( nixl_agent,  None ) ?; 
94-         let  nixl_descriptor = unsafe  {  source_storage. as_nixl_descriptor ( )  } 
95-             . ok_or_else ( || anyhow:: anyhow!( "Cannot convert storage to NIXL descriptor" ) ) ?; 
96- 
97-         // TODO: cache this if this is constant across the worker lifetime? 
98-         let  nixl_local_md = nixl_agent. get_local_md ( ) ?; 
99-         let  nixl_metadata = general_purpose:: STANDARD . encode ( & nixl_local_md) ; 
100- 
101-         Ok ( RdmaMediaDataDescriptor  { 
102-             nixl_metadata, 
103-             nixl_descriptor, 
104-             shape :  self . shape , 
105-             dtype :  self . dtype , 
106-             // do not drop / free the storage yet 
107-             source_storage :  Some ( Arc :: new ( source_storage) ) , 
108-         } ) 
109-     } 
110- } 
111- 
112- // convert Array{N}<u8> to DecodedMediaData 
113- // TODO: Array1<f32> for audio 
114- impl < D :  Dimension >  TryFrom < ArrayBase < OwnedRepr < u8 > ,  D > >  for  DecodedMediaData  { 
115-     type  Error  = StorageError ; 
116- 
117-     fn  try_from ( array :  ArrayBase < OwnedRepr < u8 > ,  D > )  -> Result < Self ,  Self :: Error >  { 
118-         let  shape = array. shape ( ) . to_vec ( ) ; 
119-         let  ( data,  _)  = array. into_raw_vec_and_offset ( ) ; 
120-         Ok ( Self  { 
121-             data :  SystemStorage :: try_from ( data) ?, 
122-             shape, 
123-             dtype :  "uint8" . to_string ( ) , 
124-         } ) 
125-     } 
126- } 
127- 
128- #[ async_trait:: async_trait]  
129- pub  trait  Decoder :  Clone  + Send  + ' static  { 
130-     fn  decode ( & self ,  data :  EncodedMediaData )  -> Result < DecodedMediaData > ; 
131- 
132-     async  fn  decode_async ( & self ,  data :  EncodedMediaData )  -> Result < DecodedMediaData >  { 
133-         // light clone (only config params) 
134-         let  decoder = self . clone ( ) ; 
135-         // compute heavy -> rayon 
136-         let  result = tokio_rayon:: spawn ( move  || decoder. decode ( data) ) . await ?; 
137-         Ok ( result) 
138-     } 
139- } 
140- 
141- #[ derive( Clone ,  Debug ,  Default ,  serde:: Serialize ,  serde:: Deserialize ) ]  
142- pub  struct  MediaDecoder  { 
143-     #[ serde( default ) ]  
144-     pub  image_decoder :  ImageDecoder , 
145-     #[ serde( default ) ]  
146-     pub  video_decoder :  VideoDecoder , 
147- } 
148- 
149- pub  struct  MediaLoader  { 
150-     media_decoder :  MediaDecoder , 
151-     http_client :  reqwest:: Client , 
152-     nixl_agent :  NixlAgent , 
153- } 
154- 
155- impl  MediaLoader  { 
156-     pub  fn  new ( media_decoder :  MediaDecoder )  -> Result < Self >  { 
157-         let  http_client = reqwest:: Client :: builder ( ) 
158-             . user_agent ( 
159-                 "dynamo-ai/dynamo" ,  // TODO: use a proper user agent 
160-             ) 
161-             . build ( ) ?; 
162- 
163-         let  uuid = uuid:: Uuid :: new_v4 ( ) ; 
164-         let  nixl_agent = NixlAgent :: new ( & format ! ( "media-loader-{}" ,  uuid) ) ?; 
165-         let  ( _,  ucx_params)  = nixl_agent. get_plugin_params ( "UCX" ) ?; 
166-         nixl_agent. create_backend ( "UCX" ,  & ucx_params) ?; 
167- 
168-         Ok ( Self  { 
169-             media_decoder, 
170-             http_client, 
171-             nixl_agent, 
172-         } ) 
173-     } 
174- 
175-     pub  async  fn  fetch_and_decode_media_part ( 
176-         & self , 
177-         oai_content_part :  & ChatCompletionRequestUserMessageContentPart , 
178-     )  -> Result < RdmaMediaDataDescriptor >  { 
179-         // TODO: request-level options 
180-         // fetch and decode the media 
181-         let  decoded = match  oai_content_part { 
182-             ChatCompletionRequestUserMessageContentPart :: ImageUrl ( image_part)  => { 
183-                 let  url = & image_part. image_url . url ; 
184-                 let  data = EncodedMediaData :: from_url ( url,  & self . http_client ) . await ?; 
185-                 self . media_decoder . image_decoder . decode_async ( data) . await 
186-             } 
187-             ChatCompletionRequestUserMessageContentPart :: VideoUrl ( video_part)  => { 
188-                 let  url = & video_part. video_url . url ; 
189-                 let  data = EncodedMediaData :: from_url ( url,  & self . http_client ) . await ?; 
190-                 self . media_decoder . video_decoder . decode_async ( data) . await 
191-             } 
192-             ChatCompletionRequestUserMessageContentPart :: AudioUrl ( _)  => { 
193-                 anyhow:: bail!( "Audio decoding is not supported yet" ) ; 
194-             } 
195-             _ => anyhow:: bail!( "Unsupported media type" ) , 
196-         } ?; 
197- 
198-         let  rdma_descriptor = decoded. into_rdma_descriptor ( & self . nixl_agent ) ?; 
199-         Ok ( rdma_descriptor) 
200-     } 
201- } 
202- 
20355#[ cfg( test) ]  
20456mod  tests { 
20557    use  super :: * ; 
0 commit comments