Okay, I feel like I have made some solid progress connecting all of the wires 
This is what I have now:
fn output_audio_and_video_streams(file_path: &str) -> Result<(), Error> {
gstreamer::init()?;
let pipeline: gstreamer::Pipeline = gstreamer::Pipeline::new(None);
let filesrc: gstreamer::Element = gstreamer::ElementFactory::make("filesrc", None).map_err(|_| MissingElement("filesrc"))?;
let decodebin: gstreamer::Element =
gstreamer::ElementFactory::make("decodebin", None).map_err(|_| MissingElement("decodebin"))?;
filesrc.set_property("location", &file_path)?;
pipeline.add_many(&[&filesrc, &decodebin])?;
gstreamer::Element::link_many(&[&filesrc, &decodebin])?;
let pipeline_weak: glib::object::WeakRef<gstreamer::Pipeline> = pipeline.downgrade();
decodebin.connect_pad_added(move |decode_element, decode_pad| {
let pipeline: gstreamer::Pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
let (is_audio, is_video): (bool, bool) = {
let media_type: Option<(bool, bool)> = decode_pad.get_current_caps().and_then(|caps| {
caps.get_structure(0).map(|s| {
let name = s.get_name();
(name.starts_with("audio/"), name.starts_with("video/"))
})
});
match media_type {
None => {
gst_element_warning!(
decode_element,
gstreamer::CoreError::Negotiation,
("Failed to get media type from pad {}", decode_pad.get_name())
);
return;
}
Some(media_type) => media_type,
}
};
let insert_sink = |is_audio, is_video| -> Result<(), Error> {
if is_audio {
let queue: gstreamer::Element = gstreamer::ElementFactory::make("queue", None)
.map_err(|_| MissingElement("queue"))?;
let convert: gstreamer::Element = gstreamer::ElementFactory::make("audioconvert", None)
.map_err(|_| MissingElement("audioconvert"))?;
let resample: gstreamer::Element = gstreamer::ElementFactory::make("audioresample", None)
.map_err(|_| MissingElement("audioresample"))?;
let sink_audio: gstreamer::Element = gstreamer::ElementFactory::make("appsink", None)
.map_err(|_| MissingElement("appsink"))?;
let elements: &[&gstreamer::Element; 4] = &[&queue, &convert, &resample, &sink_audio];
pipeline.add_many(elements)?;
gstreamer::Element::link_many(elements)?;
for element in elements {
element.sync_state_with_parent()?;
}
let appsink_audio: gstreamer_app::AppSink = sink_audio
.dynamic_cast::<gstreamer_app::AppSink>()
.expect("Sink element is expected to be an appsink!");
let audio_caps: gstreamer::Caps = gstreamer::Caps::builder("audio/x-raw")
.field("format", &gstreamer_audio::AudioFormat::S16le.to_str())
.field("layout", &"interleaved")
.field("channels", &(1i32))
.field("rate", &gstreamer::IntRange::<i32>::new(1, i32::MAX))
.build();
appsink_audio.set_caps(Some(&audio_caps));
let callbacks = gstreamer_app::AppSinkCallbacks::builder()
.new_sample(|appsink| {
let sample: gstreamer::Sample = appsink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &gstreamer::BufferRef = sample.get_buffer().ok_or_else(|| {
gst_element_error!(
appsink,
gstreamer::ResourceError::Failed,
("Failed to get buffer from appsink")
);
gstreamer::FlowError::Error
})?;
let map: gstreamer::BufferMap<gstreamer::buffer::Readable> = buffer.map_readable().map_err(|_| {
gst_element_error!(
appsink,
gstreamer::ResourceError::Failed,
("Failed to map buffer readable")
);
gstreamer::FlowError::Error
})?;
let audio_samples: &[i16] = map.as_slice_of::<i16>().map_err(|_| {
gst_element_error!(
appsink,
gstreamer::ResourceError::Failed,
("Failed to interpret buffer as S16 PCM")
);
gstreamer::FlowError::Error
})?;
Ok(gstreamer::FlowSuccess::Ok)
})
.build();
appsink_audio.set_callbacks(callbacks);
let sink_pad: gstreamer::Pad = queue.get_static_pad("sink").expect("queue has no sinkpad");
decode_pad.link(&sink_pad)?;
} else if is_video {
let queue: gstreamer::Element = gstreamer::ElementFactory::make("queue", None)
.map_err(|_| MissingElement("queue"))?;
let convert: gstreamer::Element = gstreamer::ElementFactory::make("videoconvert", None)
.map_err(|_| MissingElement("videoconvert"))?;
let scale: gstreamer::Element = gstreamer::ElementFactory::make("videoscale", None)
.map_err(|_| MissingElement("videoscale"))?;
let sink_video: gstreamer::Element = gstreamer::ElementFactory::make("appsink", None)
.map_err(|_| MissingElement("appsink"))?;
let elements: &[&gstreamer::Element; 4] = &[&queue, &convert, &scale, &sink_video];
pipeline.add_many(elements)?;
gstreamer::Element::link_many(elements)?;
for element in elements {
element.sync_state_with_parent()?
}
let appsink_video: gstreamer_app::AppSink = sink_video
.dynamic_cast::<gstreamer_app::AppSink>()
.expect("Sink element is expected to be an appsink!");
let video_caps: gstreamer::Caps = gstreamer::Caps::builder("video/x-raw")
// .features(&[&gstreamer_gl::CAPS_FEATURE_MEMORY_GL_MEMORY])
.field("format", &gstreamer_video::VideoFormat::Rgba.to_str())
.field("texture-target", &"2D")
.build();
appsink_video.set_caps(Some(&video_caps));
let callbacks = gstreamer_app::AppSinkCallbacks::builder()
.new_sample(|appsink| {
let sample: gstreamer::Sample = appsink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &gstreamer::BufferRef = sample.get_buffer().ok_or_else(|| {
gst_element_error!(
appsink,
gstreamer::ResourceError::Failed,
("Failed to get buffer from appsink")
);
gstreamer::FlowError::Error
})?;
let map: gstreamer::BufferMap<gstreamer::buffer::Readable> = buffer.map_readable().map_err(|_| {
gst_element_error!(
appsink,
gstreamer::ResourceError::Failed,
("Failed to map buffer readable")
);
gstreamer::FlowError::Error
})?;
let video_samples: &[u8] = map.as_slice_of::<u8>().map_err(|_| {
gst_element_error!(
appsink,
gstreamer::ResourceError::Failed,
("Failed to interpret buffer as RGBa")
);
gstreamer::FlowError::Error
})?;
Ok(gstreamer::FlowSuccess::Ok)
})
.build();
appsink_video.set_callbacks(callbacks);
let sink_pad: gstreamer::Pad = queue.get_static_pad("sink").expect("queue has no sinkpad");
decode_pad.link(&sink_pad)?;
}
Ok(())
};
if let Err(err) = insert_sink(is_audio, is_video) {
#[cfg(feature = "v1_10")]
element_error!(
decode_element,
gstreamer::LibraryError::Failed,
("Failed to insert sink"),
details: gstreamer::Structure::builder("error-details")
.field("error",
&ErrorValue(Arc::new(Mutex::new(Some(err)))))
.build()
);
#[cfg(not(feature = "v1_10"))]
gst_element_error!(
decode_element,
gstreamer::LibraryError::Failed,
("Failed to insert sink"),
["{}", err]
);
}
});
pipeline.set_state(gstreamer::State::Playing)?;
let bus: gstreamer::Bus = pipeline
.get_bus()
.expect("Pipeline without bus. Shouldn't happen!");
for msg in bus.iter_timed(gstreamer::CLOCK_TIME_NONE) {
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null)?;
#[cfg(feature = "v1_10")]
{
match err.get_details() {
// This bus-message of type error contained our custom error-details struct
// that we sent in the pad-added callback above. So we unpack it and log
// the detailed error information here. details contains a glib::SendValue.
// The unpacked error is the converted to a Result::Err, stopping the
// application's execution.
Some(details) if details.get_name() == "error-details" => details
.get::<&ErrorValue>("error")
.unwrap()
.and_then(|v| v.0.lock().unwrap().take())
.map(Result::Err)
.expect("error-details message without actual error"),
_ => Err(ErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into()),
}?;
}
#[cfg(not(feature = "v1_10"))]
{
return Err(ErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into());
}
}
MessageView::StateChanged(s) => {
println!(
"State changed from {:?}: {:?} -> {:?} ({:?})",
s.get_src().map(|s| s.get_path_string()),
s.get_old(),
s.get_current(),
s.get_pending()
);
}
_ => (),
}
}
pipeline.set_state(gstreamer::State::Null)?;
Ok(())
}
The issue I have not been able to figure out, is how to export/return the audio_samples
and video_samples
variables from within the nested closures/callbacks.
A working example of what I need is here (I mentioned this in my first post).
A quick example of what I need to do is this:
let (audio, video): (&[i16], &[u8]) = output_audio_and_video_streams("/path/to/file.mkv");
handle.upload_audio_frame(audio);
handle.upload_video_frame(video);
Where the return type of my function is changed to -> (&[i16], &[u8])
Do you have any thoughts on how to do this?