Hey,
I am currently working on a gstreamer project that takes a stream via RTSP and outputs it as HLS. This works just fine. Now I want to add sort of a snapshot function, to take a PNG picture out of the stream occasionally. To do this, I wanted to use a tee to duplicate the stream. However, when I try to dynamically link some parts, I receive an error. A reduced version of my code looks like this:
use gst::prelude::*;
use gst_video::VideoFormat;
use std::env;
use std::net::UdpSocket;
use std::thread;
use std::str;
fn main() {
// Initialize GStreamer
gst::init().unwrap();
let args: Vec<String> = env::args().collect();
let location = &args[1];
println!("Using {} as stream location.", location);
// Create the elements
let source = gst::ElementFactory::make("rtspsrc", Some("source"))
.expect("Could not create rtspsrc element.");
source.set_property("location", &location);
source.set_property("latency", &(500 as u32))
.expect("Couldn't set RTSP Source latency");
let depayloader = gst::ElementFactory::make("rtph264depay", Some("depayloader"))
.expect("Could not create depayloader element.");
let decodebin = gst::ElementFactory::make("decodebin", Some("decodebin"))
.expect("Could not create decodebin element.");
let convert = gst::ElementFactory::make("videoconvert", Some("video_convert"))
.expect("Could not create convert element.");
let convert2 = gst::ElementFactory::make("x264enc", Some("video_convert2"))
.expect("Could not create convert element.");
convert2.set_property("key_int_max", &(5 as u32));
let convert3 = gst::ElementFactory::make("h264parse", Some("video_convert3"))
.expect("Could not create convert element.");
let sink = gst::ElementFactory::make("hlssink2", Some("udp_sink"))
.expect("Could not create sink element.");
sink.set_property("playlist-root", &"http://10.10.193.95:443");
sink.set_property("playlist-location", &"/stream/playlist.m3u8");
sink.set_property("location", &"/stream/segment%05d.ts");
sink.set_property("target-duration", &(1 as u32))
.expect("Not working.");
sink.set_property("max-files", &(6 as u32))
.expect("Not working.");
sink.set_property("playlist-length", &(3 as u32))
.expect("Not working.");
sink.set_property("send-keyframe-requests", &(false))
.expect("Not working.");
let filter = gst::ElementFactory::make("capsfilter", Some("filter"))
.expect("Could not create filter element.");
let video_caps =
gst::Caps::new_simple("video/x-h264", &[("profile", &"constrained-baseline")]);
filter.set_property("caps", &video_caps);
let tee = gst::ElementFactory::make("tee", Some("tee"))
.expect("Could not create tee element.");
let tee_queue = gst::ElementFactory::make("queue", Some("tee_queue"))
.expect("Could not create tee queue element.");
let tee_queue2 = gst::ElementFactory::make("queue", Some("tee_queue2"))
.expect("Could not create tee queue 2 element.");
tee_queue.set_property_from_str("Queue-leaky", &"downstream");
let png_enc = gst::ElementFactory::make("pngenc", Some("pngenc"))
.expect("Could not create PNG encoder element.");
png_enc.set_property("snapshot", &(true))
.expect("Could not set png_enc property.");
let filesink = gst::ElementFactory::make("filesink", Some("filesink"))
.expect("Could not create filesink element.");
filesink.set_property("location", &"/stream/capture.png")
.expect("Could not set filesink property.");
// Create the empty pipeline
let pipeline = gst::Pipeline::new(Some("test-pipeline"));
// Build the pipeline Note that we are NOT linking the source at this
// point. We will do it later.
pipeline
.add_many(&[&source, &decodebin, &depayloader, &convert, &convert2, &filter, &convert3, &sink, &tee, &tee_queue, &tee_queue2, &png_enc, &filesink])
//.add_many(&[&source, &parse, &queue, &convert, &convert2, &filter, &convert3, &sink])
.unwrap();
let tee_hls_pad = tee.get_request_pad("src_%u").unwrap();
println!(
"Obtained request pad {} for HLS branch",
tee_hls_pad.get_name()
);
let tee_queue2_pad = tee_queue2.get_static_pad("sink").unwrap();
tee_hls_pad.link(&tee_queue2_pad).unwrap();
let tee_img_pad = tee.get_request_pad("src_%u").unwrap();
println!(
"Obtained request pad {} for image capture branch",
tee_img_pad.get_name()
);
let tee_queue_pad = tee_queue.get_static_pad("sink").unwrap();
tee_img_pad.link(&tee_queue_pad).unwrap();
// This links all elements in a row (something could be added in the middle)
//gst::Element::link_many(&[&source, &parse, &queue, &convert, &convert2, &filter, &convert3, &sink]).expect("Elements could not be linked.");
gst::Element::link_many(&[&depayloader, &decodebin]).expect("Depayloader/Decodebin elements could not be linked.");
gst::Element::link_many(&[&convert, &tee]).expect("Source elements could not be linked.");
gst::Element::link_many(&[&tee_queue2, &convert2, &filter, &convert3, &sink]).expect("HLS branch elements could not be linked.");
gst::Element::link_many(&[&tee_queue, &png_enc, &filesink]).expect("Image capture elements could not be linked.");
/*
let text_sink = textoverlay.get_static_pad("text_sink")
.expect("Failed to get static sink pad from text");
let udp_src_pad = udpsrc.get_static_pad("src")
.expect("Failed to get static src pad from UDP");
let res = udp_src_pad.link(&text_sink);
if res.is_err() {
println!("Text Link failed.");
} else {
println!("Text Link succeeded.");
}
*/
// Set the URI to play
//let uri =
// "https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm";
// //"file:///home/leo/zal.mp4";
//source
// .set_property("uri", &uri)
// .expect("Can't set uri property on uridecodebin");
//Connect the pad-added signal
//This is a function that is called when a new pad is added to the source element (which is audio and video each)
let sink_pad = depayloader
.get_static_pad("sink")
.expect("Failed to get static sink pad from depayloader");
source.connect_pad_added(move |src, src_pad| {
println!(
"Received new pad {} from {}",
src_pad.get_name(),
src.get_name()
);
if sink_pad.is_linked() {
println!("We are already linked. Ignoring.");
return;
}
let new_pad_caps = src_pad
.get_current_caps()
.expect("Failed to get caps of new pad.");
let new_pad_struct = new_pad_caps
.get_structure(0)
.expect("Failed to get first structure of caps.");
let new_pad_type = new_pad_struct.get_name();
/*let is_audio = new_pad_type.starts_with("video/x-raw");
if !is_audio {
println!(
"It has type {} which is not raw video. Ignoring.",
new_pad_type
);
return;
}*/
let is_video = new_pad_type.starts_with("application/x-rtp");
if !is_video {
println!(
"It has type {} which is not x-rtp. Ignoring.",
new_pad_type
);
return;
}
let res = src_pad.link(&sink_pad);
if res.is_err() {
println!("Type is {} but link failed.", new_pad_type);
} else {
println!("Link succeeded (type {}).", new_pad_type);
}
});
decodebin.connect_pad_added(move |src, src_pad| {
println!(
"Received new pad {} from {}",
src_pad.get_name(),
src.get_name()
);
let sink_pad = convert
.get_static_pad("sink")
.expect("Failed to get static sink pad from convert");
if sink_pad.is_linked() {
println!("We are already linked. Ignoring.");
return;
}
let new_pad_caps = src_pad
.get_current_caps()
.expect("Failed to get caps of new pad.");
let new_pad_struct = new_pad_caps
.get_structure(0)
.expect("Failed to get first structure of caps.");
let new_pad_type = new_pad_struct.get_name();
/*let is_audio = new_pad_type.starts_with("video/x-raw");
if !is_audio {
println!(
"It has type {} which is not raw video. Ignoring.",
new_pad_type
);
return;
}*/
let is_video = new_pad_type.starts_with("video/x-raw");
if !is_video {
println!(
"It has type {} which is not video. Ignoring.",
new_pad_type
);
return;
}
let res = src_pad.link(&sink_pad);
if res.is_err() {
println!("Type is {} but link failed.", new_pad_type);
} else {
println!("Link succeeded (type {}).", new_pad_type);
}
});
//let x = source
//.set_state(gst::State::Paused);
//println!("{}", x.to_string());
//.expect("Couldn't set the source to the ready state");
// Start playing
//textoverlay.set_property("text", &"Hello there!!!!!!");
pipeline
.set_state(gst::State::Playing)
.expect("Unable to set the pipeline to the `Playing` state");
println!("Pipeline set to playing state!!!");
// Wait until error or EOS
let bus = pipeline.get_bus().unwrap();
for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
eprintln!(
"Error received from element {:?} {}",
err.get_src().map(|s| s.get_path_string()),
err.get_error()
);
eprintln!("Debugging information: {:?}", err.get_debug());
break;
}
MessageView::StateChanged(state_changed) => {
if state_changed
.get_src()
.map(|s| s == pipeline)
.unwrap_or(false)
{
println!(
"Pipeline state changed from {:?} to {:?}",
state_changed.get_old(),
state_changed.get_current()
);
}
}
MessageView::Eos(..) => break,
_ => (),
}
}
pipeline
.set_state(gst::State::Null)
.expect("Unable to set the pipeline to the `Null` state");
}
I am running this program in a docker container with Ubuntu 20, and I’m using the most recently available gstreamer version via apt (1.16). My gstreamer rust repository was pulled up to date just an hour ago. The output my program gives me is this:
Using rtsp://10.10.193.96:8554/unicast as stream location.
Obtained request pad src_0 for HLS branch
Obtained request pad src_1 for image capture branch
Pipeline set to playing state!!!
Pipeline state changed from Null to Ready
Pipeline state changed from Ready to Paused
Received new pad recv_rtp_src_0_807327211_96 from source
Link succeeded (type application/x-rtp).
Received new pad src_0 from decodebin
Type is video/x-raw but link failed.
This last line is the problem. The link fails. The problem does not exist when I only link one of the two source pads from the tee or remove it entirely from the pipeline; but obviously I need both, because otherwise it would be pointless.