3030import com .sun .jna .CallbackThreadInitializer ;
3131import com .sun .jna .Native ;
3232import com .sun .jna .Pointer ;
33+ import com .sun .jna .ptr .IntByReference ;
34+ import com .sun .jna .ptr .LongByReference ;
3335import com .sun .jna .ptr .PointerByReference ;
36+ import org .freedesktop .gstreamer .glib .NativeEnum ;
3437
3538import org .freedesktop .gstreamer .glib .Natives ;
3639import org .freedesktop .gstreamer .lowlevel .GstAPI .GErrorStruct ;
3740import org .freedesktop .gstreamer .lowlevel .GstBusAPI ;
3841import org .freedesktop .gstreamer .lowlevel .GstBusAPI .BusCallback ;
42+ import org .freedesktop .gstreamer .lowlevel .GstBusPtr ;
43+ import org .freedesktop .gstreamer .lowlevel .GstMessagePtr ;
3944import org .freedesktop .gstreamer .message .Message ;
4045import org .freedesktop .gstreamer .message .MessageType ;
4146
4247import static org .freedesktop .gstreamer .lowlevel .GlibAPI .GLIB_API ;
4348import static org .freedesktop .gstreamer .lowlevel .GstBusAPI .GSTBUS_API ;
4449import static org .freedesktop .gstreamer .lowlevel .GstMessageAPI .GSTMESSAGE_API ;
50+ import static org .freedesktop .gstreamer .lowlevel .GstMiniObjectAPI .GSTMINIOBJECT_API ;
4551
4652/**
4753 * The {@link Bus} is an object responsible for delivering {@link Message}s in a
@@ -110,8 +116,8 @@ public void setFlushing(boolean flushing) {
110116 */
111117 public void connect (final EOS listener ) {
112118 connect (EOS .class , listener , new BusCallback () {
113- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
114- listener .endOfStream (msg .getSource ());
119+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
120+ listener .endOfStream (Natives . objectFor ( msg .getSource (), GstObject . class , true , true ));
115121 return true ;
116122 }
117123 });
@@ -134,11 +140,12 @@ public void disconnect(EOS listener) {
134140 */
135141 public void connect (final ERROR listener ) {
136142 connect (ERROR .class , listener , new BusCallback () {
137- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
143+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
138144 PointerByReference err = new PointerByReference ();
139145 GSTMESSAGE_API .gst_message_parse_error (msg , err , null );
140146 GErrorStruct error = new GErrorStruct (err .getValue ());
141- listener .errorMessage (msg .getSource (), error .getCode (), error .getMessage ());
147+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
148+ listener .errorMessage (source , error .getCode (), error .getMessage ());
142149 GLIB_API .g_error_free (err .getValue ());
143150 return true ;
144151 }
@@ -162,11 +169,12 @@ public void disconnect(ERROR listener) {
162169 */
163170 public void connect (final WARNING listener ) {
164171 connect (WARNING .class , listener , new BusCallback () {
165- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
172+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
166173 PointerByReference err = new PointerByReference ();
167174 GSTMESSAGE_API .gst_message_parse_warning (msg , err , null );
168175 GErrorStruct error = new GErrorStruct (err .getValue ());
169- listener .warningMessage (msg .getSource (), error .getCode (), error .getMessage ());
176+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
177+ listener .warningMessage (source , error .getCode (), error .getMessage ());
170178 GLIB_API .g_error_free (err .getValue ());
171179 return true ;
172180 }
@@ -190,11 +198,12 @@ public void disconnect(WARNING listener) {
190198 */
191199 public void connect (final INFO listener ) {
192200 connect (INFO .class , listener , new BusCallback () {
193- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
201+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
194202 PointerByReference err = new PointerByReference ();
195203 GSTMESSAGE_API .gst_message_parse_info (msg , err , null );
196204 GErrorStruct error = new GErrorStruct (err .getValue ());
197- listener .infoMessage (msg .getSource (), error .getCode (), error .getMessage ());
205+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
206+ listener .infoMessage (source , error .getCode (), error .getMessage ());
198207 GLIB_API .g_error_free (err .getValue ());
199208 return true ;
200209 }
@@ -218,12 +227,16 @@ public void disconnect(INFO listener) {
218227 */
219228 public void connect (final STATE_CHANGED listener ) {
220229 connect (STATE_CHANGED .class , listener , new BusCallback () {
221- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
222- State [] o = new State [1 ];
223- State [] n = new State [1 ];
224- State [] p = new State [1 ];
225- GSTMESSAGE_API .gst_message_parse_state_changed (msg , o , n , p );
226- listener .stateChanged (msg .getSource (), o [0 ], n [0 ], p [0 ]);
230+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
231+ IntByReference oldPtr = new IntByReference ();
232+ IntByReference currentPtr = new IntByReference ();
233+ IntByReference pendingPtr = new IntByReference ();
234+ GSTMESSAGE_API .gst_message_parse_state_changed (msg , oldPtr , currentPtr , pendingPtr );
235+ State old = NativeEnum .fromInt (State .class , oldPtr .getValue ());
236+ State current = NativeEnum .fromInt (State .class , currentPtr .getValue ());
237+ State pending = NativeEnum .fromInt (State .class , pendingPtr .getValue ());
238+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
239+ listener .stateChanged (source , old , current , pending );
227240 return true ;
228241 }
229242 });
@@ -245,11 +258,12 @@ public void disconnect(STATE_CHANGED listener) {
245258 */
246259 public void connect (final TAG listener ) {
247260 connect (TAG .class , listener , new BusCallback () {
248- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
261+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
249262 PointerByReference list = new PointerByReference ();
250263 GSTMESSAGE_API .gst_message_parse_tag (msg , list );
251264 TagList tl = new TagList (Natives .initializer (list .getValue ()));
252- listener .tagsFound (msg .getSource (), tl );
265+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
266+ listener .tagsFound (source , tl );
253267 return true ;
254268 }
255269 });
@@ -271,10 +285,11 @@ public void disconnect(TAG listener) {
271285 */
272286 public void connect (final BUFFERING listener ) {
273287 connect (BUFFERING .class , listener , new BusCallback () {
274- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
275- int [] percent = { 0 } ;
288+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
289+ IntByReference percent = new IntByReference () ;
276290 GSTMESSAGE_API .gst_message_parse_buffering (msg , percent );
277- listener .bufferingData (msg .getSource (), percent [0 ]);
291+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
292+ listener .bufferingData (source , percent .getValue ());
278293 return true ;
279294 }
280295 });
@@ -296,8 +311,9 @@ public void disconnect(BUFFERING listener) {
296311 */
297312 public void connect (final DURATION_CHANGED listener ) {
298313 connect (DURATION_CHANGED .class , listener , new BusCallback () {
299- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
300- listener .durationChanged (msg .getSource ());
314+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
315+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
316+ listener .durationChanged (source );
301317 return true ;
302318 }
303319 });
@@ -320,11 +336,13 @@ public void disconnect(DURATION_CHANGED listener) {
320336 */
321337 public void connect (final SEGMENT_START listener ) {
322338 connect (SEGMENT_START .class , listener , new BusCallback () {
323- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
324- Format [] format = new Format [1 ];
325- long [] position = {0 };
326- GSTMESSAGE_API .gst_message_parse_segment_start (msg , format , position );
327- listener .segmentStart (msg .getSource (), format [0 ], position [0 ]);
339+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
340+ IntByReference formatPtr = new IntByReference ();
341+ LongByReference positionPtr = new LongByReference ();
342+ GSTMESSAGE_API .gst_message_parse_segment_start (msg , formatPtr , positionPtr );
343+ Format format = NativeEnum .fromInt (Format .class , formatPtr .getValue ());
344+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
345+ listener .segmentStart (source , format , positionPtr .getValue ());
328346 return true ;
329347 }
330348 });
@@ -347,11 +365,13 @@ public void disconnect(SEGMENT_START listener) {
347365 */
348366 public void connect (final SEGMENT_DONE listener ) {
349367 connect (SEGMENT_DONE .class , listener , new BusCallback () {
350- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
351- Format [] format = new Format [1 ];
352- long [] position = {0 };
353- GSTMESSAGE_API .gst_message_parse_segment_done (msg , format , position );
354- listener .segmentDone (msg .getSource (), format [0 ], position [0 ]);
368+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
369+ IntByReference formatPtr = new IntByReference ();
370+ LongByReference positionPtr = new LongByReference ();
371+ GSTMESSAGE_API .gst_message_parse_segment_done (msg , formatPtr , positionPtr );
372+ Format format = NativeEnum .fromInt (Format .class , formatPtr .getValue ());
373+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
374+ listener .segmentDone (source , format , positionPtr .getValue ());
355375 return true ;
356376 }
357377 });
@@ -374,8 +394,9 @@ public void disconnect(SEGMENT_DONE listener) {
374394 */
375395 public void connect (final ASYNC_DONE listener ) {
376396 connect (ASYNC_DONE .class , listener , new BusCallback () {
377- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
378- listener .asyncDone (msg .getSource ());
397+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
398+ GstObject source = Natives .objectFor (msg .getSource (), GstObject .class , true , true );
399+ listener .asyncDone (source );
379400 return true ;
380401 }
381402 });
@@ -398,8 +419,8 @@ public void disconnect(ASYNC_DONE listener) {
398419 */
399420 public void connect (final MESSAGE listener ) {
400421 connect (MESSAGE .class , listener , new BusCallback () {
401- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
402- listener .busMessage (bus , msg );
422+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
423+ listener .busMessage (Bus . this , Natives . objectFor ( msg , Message . class , true , true ) );
403424 return true ;
404425 }
405426 });
@@ -420,8 +441,8 @@ public void connect(String signal, final MESSAGE listener) {
420441 signal = signal .substring (signal .lastIndexOf ("::" ) + 2 );
421442 }
422443 connect (signal , MESSAGE .class , listener , new BusCallback () {
423- public boolean callback (Bus bus , Message msg , Pointer user_data ) {
424- listener .busMessage (bus , msg );
444+ public boolean callback (GstBusPtr bus , GstMessagePtr msg , Pointer user_data ) {
445+ listener .busMessage (Bus . this , Natives . objectFor ( msg , Message . class , true , true ) );
425446 return true ;
426447 }
427448 });
@@ -502,7 +523,7 @@ public <T> void connect(String signal, Class<T> listenerClass, T listener,
502523 addMessageProxy (type , listenerClass , listener , (BusCallback ) callback );
503524 }
504525 }
505-
526+
506527 private synchronized <T > void addMessageProxy (MessageType type ,
507528 Class <T > listenerClass ,
508529 T listener ,
@@ -519,7 +540,7 @@ public <T> void disconnect(Class<T> listenerClass, T listener) {
519540 removeMessageProxy (listenerClass , listener );
520541 }
521542 }
522-
543+
523544 private synchronized <T > void removeMessageProxy (Class <T > listenerClass , T listener ) {
524545 messageProxies .removeIf (p -> p .listener == listener );
525546 if (messageProxies .isEmpty ()) {
@@ -536,14 +557,15 @@ private synchronized <T> void removeMessageProxy(Class<T> listenerClass, T liste
536557 * those notifications, and the messages just queue up.
537558 *
538559 */
539- private void dispatchMessage (Message msg ) {
560+ private void dispatchMessage (GstBusPtr busPtr , GstMessagePtr msgPtr ) {
540561 messageProxies .forEach (p -> {
541562 try {
542- p .busMessage (this , msg );
563+ p .busMessage (busPtr , msgPtr );
543564 } catch (Throwable t ) {
544565 LOG .log (Level .SEVERE , "Exception thrown by bus message handler" , t );
545566 }
546567 });
568+ GSTMINIOBJECT_API .gst_mini_object_unref (msgPtr );
547569 }
548570
549571 @ Override
@@ -839,8 +861,8 @@ private static class MessageProxy<T> {
839861 this .callback = callback ;
840862 }
841863
842- void busMessage (final Bus bus , final Message msg ) {
843- if (type == MessageType .ANY || type == msg .getType ()) {
864+ void busMessage (final GstBusPtr bus , final GstMessagePtr msg ) {
865+ if (type == MessageType .ANY || type . intValue () == msg .getMessageType ()) {
844866 callback .callback (bus , msg , null );
845867 }
846868 }
@@ -856,22 +878,20 @@ private static class SyncCallback implements GstBusAPI.BusSyncHandler {
856878 }
857879
858880 @ Override
859- public BusSyncReply callback (final Bus bus , final Message msg , Pointer userData ) {
881+ public BusSyncReply callback (final GstBusPtr busPtr , final GstMessagePtr msgPtr , Pointer userData ) {
882+ Bus bus = Natives .objectFor (busPtr , Bus .class , true , true );
860883 if (bus .syncHandler != null ) {
884+ Message msg = Natives .objectFor (msgPtr , Message .class , true , true );
861885 BusSyncReply reply = bus .syncHandler .syncMessage (msg );
862-
863886 if (reply != BusSyncReply .DROP ) {
864- Gst .getExecutor ().execute (() -> bus .dispatchMessage (msg ));
887+ Gst .getExecutor ().execute (() -> bus .dispatchMessage (busPtr , msgPtr ));
888+ } else {
889+ // not calling dispatch message so unref here
890+ GSTMINIOBJECT_API .gst_mini_object_unref (msgPtr );
865891 }
866892 } else {
867- Gst .getExecutor ().execute (() -> bus .dispatchMessage (msg ));
893+ Gst .getExecutor ().execute (() -> bus .dispatchMessage (busPtr , msgPtr ));
868894 }
869- //
870- // Unref the message, since we are dropping it.
871- // (the normal GC will drop other refs to it)
872- //
873- // GSTMINIOBJECT_API.gst_mini_object_unref(msg);
874- Natives .unref (msg );
875895 return BusSyncReply .DROP ;
876896 }
877897 }
0 commit comments