-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdynamic_pipelines.py
121 lines (93 loc) · 4.16 KB
/
dynamic_pipelines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
# Dynamic-pipeline example: a uridecodebin source is linked to an audio
# conversion/playback chain at runtime, once its pads appear.
import sys
import gi
import logging
# Pin the introspection namespace versions before they are imported from
# gi.repository below.
gi.require_version('GLib', '2.0')
gi.require_version('GObject', '2.0')
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
# Root logging configuration: DEBUG level, "[logger] [LEVEL] - message" format.
logging.basicConfig(level=logging.DEBUG, format="[%(name)s] [%(levelname)8s] - %(message)s")
# Module-level logger used by main() for pipeline status and errors.
logger = logging.getLogger(__name__)
# This function will be called by the pad-added signal
def pad_added_handler(src, new_pad, data):
    """Link a newly added source pad to the converter if it carries raw audio.

    Args:
        src: Element that emitted the signal (not used by the handler).
        new_pad: Pad that has just been added to ``src``.
        data: Sequence supplied when the signal was connected; the element at
            index 2 is the one whose "sink" pad ``new_pad`` is linked to.

    Returns:
        None. The outcome is reported on stdout.
    """
    convert = data[2]
    sink_pad = convert.get_static_pad("sink")

    # If our converter is already linked, we have nothing to do here
    if sink_pad.is_linked():
        print("We are already linked. Ignoring.\n")
        return

    # A pad may be announced before caps are negotiated on it, in which case
    # get_current_caps() gives None; fall back to the caps the pad can produce.
    new_pad_caps = new_pad.get_current_caps()
    if new_pad_caps is None:
        new_pad_caps = new_pad.query_caps(None)
    if new_pad_caps is None or new_pad_caps.get_size() == 0:
        print("New pad has no usable caps. Ignoring.\n")
        return

    # The name of the first caps structure is the media type of the pad
    new_pad_type = new_pad_caps.get_structure(0).get_name()
    if not new_pad_type.startswith("audio/x-raw"):
        print(f"It has type {new_pad_type} which is not raw audio. Ignoring.\n")
        return

    # Attempt the link; anything other than OK is a failure code
    ret = new_pad.link(sink_pad)
    if ret != Gst.PadLinkReturn.OK:
        print(f"Type is {new_pad_type} but link failed {ret}.\n")
    else:
        print(f"Link succeeded (type {new_pad_type}).\n")
def main():
    """Build and run the dynamic audio pipeline until EOS or an error.

    Creates ``uridecodebin -> audioconvert -> audioresample -> autoaudiosink``,
    where the source is linked later by ``pad_added_handler`` once it exposes
    its pads. Exits the process with status 1 if the pipeline cannot be
    built, linked or started.
    """
    # initialize GStreamer
    Gst.init(sys.argv[1:])

    # create the elements
    source = Gst.ElementFactory.make("uridecodebin", "source0")
    convert = Gst.ElementFactory.make("audioconvert", "convert")
    resample = Gst.ElementFactory.make("audioresample", "resample")
    sink = Gst.ElementFactory.make("autoaudiosink", "sink")

    # create the pipeline
    pipeline = Gst.Pipeline.new("test-pipeline")

    if not pipeline or not source or not convert or not resample or not sink:
        logger.error("Not all elements could be created.\n")
        sys.exit(1)

    # Build the pipeline. Note that we are NOT linking the source at this
    # point. We will do it later
    pipeline.add(source)
    pipeline.add(convert)
    pipeline.add(resample)
    pipeline.add(sink)
    if not convert.link(resample) or not resample.link(sink):
        logger.error("Elements could not be linked.\n")
        sys.exit(1)

    # set the Uri to play
    source.set_property("uri", "https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm")

    print("connecting pad-added signal")
    # Connect to the pad-added signal; the handler reads the converter from
    # index 2 of this list
    data = [pipeline, source, convert, resample, sink]
    source.connect("pad_added", pad_added_handler, data)

    # start playing
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        print("Unable to set the pipeline to the playing state.\n")
        # PyGObject manages reference counts itself, so there is no manual
        # unref here; just bring the pipeline back to NULL and fail like the
        # other error paths.
        pipeline.set_state(Gst.State.NULL)
        sys.exit(1)

    # Listen to the bus
    bus = pipeline.get_bus()
    watched = Gst.MessageType.STATE_CHANGED | Gst.MessageType.ERROR | Gst.MessageType.EOS
    try:
        while True:
            msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, watched)
            if not msg:
                continue

            if msg.type == Gst.MessageType.ERROR:
                err, debug_info = msg.parse_error()
                logger.error("Error received from element %s: %s", msg.src.get_name(), err.message)
                logger.error("Debugging information: %s", debug_info if debug_info else "none")
                break
            elif msg.type == Gst.MessageType.EOS:
                logger.info("End-Of-Stream reached.")
                break
            elif msg.type == Gst.MessageType.STATE_CHANGED:
                # We are only interested in state-changed messages from the
                # pipeline itself, not from other elements
                if msg.src == pipeline:
                    old_state, new_state, _pending_state = msg.parse_state_changed()
                    logger.info(
                        "Pipeline state changed from %s to %s:\n",
                        Gst.Element.state_get_name(old_state),
                        Gst.Element.state_get_name(new_state),
                    )
            else:
                logger.error("Unexpected message received.\n")
    finally:
        # Always release the pipeline's resources, even if the loop is
        # interrupted by an exception
        pipeline.set_state(Gst.State.NULL)
# Script entry point: run the example only when executed directly
if __name__ == "__main__":
    main()