44use anyhow:: Result ;
55use base64:: { Engine as _, engine:: general_purpose} ;
66use ndarray:: { ArrayBase , Dimension , OwnedRepr } ;
7+ use serde:: { Deserialize , Serialize } ;
8+ use std:: sync:: Arc ;
79
810use dynamo_async_openai:: types:: ChatCompletionRequestUserMessageContentPart ;
911
12+ use crate :: block_manager:: storage:: {
13+ StorageError , SystemStorage , nixl:: NixlRegisterableStorage , nixl:: NixlStorage ,
14+ } ;
1015use crate :: preprocessor:: media:: { ImageDecoder , VideoDecoder } ;
16+ use nixl_sys:: Agent as NixlAgent ;
1117
// Media payload in its encoded form (.png, .mp4, ...), before any decoding.
// The payload may still be base64 text; `b64_encoded` records which it is.
pub struct EncodedMediaData {
    // payload bytes (base64 text when `b64_encoded` is set)
    bytes: Vec<u8>,
    // whether `bytes` is still base64-encoded
    b64_encoded: bool,
}
1723
24+ // Decoded media data (image RGB, video frames pixels, ...)
25+ pub struct DecodedMediaData {
26+ data : SystemStorage ,
27+ shape : Vec < usize > ,
28+ dtype : String ,
29+ }
30+
31+ // Decoded media data NIXL descriptor (sent to the next step in the pipeline / NATS)
32+ #[ derive( Serialize , Deserialize , Clone , Debug ) ]
33+ pub struct RdmaMediaDataDescriptor {
34+ // b64 agent metadata
35+ nixl_metadata : String ,
36+ // tensor descriptor
37+ nixl_descriptor : NixlStorage ,
38+ shape : Vec < usize > ,
39+ dtype : String ,
40+ // reference to the actual data, kept alive while the rdma descriptor is alive
41+ #[ serde( skip, default ) ]
42+ #[ allow( dead_code) ]
43+ source_storage : Option < Arc < SystemStorage > > ,
44+ }
45+
1846impl EncodedMediaData {
1947 // Handles both web URLs (will download the bytes) and data URLs (will keep b64-encoded)
2048 // This function is kept in tokio runtime so we do not want any expensive operations
@@ -56,25 +84,42 @@ impl EncodedMediaData {
5684 }
5785}
5886
59- // Decoded media data (image RGB, video frames pixels, ...)
60- #[ derive( serde:: Serialize , serde:: Deserialize , Debug , Clone ) ]
61- pub struct DecodedMediaData {
62- data : Vec < u8 > ,
63- shape : Vec < usize > ,
64- dtype : String ,
87+ impl DecodedMediaData {
88+ pub fn into_rdma_descriptor ( self , nixl_agent : & NixlAgent ) -> Result < RdmaMediaDataDescriptor > {
89+ // get NIXL metadata and descriptor
90+ let mut source_storage = self . data ;
91+ source_storage. nixl_register ( nixl_agent, None ) ?;
92+ let nixl_descriptor = unsafe { source_storage. as_nixl_descriptor ( ) }
93+ . ok_or_else ( || anyhow:: anyhow!( "Cannot convert storage to NIXL descriptor" ) ) ?;
94+
95+ // TODO: cache this if this is constant across the worker lifetime?
96+ let nixl_local_md = nixl_agent. get_local_md ( ) ?;
97+ let nixl_metadata = general_purpose:: STANDARD . encode ( & nixl_local_md) ;
98+
99+ Ok ( RdmaMediaDataDescriptor {
100+ nixl_metadata,
101+ nixl_descriptor,
102+ shape : self . shape ,
103+ dtype : self . dtype ,
104+ // do not drop / free the storage yet
105+ source_storage : Some ( Arc :: new ( source_storage) ) ,
106+ } )
107+ }
65108}
66109
67110// convert Array{N}<u8> to DecodedMediaData
68111// TODO: Array1<f32> for audio
69- impl < D : Dimension > From < ArrayBase < OwnedRepr < u8 > , D > > for DecodedMediaData {
70- fn from ( array : ArrayBase < OwnedRepr < u8 > , D > ) -> Self {
112+ impl < D : Dimension > TryFrom < ArrayBase < OwnedRepr < u8 > , D > > for DecodedMediaData {
113+ type Error = StorageError ;
114+
115+ fn try_from ( array : ArrayBase < OwnedRepr < u8 > , D > ) -> Result < Self , Self :: Error > {
71116 let shape = array. shape ( ) . to_vec ( ) ;
72117 let ( data, _) = array. into_raw_vec_and_offset ( ) ;
73- Self {
74- data,
118+ Ok ( Self {
119+ data : SystemStorage :: try_from ( data ) ? ,
75120 shape,
76121 dtype : "uint8" . to_string ( ) ,
77- }
122+ } )
78123 }
79124}
80125
@@ -102,28 +147,36 @@ pub struct MediaDecoder {
102147pub struct MediaLoader {
103148 media_decoder : MediaDecoder ,
104149 http_client : reqwest:: Client ,
150+ nixl_agent : NixlAgent ,
105151}
106152
107153impl MediaLoader {
108154 pub fn new ( media_decoder : MediaDecoder ) -> Result < Self > {
109155 let http_client = reqwest:: Client :: builder ( )
110156 . user_agent (
111- "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:143.0) Gecko/20100101 Firefox/143.0" ,
157+ "dynamo-ai/dynamo" , // TODO: use a proper user agent
112158 )
113159 . build ( ) ?;
114160
161+ let uuid = uuid:: Uuid :: new_v4 ( ) ;
162+ let nixl_agent = NixlAgent :: new ( & format ! ( "media-loader-{}" , uuid) ) ?;
163+ let ( _, ucx_params) = nixl_agent. get_plugin_params ( "UCX" ) ?;
164+ nixl_agent. create_backend ( "UCX" , & ucx_params) ?;
165+
115166 Ok ( Self {
116167 media_decoder,
117168 http_client,
169+ nixl_agent,
118170 } )
119171 }
120172
121173 pub async fn fetch_and_decode_media_part (
122174 & self ,
123175 oai_content_part : & ChatCompletionRequestUserMessageContentPart ,
124- ) -> Result < DecodedMediaData > {
176+ ) -> Result < RdmaMediaDataDescriptor > {
125177 // TODO: request-level options
126- match oai_content_part {
178+ // fetch and decode the media
179+ let decoded = match oai_content_part {
127180 ChatCompletionRequestUserMessageContentPart :: ImageUrl ( image_part) => {
128181 let url = & image_part. image_url . url ;
129182 let data = EncodedMediaData :: from_url ( url, & self . http_client ) . await ?;
@@ -138,6 +191,9 @@ impl MediaLoader {
138191 anyhow:: bail!( "Audio decoding is not supported yet" ) ;
139192 }
140193 _ => anyhow:: bail!( "Unsupported media type" ) ,
141- }
194+ } ?;
195+
196+ let rdma_descriptor = decoded. into_rdma_descriptor ( & self . nixl_agent ) ?;
197+ Ok ( rdma_descriptor)
142198 }
143199}
0 commit comments