From 6062350b61e14ba490ac53449b726af2604993a8 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:45:24 -0700 Subject: [PATCH 01/27] Starting --- samples/kvssink_gstreamer_sample.cpp | 29 ++++++++++--- samples/kvssink_intermittent_sample.cpp | 0 src/gstreamer/gstkvssink.cpp | 54 ++++++++++++------------- 3 files changed, 49 insertions(+), 34 deletions(-) create mode 100644 samples/kvssink_intermittent_sample.cpp diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp index ade28782..b2b23cba 100644 --- a/samples/kvssink_gstreamer_sample.cpp +++ b/samples/kvssink_gstreamer_sample.cpp @@ -166,8 +166,8 @@ static void eos_cb(GstElement *sink, GstMessage *message, CustomData *data) { data->file_list.at(data->current_file_idx).last_fragment_ts = data->key_frame_pts; } } - LOG_DEBUG("Terminating pipeline due to EOS"); - g_main_loop_quit(data->main_loop); + //LOG_DEBUG("Terminating pipeline due to EOS"); + //g_main_loop_quit(data->main_loop); } /* This function is called when an error message is posted on the bus */ @@ -356,9 +356,9 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem encoder = gst_element_factory_make("vtenc_h264_hw", "encoder"); if (encoder) { vtenc = true; - source = gst_element_factory_make("videotestsrc", "source"); + source = gst_element_factory_make("autovideosrc", "source"); if (source) { - LOG_DEBUG("Using videotestsrc"); + LOG_DEBUG("Using autovideosrc"); } else { LOG_ERROR("Failed to create videotestsrc"); return 1; @@ -403,7 +403,7 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem /* configure source */ if (vtenc) { - g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); + // g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); } else { g_object_set(G_OBJECT(source), "do-timestamp", TRUE, "device", "/dev/video0", NULL); } @@ -729,9 +729,26 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) { std::thread stream_timer(timer, data); stream_timer.detach(); } + LOG_DEBUG("before main loop"); data->main_loop = g_main_loop_new(NULL, FALSE); - g_main_loop_run(data->main_loop); + std::thread mainLoopThread(g_main_loop_run, data->main_loop); + + sleep(10); + LOG_DEBUG("Pausing..."); + GstEvent* eos; + eos = gst_event_new_eos(); + gst_element_send_event(pipeline, eos); + + sleep(10); + LOG_DEBUG("Playing..."); + GstEvent* flush_start = gst_event_new_flush_start(); + gst_element_send_event(pipeline, flush_start); + GstEvent*flush_stop = gst_event_new_flush_stop(true); + gst_element_send_event(pipeline, flush_stop); + + + mainLoopThread.join(); LOG_DEBUG("after main loop") diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp new file mode 100644 index 00000000..e69de29b diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 34c7d76f..11a7d83a 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -1258,26 +1258,26 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, info.data = NULL; // eos reached - if (buf == NULL && track_data == NULL) { - LOG_INFO("Received event for " << kvssink->stream_name); - // Need this check in case pipeline is already being set to NULL and - // stream is being or/already stopped. Although stopSync() is an idempotent call, - // we want to avoid an extra call. It is not possible for this callback to be invoked - // after stopSync() since we stop collecting on pads before invoking. But having this - // check anyways in case it happens - if(!data->streamingStopped.load()) { - data->kinesis_video_stream->stopSync(); - data->streamingStopped.store(true); - LOG_INFO("Sending eos for " << kvssink->stream_name); - } - - // send out eos message to gstreamer bus - message = gst_message_new_eos (GST_OBJECT_CAST (kvssink)); - gst_element_post_message (GST_ELEMENT_CAST (kvssink), message); - - ret = GST_FLOW_EOS; - goto CleanUp; - } + // if (buf == NULL && track_data == NULL) { + // LOG_INFO("Received event for " << kvssink->stream_name); + // // Need this check in case pipeline is already being set to NULL and + // // stream is being or/already stopped. Although stopSync() is an idempotent call, + // // we want to avoid an extra call. It is not possible for this callback to be invoked + // // after stopSync() since we stop collecting on pads before invoking. But having this + // // check anyways in case it happens + // if(!data->streamingStopped.load()) { + // // data->kinesis_video_stream->stopSync(); + // // data->streamingStopped.store(true); + // // LOG_INFO("Sending eos for " << kvssink->stream_name); + // } + + // // send out eos message to gstreamer bus + // message = gst_message_new_eos (GST_OBJECT_CAST (kvssink)); + // gst_element_post_message (GST_ELEMENT_CAST (kvssink), message); + + // ret = GST_FLOW_EOS; + // goto CleanUp; + // } if (STATUS_FAILED(stream_status)) { // in offline case, we cant tell the pipeline to restream the file again in case of network outage. @@ -1297,6 +1297,9 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, } if(buf != NULL) { + + // GST_BUFFER_FLAG_IS_SET uses bitwise & to compare, 1536 = GST_BUFFER_FLAG_MARKER (512) + GST_BUFFER_FLAG_HEADER (1024) + isDroppable = GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_CORRUPTED) || GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DECODE_ONLY) || (GST_BUFFER_FLAGS(buf) == GST_BUFFER_FLAG_DISCONT) || @@ -1364,9 +1367,9 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, } } - put_frame_status = put_frame(data, info.data, info.size, - std::chrono::nanoseconds(buf->pts), - std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count); + put_frame(kvssink->data, info.data, info.size, + std::chrono::nanoseconds(buf->pts), + std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count); data->frame_count++; } else { @@ -1381,11 +1384,6 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, if (buf != NULL) { gst_buffer_unref (buf); } - - if (STATUS_FAILED(put_frame_status)) { - GST_ELEMENT_WARNING (kvssink, RESOURCE, WRITE, (NULL), - ("put frame error occurred. Status: 0x%08x", put_frame_status)); - } return ret; } From 35747c534992ae45cc93ce104724cbc1b928797f Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 09:37:23 -0700 Subject: [PATCH 02/27] Started clean sample --- .gitignore | 2 +- samples/kvssink_intermittent_sample.cpp | 184 ++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 5f8d045a..0099dc64 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,5 @@ open-source/ outputs tags dependency - .vs +.vscode diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index e69de29b..bd0c94ea 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -0,0 +1,184 @@ +#include +#include + +using namespace std; +using namespace com::amazonaws::kinesis::video; +using namespace log4cplus; + + +typedef enum _StreamSource { + TEST_SOURCE, + DEVICE_SOURCE, + RTSP_SOURCE +} StreamSource; + +static gboolean +bus_call (GstBus *bus, + GstMessage *msg, + gpointer data) +{ + GMainLoop *loop = (GMainLoop *) data; + + switch (GST_MESSAGE_TYPE (msg)) { + + case GST_MESSAGE_EOS: + g_print ("End of stream\n"); + g_main_loop_quit (loop); + break; + + case GST_MESSAGE_ERROR: { + gchar *debug; + GError *error; + + gst_message_parse_error (msg, &error, &debug); + g_free (debug); + + g_printerr ("Error: %s\n", error->message); + g_error_free (error); + + g_main_loop_quit (loop); + break; + } + default: + break; + } + + return TRUE; +} + + +static void +on_pad_added (GstElement *element, + GstPad *pad, + gpointer data) +{ + GstPad *sinkpad; + GstElement *decoder = (GstElement *) data; + + /* We can now link this pad with the vorbis-decoder sink pad */ + g_print ("Dynamic pad created, linking demuxer/decoder\n"); + + sinkpad = gst_element_get_static_pad (decoder, "sink"); + + gst_pad_link (pad, sinkpad); + + gst_object_unref (sinkpad); +} + + + +int main (int argc, char *argv[]) +{ + GMainLoop *loop; + + GstElement *pipeline, *source, *video_convert, *source_filter, *encoder, *h264parse, *kvssink; + GstCaps *source_caps; + + GstBus *bus; + guint bus_watch_id; + + StreamSource source_type; + + /* GStreamer Initialisation */ + gst_init (&argc, &argv); + + loop = g_main_loop_new (NULL, FALSE); + + /* Check input arguments */ + if (argc > 2) { + g_printerr ("Usage: %s \n", argv[0]); + return -1; + } + + if(argc > 1) { + if(0 == STRCMPI(argv[1], "tesetesrc")) { + source_type = TEST_SOURCE; + } else if (0 == STRCMPI(argv[1], "devicesrc")) { + source_type = DEVICE_SOURCE; + } else if (0 == STRCMPI(argv[1], "rtspsrc")) { + source_type = RTSP_SOURCE; + } + } else { + source_type = TEST_SOURCE; + } + + /* Create gstreamer elements */ + pipeline = gst_pipeline_new ("kvs-pipeline"); + + if(source_type == TEST_SOURCE) { + source = gst_element_factory_make ("videotestsrc", "test-source"); + } else if (source_type == DEVICE_SOURCE) { + source = gst_element_factory_make ("autovideosrc", "device-source"); + } else { // RTSP_SOURCE + } + + video_convert = gst_element_factory_make("videoconvert", "video_convert"); + + source_filter = gst_element_factory_make("capsfilter", "source-filter"); + source_caps = gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL); + g_object_set(G_OBJECT(source_filter), "caps", source_caps, NULL); + + encoder = gst_element_factory_make("x264enc", "encoder"); + g_object_set(G_OBJECT(encoder), "bframes", 0, "tune", "zero-latency", NULL); + + h264parse = gst_element_factory_make("h264parse", "h264parse"); + + kvssink = gst_element_factory_make("kvssink", "kvssink") + + if (!kvssink) { + LOG_ERROR("Failed to create kvssink element"); + return -1; + } + + if (!pipeline || !source || !video_convert || !source_filter || !encoder || !h264parse) { + g_printerr ("Not all GStreamer elements could be created.\n"); + return -1; + } + + /* Set up the pipeline */ + + /* we add a message handler */ + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + bus_watch_id = gst_bus_add_watch (bus, bus_call, loop); + gst_object_unref (bus); + + /* we add all elements into the pipeline */ + /* file-source | ogg-demuxer | vorbis-decoder | converter | alsa-output */ + gst_bin_add_many (GST_BIN (pipeline), + source, demuxer, decoder, conv, sink, NULL); + + /* we link the elements together */ + /* file-source -> ogg-demuxer ~> vorbis-decoder -> converter -> alsa-output */ + gst_element_link (source, demuxer); + gst_element_link_many (decoder, conv, sink, NULL); + g_signal_connect (demuxer, "pad-added", G_CALLBACK (on_pad_added), decoder); + + /* note that the demuxer will be linked to the decoder dynamically. + The reason is that Ogg may contain various streams (for example + audio and video). The source pad(s) will be created at run time, + by the demuxer when it detects the amount and nature of streams. + Therefore we connect a callback function which will be executed + when the "pad-added" is emitted.*/ + + + /* Set the pipeline to "playing" state*/ + g_print ("Now playing: %s\n", argv[1]); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + + /* Iterate */ + g_print ("Running...\n"); + g_main_loop_run (loop); + + + /* Out of the main loop, clean up nicely */ + g_print ("Returned, stopping playback\n"); + gst_element_set_state (pipeline, GST_STATE_NULL); + + g_print ("Deleting pipeline\n"); + gst_object_unref (GST_OBJECT (pipeline)); + g_source_remove (bus_watch_id); + g_main_loop_unref (loop); + + return 0; +} \ No newline at end of file From 235d0d07d5ff434084bd33ea69f56df2dd07d1bd Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:10:33 -0700 Subject: [PATCH 03/27] more --- samples/kvssink_intermittent_sample.cpp | 124 ++++++++++++++---------- 1 file changed, 71 insertions(+), 53 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index bd0c94ea..8b5be605 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -1,10 +1,13 @@ #include #include +#include "gstreamer/gstkvssink.h" + using namespace std; using namespace com::amazonaws::kinesis::video; using namespace log4cplus; +GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); typedef enum _StreamSource { TEST_SOURCE, @@ -12,6 +15,14 @@ typedef enum _StreamSource { RTSP_SOURCE } StreamSource; +void sigint_handler(int sigint){ + LOG_DEBUG("SIGINT received. Exiting..."); + + if(main_loop != NULL){ + g_main_loop_quit(main_loop); + } +} + static gboolean bus_call (GstBus *bus, GstMessage *msg, @@ -22,8 +33,7 @@ bus_call (GstBus *bus, switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_EOS: - g_print ("End of stream\n"); - g_main_loop_quit (loop); + g_print ("Received EOS\n"); break; case GST_MESSAGE_ERROR: { @@ -46,63 +56,76 @@ bus_call (GstBus *bus, return TRUE; } - -static void -on_pad_added (GstElement *element, - GstPad *pad, - gpointer data) -{ - GstPad *sinkpad; - GstElement *decoder = (GstElement *) data; - - /* We can now link this pad with the vorbis-decoder sink pad */ - g_print ("Dynamic pad created, linking demuxer/decoder\n"); - - sinkpad = gst_element_get_static_pad (decoder, "sink"); - - gst_pad_link (pad, sinkpad); - - gst_object_unref (sinkpad); +void determine_credentials(GstElement *kvssink, G_TYPE_STRING streamName) { + + char const *iot_credential_endpoint; + char const *cert_path; + char const *private_key_path; + char const *role_alias; + char const *ca_cert_path; + char const *credential_path; + if (nullptr != (iot_credential_endpoint = getenv("IOT_GET_CREDENTIAL_ENDPOINT")) && + nullptr != (cert_path = getenv("CERT_PATH")) && + nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) && + nullptr != (role_alias = getenv("ROLE_ALIAS")) && + nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) { + // set the IoT Credentials if provided in envvar + GstStructure *iot_credentials = gst_structure_new( + "iot-certificate", + "iot-thing-name", G_TYPE_STRING, streamName, + "endpoint", G_TYPE_STRING, iot_credential_endpoint, + "cert-path", G_TYPE_STRING, cert_path, + "key-path", G_TYPE_STRING, private_key_path, + "ca-path", G_TYPE_STRING, ca_cert_path, + "role-aliases", G_TYPE_STRING, role_alias, NULL); + + g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); + gst_structure_free(iot_credentials); + // kvssink will search for long term credentials in envvar automatically so no need to include here + // if no long credentials or IoT credentials provided will look for credential file as last resort + } else if(nullptr != (credential_path = getenv("AWS_CREDENTIAL_PATH"))){ + g_object_set(G_OBJECT (kvssink), "credential-path", credential_path, NULL); + } } - int main (int argc, char *argv[]) { - GMainLoop *loop; + signal(SIGINT, sigint_handler); GstElement *pipeline, *source, *video_convert, *source_filter, *encoder, *h264parse, *kvssink; GstCaps *source_caps; - GstBus *bus; guint bus_watch_id; - StreamSource source_type; + char stream_name[MAX_STREAM_NAME_LEN + 1]; /* GStreamer Initialisation */ gst_init (&argc, &argv); - loop = g_main_loop_new (NULL, FALSE); - /* Check input arguments */ - if (argc > 2) { - g_printerr ("Usage: %s \n", argv[0]); + if (argc > 3 || argc < 2) { + g_printerr ("Usage: %s \n", argv[0]); return -1; } - if(argc > 1) { - if(0 == STRCMPI(argv[1], "tesetesrc")) { + STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN); + stream_name[MAX_STREAM_NAME_LEN] = '\0'; + + if(argc > 2) { + if(0 == STRCMPI(argv[2], "testsrc")) { source_type = TEST_SOURCE; - } else if (0 == STRCMPI(argv[1], "devicesrc")) { + } else if (0 == STRCMPI(argv[2], "devicesrc")) { source_type = DEVICE_SOURCE; - } else if (0 == STRCMPI(argv[1], "rtspsrc")) { + } else if (0 == STRCMPI(argv[2], "rtspsrc")) { source_type = RTSP_SOURCE; } } else { source_type = TEST_SOURCE; } - /* Create gstreamer elements */ + + /* Create GStreamer elements */ pipeline = gst_pipeline_new ("kvs-pipeline"); if(source_type == TEST_SOURCE) { @@ -124,7 +147,9 @@ int main (int argc, char *argv[]) h264parse = gst_element_factory_make("h264parse", "h264parse"); kvssink = gst_element_factory_make("kvssink", "kvssink") - + g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); + determine_credentials(kvssink, stream_name); + if (!kvssink) { LOG_ERROR("Failed to create kvssink element"); return -1; @@ -135,40 +160,33 @@ int main (int argc, char *argv[]) return -1; } + /* Set up the pipeline */ /* we add a message handler */ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); - bus_watch_id = gst_bus_add_watch (bus, bus_call, loop); + bus_watch_id = gst_bus_add_watch (bus, bus_call, main_loop); gst_object_unref (bus); - /* we add all elements into the pipeline */ - /* file-source | ogg-demuxer | vorbis-decoder | converter | alsa-output */ + /* Add elements into the pipeline */ gst_bin_add_many (GST_BIN (pipeline), - source, demuxer, decoder, conv, sink, NULL); - - /* we link the elements together */ - /* file-source -> ogg-demuxer ~> vorbis-decoder -> converter -> alsa-output */ - gst_element_link (source, demuxer); - gst_element_link_many (decoder, conv, sink, NULL); - g_signal_connect (demuxer, "pad-added", G_CALLBACK (on_pad_added), decoder); - - /* note that the demuxer will be linked to the decoder dynamically. - The reason is that Ogg may contain various streams (for example - audio and video). The source pad(s) will be created at run time, - by the demuxer when it detects the amount and nature of streams. - Therefore we connect a callback function which will be executed - when the "pad-added" is emitted.*/ + source, video_convert, source_filter, encoder, h264parse, kvssink, NULL); + /* Link the elements together */ + if (!gst_element_link_many(source, video_convert, source_filter, encoder, h264parse, kvssink, NULL)) { + g_printerr("Elements could not be linked.\n"); + gst_object_unref(pipeline); + return -1; + } /* Set the pipeline to "playing" state*/ - g_print ("Now playing: %s\n", argv[1]); + g_print ("Playing..\n"); gst_element_set_state (pipeline, GST_STATE_PLAYING); /* Iterate */ g_print ("Running...\n"); - g_main_loop_run (loop); + g_main_loop_run (main_loop); /* Out of the main loop, clean up nicely */ @@ -178,7 +196,7 @@ int main (int argc, char *argv[]) g_print ("Deleting pipeline\n"); gst_object_unref (GST_OBJECT (pipeline)); g_source_remove (bus_watch_id); - g_main_loop_unref (loop); + g_main_loop_unref (main_loop); return 0; } \ No newline at end of file From ff0c19fdf4519e6deeee6f7a3d3148697e567e02 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:25:14 -0700 Subject: [PATCH 04/27] Streaming to KVS working --- CMakeLists.txt | 3 +++ samples/kvssink_intermittent_sample.cpp | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index be5c863d..96290778 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -257,6 +257,9 @@ if(BUILD_GSTREAMER_PLUGIN) add_executable(kvs_gstreamer_file_uploader_sample samples/kvs_gstreamer_file_uploader_sample.cpp) target_link_libraries(kvs_gstreamer_file_uploader_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES}) + add_executable(kvssink_intermittent_sample samples/kvssink_intermittent_sample.cpp) + target_link_libraries(kvssink_intermittent_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} kvspic) + install( TARGETS gstkvssink ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 8b5be605..84b34568 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -7,6 +7,8 @@ using namespace std; using namespace com::amazonaws::kinesis::video; using namespace log4cplus; +LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); + GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); typedef enum _StreamSource { @@ -56,7 +58,7 @@ bus_call (GstBus *bus, return TRUE; } -void determine_credentials(GstElement *kvssink, G_TYPE_STRING streamName) { +void determine_credentials(GstElement *kvssink, char* streamName) { char const *iot_credential_endpoint; char const *cert_path; @@ -146,7 +148,7 @@ int main (int argc, char *argv[]) h264parse = gst_element_factory_make("h264parse", "h264parse"); - kvssink = gst_element_factory_make("kvssink", "kvssink") + kvssink = gst_element_factory_make("kvssink", "kvssink"); g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); determine_credentials(kvssink, stream_name); From 8f31a63bdf6d9fdcf168ce9c479eb7ed9d3c3e38 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 12:48:30 -0700 Subject: [PATCH 05/27] Intermittent streaming working. --- samples/kvssink_intermittent_sample.cpp | 78 ++++++++++++++++++++----- 1 file changed, 65 insertions(+), 13 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 84b34568..617147b6 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -1,15 +1,24 @@ +#include // std::thread +#include // std::chrono::seconds +#include // std::mutex, std::unique_lock +#include // std::condition_variable, std::cv_status + #include #include #include "gstreamer/gstkvssink.h" -using namespace std; using namespace com::amazonaws::kinesis::video; using namespace log4cplus; LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); +#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 10 +#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 10 + GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); +std::atomic terminated(FALSE); +std::condition_variable cv; typedef enum _StreamSource { TEST_SOURCE, @@ -19,7 +28,8 @@ typedef enum _StreamSource { void sigint_handler(int sigint){ LOG_DEBUG("SIGINT received. Exiting..."); - + terminated = TRUE; + cv.notify_all(); if(main_loop != NULL){ g_main_loop_quit(main_loop); } @@ -58,8 +68,7 @@ bus_call (GstBus *bus, return TRUE; } -void determine_credentials(GstElement *kvssink, char* streamName) { - +void determine_aws_credentials(GstElement *kvssink, char* streamName) { char const *iot_credential_endpoint; char const *cert_path; char const *private_key_path; @@ -90,13 +99,40 @@ void determine_credentials(GstElement *kvssink, char* streamName) { } } +void stopStartLoop(GstElement *pipeline) { + std::mutex cv_m; + std::unique_lock lck(cv_m); + + while (!terminated) { + if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) { + break; + } + + LOG_DEBUG("Pausing..."); + // EOS is necessary to push frame(s) buffered by h264enc element + GstEvent* eos; + eos = gst_event_new_eos(); + gst_element_send_event(pipeline, eos); + if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { + break; + } + + LOG_DEBUG("Playing..."); + // Flushing to remove EOS from elements + GstEvent* flush_start = gst_event_new_flush_start(); + gst_element_send_event(pipeline, flush_start); + GstEvent*flush_stop = gst_event_new_flush_stop(true); + gst_element_send_event(pipeline, flush_stop); + } + LOG_DEBUG("Exited stopStartLoop"); +} int main (int argc, char *argv[]) { signal(SIGINT, sigint_handler); - GstElement *pipeline, *source, *video_convert, *source_filter, *encoder, *h264parse, *kvssink; - GstCaps *source_caps; + GstElement *pipeline, *source, *clock_overlay, *video_convert, *source_filter, *encoder, *sink_filter, *kvssink; + GstCaps *source_caps, *sink_caps; GstBus *bus; guint bus_watch_id; StreamSource source_type; @@ -137,27 +173,39 @@ int main (int argc, char *argv[]) } else { // RTSP_SOURCE } + clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay"); + video_convert = gst_element_factory_make("videoconvert", "video_convert"); source_filter = gst_element_factory_make("capsfilter", "source-filter"); source_caps = gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL); g_object_set(G_OBJECT(source_filter), "caps", source_caps, NULL); + gst_caps_unref(source_caps); encoder = gst_element_factory_make("x264enc", "encoder"); g_object_set(G_OBJECT(encoder), "bframes", 0, "tune", "zero-latency", NULL); - h264parse = gst_element_factory_make("h264parse", "h264parse"); + sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); + sink_caps = gst_caps_new_simple("video/x-h264", + "stream-format", G_TYPE_STRING, "avc", + "alignment", G_TYPE_STRING, "au", + // "framerate", GST_TYPE_FRACTION, 20, 1, + NULL); + g_object_set(G_OBJECT(sink_filter), "caps", sink_caps, NULL); + gst_caps_unref(sink_caps); kvssink = gst_element_factory_make("kvssink", "kvssink"); g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); - determine_credentials(kvssink, stream_name); + determine_aws_credentials(kvssink, stream_name); + + if (!kvssink) { LOG_ERROR("Failed to create kvssink element"); return -1; } - if (!pipeline || !source || !video_convert || !source_filter || !encoder || !h264parse) { + if (!pipeline || !source || !video_convert || !source_filter || !encoder) { g_printerr ("Not all GStreamer elements could be created.\n"); return -1; } @@ -172,10 +220,10 @@ int main (int argc, char *argv[]) /* Add elements into the pipeline */ gst_bin_add_many (GST_BIN (pipeline), - source, video_convert, source_filter, encoder, h264parse, kvssink, NULL); + source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL); /* Link the elements together */ - if (!gst_element_link_many(source, video_convert, source_filter, encoder, h264parse, kvssink, NULL)) { + if (!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL)) { g_printerr("Elements could not be linked.\n"); gst_object_unref(pipeline); return -1; @@ -185,13 +233,17 @@ int main (int argc, char *argv[]) g_print ("Playing..\n"); gst_element_set_state (pipeline, GST_STATE_PLAYING); + /* Start stop/start thread for intermittent streaming */ + std::thread stopStartThread(stopStartLoop, pipeline); + /* Iterate */ g_print ("Running...\n"); g_main_loop_run (main_loop); + stopStartThread.join(); - /* Out of the main loop, clean up nicely */ + /* Out of the main loop, clean up */ g_print ("Returned, stopping playback\n"); gst_element_set_state (pipeline, GST_STATE_NULL); @@ -201,4 +253,4 @@ int main (int argc, char *argv[]) g_main_loop_unref (main_loop); return 0; -} \ No newline at end of file +} From bcec5b600f46656a72016950854bf86e29bee42e Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 12:59:28 -0700 Subject: [PATCH 06/27] Add comments, cleanup --- samples/kvssink_intermittent_sample.cpp | 27 ++++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 617147b6..271dd48f 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -36,9 +36,7 @@ void sigint_handler(int sigint){ } static gboolean -bus_call (GstBus *bus, - GstMessage *msg, - gpointer data) +bus_call (GstBus *bus, GstMessage *msg, gpointer data) { GMainLoop *loop = (GMainLoop *) data; @@ -99,17 +97,19 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { } } +// This function handles the starting and stopping of the stream. void stopStartLoop(GstElement *pipeline) { std::mutex cv_m; std::unique_lock lck(cv_m); while (!terminated) { + // Use cv.wait_for to break sleep early upon signal interrupt. if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) { break; } LOG_DEBUG("Pausing..."); - // EOS is necessary to push frame(s) buffered by h264enc element + // EOS is necessary to push frame(s) buffered by the h264enc element GstEvent* eos; eos = gst_event_new_eos(); gst_element_send_event(pipeline, eos); @@ -118,10 +118,10 @@ void stopStartLoop(GstElement *pipeline) { } LOG_DEBUG("Playing..."); - // Flushing to remove EOS from elements + // Flushing to remove EOS status GstEvent* flush_start = gst_event_new_flush_start(); gst_element_send_event(pipeline, flush_start); - GstEvent*flush_stop = gst_event_new_flush_stop(true); + GstEvent* flush_stop = gst_event_new_flush_stop(true); gst_element_send_event(pipeline, flush_stop); } LOG_DEBUG("Exited stopStartLoop"); @@ -164,8 +164,10 @@ int main (int argc, char *argv[]) /* Create GStreamer elements */ + pipeline = gst_pipeline_new ("kvs-pipeline"); + /* configure source */ if(source_type == TEST_SOURCE) { source = gst_element_factory_make ("videotestsrc", "test-source"); } else if (source_type == DEVICE_SOURCE) { @@ -173,18 +175,23 @@ int main (int argc, char *argv[]) } else { // RTSP_SOURCE } + /* clock overlay */ clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay"); + /* video convert */ video_convert = gst_element_factory_make("videoconvert", "video_convert"); + /* source filter */ source_filter = gst_element_factory_make("capsfilter", "source-filter"); source_caps = gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL); g_object_set(G_OBJECT(source_filter), "caps", source_caps, NULL); gst_caps_unref(source_caps); + /* encoder */ encoder = gst_element_factory_make("x264enc", "encoder"); g_object_set(G_OBJECT(encoder), "bframes", 0, "tune", "zero-latency", NULL); + /* sink filter */ sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); sink_caps = gst_caps_new_simple("video/x-h264", "stream-format", G_TYPE_STRING, "avc", @@ -194,11 +201,13 @@ int main (int argc, char *argv[]) g_object_set(G_OBJECT(sink_filter), "caps", sink_caps, NULL); gst_caps_unref(sink_caps); + /* kvssink */ kvssink = gst_element_factory_make("kvssink", "kvssink"); g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); determine_aws_credentials(kvssink, stream_name); + /* Check that GStreamer elements were all successfully created */ if (!kvssink) { LOG_ERROR("Failed to create kvssink element"); @@ -213,7 +222,7 @@ int main (int argc, char *argv[]) /* Set up the pipeline */ - /* we add a message handler */ + /* Add a message handler */ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); bus_watch_id = gst_bus_add_watch (bus, bus_call, main_loop); gst_object_unref (bus); @@ -236,9 +245,7 @@ int main (int argc, char *argv[]) /* Start stop/start thread for intermittent streaming */ std::thread stopStartThread(stopStartLoop, pipeline); - - /* Iterate */ - g_print ("Running...\n"); + g_print ("Running main loop...\n"); g_main_loop_run (main_loop); stopStartThread.join(); From 9c371da21eaca5c3b74d693966960957875f6ad7 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 14:30:50 -0700 Subject: [PATCH 07/27] EOS stream message working --- samples/kvssink_intermittent_sample.cpp | 96 +++++++++++++++---------- src/gstreamer/gstkvssink.cpp | 3 + 2 files changed, 63 insertions(+), 36 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 271dd48f..371477c7 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -18,6 +18,7 @@ LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); std::atomic terminated(FALSE); +std::atomic eosReceived(FALSE); // TODO: Need this to handle waiting for eos from pausing std::condition_variable cv; typedef enum _StreamSource { @@ -26,6 +27,16 @@ typedef enum _StreamSource { RTSP_SOURCE } StreamSource; +typedef struct _CustomData { + _CustomData() : + main_loop(NULL), + pipeline(NULL) {} + + GMainLoop *main_loop; + GstElement *pipeline; + +} CustomData; + void sigint_handler(int sigint){ LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; @@ -38,32 +49,40 @@ void sigint_handler(int sigint){ static gboolean bus_call (GstBus *bus, GstMessage *msg, gpointer data) { - GMainLoop *loop = (GMainLoop *) data; - - switch (GST_MESSAGE_TYPE (msg)) { + GMainLoop *loop = (GMainLoop *) ((CustomData *)data)->main_loop; + GstElement *pipeline = (GstElement *) ((CustomData *)data)->pipeline; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_EOS: { + LOG_DEBUG("Received EOS"); + if(!terminated) { + GstEvent* flush_start = gst_event_new_flush_start(); + gst_element_send_event(pipeline, flush_start); + } + eosReceived = TRUE; + break; + } - case GST_MESSAGE_EOS: - g_print ("Received EOS\n"); - break; + case GST_MESSAGE_ERROR: { + gchar *debug; + GError *error; - case GST_MESSAGE_ERROR: { - gchar *debug; - GError *error; + gst_message_parse_error (msg, &error, &debug); + g_free (debug); - gst_message_parse_error (msg, &error, &debug); - g_free (debug); + g_printerr ("Error: %s\n", error->message); + g_error_free (error); - g_printerr ("Error: %s\n", error->message); - g_error_free (error); + g_main_loop_quit (loop); + break; + } - g_main_loop_quit (loop); - break; + default: { + break; + } } - default: - break; - } - return TRUE; + return TRUE; } void determine_aws_credentials(GstElement *kvssink, char* streamName) { @@ -78,17 +97,17 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) && nullptr != (role_alias = getenv("ROLE_ALIAS")) && nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) { - // set the IoT Credentials if provided in envvar - GstStructure *iot_credentials = gst_structure_new( - "iot-certificate", - "iot-thing-name", G_TYPE_STRING, streamName, - "endpoint", G_TYPE_STRING, iot_credential_endpoint, - "cert-path", G_TYPE_STRING, cert_path, - "key-path", G_TYPE_STRING, private_key_path, - "ca-path", G_TYPE_STRING, ca_cert_path, - "role-aliases", G_TYPE_STRING, role_alias, NULL); - - g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); + // set the IoT Credentials if provided in envvar + GstStructure *iot_credentials = gst_structure_new( + "iot-certificate", + "iot-thing-name", G_TYPE_STRING, streamName, + "endpoint", G_TYPE_STRING, iot_credential_endpoint, + "cert-path", G_TYPE_STRING, cert_path, + "key-path", G_TYPE_STRING, private_key_path, + "ca-path", G_TYPE_STRING, ca_cert_path, + "role-aliases", G_TYPE_STRING, role_alias, NULL); + + g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); gst_structure_free(iot_credentials); // kvssink will search for long term credentials in envvar automatically so no need to include here // if no long credentials or IoT credentials provided will look for credential file as last resort @@ -113,14 +132,16 @@ void stopStartLoop(GstElement *pipeline) { GstEvent* eos; eos = gst_event_new_eos(); gst_element_send_event(pipeline, eos); + + // Use cv.wait_for to break sleep early upon signal interrupt. if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { break; } LOG_DEBUG("Playing..."); // Flushing to remove EOS status - GstEvent* flush_start = gst_event_new_flush_start(); - gst_element_send_event(pipeline, flush_start); + // GstEvent* flush_start = gst_event_new_flush_start(); + // gst_element_send_event(pipeline, flush_start); GstEvent* flush_stop = gst_event_new_flush_stop(true); gst_element_send_event(pipeline, flush_stop); } @@ -131,6 +152,7 @@ int main (int argc, char *argv[]) { signal(SIGINT, sigint_handler); + CustomData customData; GstElement *pipeline, *source, *clock_overlay, *video_convert, *source_filter, *encoder, *sink_filter, *kvssink; GstCaps *source_caps, *sink_caps; GstBus *bus; @@ -189,7 +211,7 @@ int main (int argc, char *argv[]) /* encoder */ encoder = gst_element_factory_make("x264enc", "encoder"); - g_object_set(G_OBJECT(encoder), "bframes", 0, "tune", "zero-latency", NULL); + g_object_set(G_OBJECT(encoder), "bframes", 0, NULL); // TODO: put back the zerolatency tune /* sink filter */ sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); @@ -215,16 +237,18 @@ int main (int argc, char *argv[]) } if (!pipeline || !source || !video_convert || !source_filter || !encoder) { + // TODO: Gprint -> log4cplus g_printerr ("Not all GStreamer elements could be created.\n"); return -1; } - - /* Set up the pipeline */ + /* Populate data struct */ + customData.main_loop = main_loop; + customData.pipeline = pipeline; /* Add a message handler */ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); - bus_watch_id = gst_bus_add_watch (bus, bus_call, main_loop); + bus_watch_id = gst_bus_add_watch (bus, bus_call, &customData); gst_object_unref (bus); /* Add elements into the pipeline */ diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 11a7d83a..751bfe0b 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -1188,6 +1188,9 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, } case GST_EVENT_EOS: { LOG_INFO("EOS Event received in sink for " << kvssink->stream_name); + // This is necessary for application to get eos message. + GstMessage * message = gst_message_new_eos (GST_OBJECT_CAST (kvssink)); + gst_element_post_message (GST_ELEMENT_CAST (kvssink), message); break; } default: From 34a6c71866b6a57bdc22df805c446c479cb70196 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Mar 2024 16:34:38 -0700 Subject: [PATCH 08/27] Improvements --- samples/kvssink_intermittent_sample.cpp | 47 ++++++++++++++++--------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 371477c7..db403ad9 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -1,6 +1,6 @@ -#include // std::thread -#include // std::chrono::seconds -#include // std::mutex, std::unique_lock +#include // std::chrono::seconds +#include // std::thread +#include // std::mutex, std::unique_lock #include // std::condition_variable, std::cv_status #include @@ -13,12 +13,12 @@ using namespace log4cplus; LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); -#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 10 -#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 10 +#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 120 +#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 120 GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); std::atomic terminated(FALSE); -std::atomic eosReceived(FALSE); // TODO: Need this to handle waiting for eos from pausing +std::atomic eosHandled(FALSE); std::condition_variable cv; typedef enum _StreamSource { @@ -41,6 +41,7 @@ void sigint_handler(int sigint){ LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; cv.notify_all(); + //eosHandled.notify_all(); if(main_loop != NULL){ g_main_loop_quit(main_loop); } @@ -56,10 +57,13 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) case GST_MESSAGE_EOS: { LOG_DEBUG("Received EOS"); if(!terminated) { + LOG_DEBUG("[TESTING] HANDLING EOS"); + // Flushing to remove EOS status GstEvent* flush_start = gst_event_new_flush_start(); gst_element_send_event(pipeline, flush_start); } - eosReceived = TRUE; + eosHandled = TRUE; + eosHandled.notify_all(); break; } @@ -122,26 +126,35 @@ void stopStartLoop(GstElement *pipeline) { std::unique_lock lck(cv_m); while (!terminated) { - // Use cv.wait_for to break sleep early upon signal interrupt. + // Using cv.wait_for to break sleep early upon signal interrupt. if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) { break; } LOG_DEBUG("Pausing..."); - // EOS is necessary to push frame(s) buffered by the h264enc element + // EOS event is necessary to push frames buffered by the h264enc element GstEvent* eos; eos = gst_event_new_eos(); + eosHandled = FALSE; + + LOG_DEBUG("[TESTING] SENDING EOS"); gst_element_send_event(pipeline, eos); - - // Use cv.wait_for to break sleep early upon signal interrupt. - if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { + + // Wait for the EOS event to return to bus. + LOG_DEBUG("[TESTING] WAITING FOR EOS"); + eosHandled.wait(FALSE); + LOG_DEBUG("[TESTING] EOS RETURNED"); + + // Flushing to remove EOS status + GstEvent* flush_start = gst_event_new_flush_start(); + gst_element_send_event(pipeline, flush_start); + + // Using cv.wait_for to break sleep early upon signal interrupt. Checking for termination again before waiting. + if (terminated || cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { break; } - + LOG_DEBUG("Playing..."); - // Flushing to remove EOS status - // GstEvent* flush_start = gst_event_new_flush_start(); - // gst_element_send_event(pipeline, flush_start); GstEvent* flush_stop = gst_event_new_flush_stop(true); gst_element_send_event(pipeline, flush_stop); } @@ -163,7 +176,9 @@ int main (int argc, char *argv[]) /* GStreamer Initialisation */ gst_init (&argc, &argv); + /* Check input arguments */ + if (argc > 3 || argc < 2) { g_printerr ("Usage: %s \n", argv[0]); return -1; From 7e72aa7c0e2488ac162c8b8c1d3b14a45b586af5 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:09:10 -0700 Subject: [PATCH 09/27] Revert changes to original sample --- samples/kvssink_gstreamer_sample.cpp | 32 ++++++---------------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp index b2b23cba..6ba4bd1d 100644 --- a/samples/kvssink_gstreamer_sample.cpp +++ b/samples/kvssink_gstreamer_sample.cpp @@ -166,8 +166,8 @@ static void eos_cb(GstElement *sink, GstMessage *message, CustomData *data) { data->file_list.at(data->current_file_idx).last_fragment_ts = data->key_frame_pts; } } - //LOG_DEBUG("Terminating pipeline due to EOS"); - //g_main_loop_quit(data->main_loop); + LOG_DEBUG("Terminating pipeline due to EOS"); + g_main_loop_quit(data->main_loop); } /* This function is called when an error message is posted on the bus */ @@ -356,9 +356,9 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem encoder = gst_element_factory_make("vtenc_h264_hw", "encoder"); if (encoder) { vtenc = true; - source = gst_element_factory_make("autovideosrc", "source"); + source = gst_element_factory_make("videotestsrc", "source"); if (source) { - LOG_DEBUG("Using autovideosrc"); + LOG_DEBUG("Using videotestsrc"); } else { LOG_ERROR("Failed to create videotestsrc"); return 1; @@ -403,7 +403,7 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem /* configure source */ if (vtenc) { - // g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); + g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); } else { g_object_set(G_OBJECT(source), "do-timestamp", TRUE, "device", "/dev/video0", NULL); } @@ -729,26 +729,9 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) { std::thread stream_timer(timer, data); stream_timer.detach(); } - LOG_DEBUG("before main loop"); data->main_loop = g_main_loop_new(NULL, FALSE); - std::thread mainLoopThread(g_main_loop_run, data->main_loop); - - sleep(10); - LOG_DEBUG("Pausing..."); - GstEvent* eos; - eos = gst_event_new_eos(); - gst_element_send_event(pipeline, eos); - - sleep(10); - LOG_DEBUG("Playing..."); - GstEvent* flush_start = gst_event_new_flush_start(); - gst_element_send_event(pipeline, flush_start); - GstEvent*flush_stop = gst_event_new_flush_stop(true); - gst_element_send_event(pipeline, flush_stop); - - - mainLoopThread.join(); + g_main_loop_run(data->main_loop); LOG_DEBUG("after main loop") @@ -890,5 +873,4 @@ int main(int argc, char *argv[]) { } return 0; -} - +} \ No newline at end of file From 0a958dab8e9b080e1e320d58645e91e6f494741d Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:10:24 -0700 Subject: [PATCH 10/27] more --- samples/kvssink_gstreamer_sample.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp index 6ba4bd1d..a3f98230 100644 --- a/samples/kvssink_gstreamer_sample.cpp +++ b/samples/kvssink_gstreamer_sample.cpp @@ -873,4 +873,4 @@ int main(int argc, char *argv[]) { } return 0; -} \ No newline at end of file +} From 137a8b03fc19bb8b61c1a872298aff5b35271f0c Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:11:24 -0700 Subject: [PATCH 11/27] more --- samples/kvssink_gstreamer_sample.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp index a3f98230..ade28782 100644 --- a/samples/kvssink_gstreamer_sample.cpp +++ b/samples/kvssink_gstreamer_sample.cpp @@ -874,3 +874,4 @@ int main(int argc, char *argv[]) { return 0; } + From 12bbc8d9cd279e12a6d5c7d6d435677bba3cd4cf Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:18:49 -0700 Subject: [PATCH 12/27] Cleanup kvssink --- src/gstreamer/gstkvssink.cpp | 39 ++++++++++-------------------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 751bfe0b..1cd51e19 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -1188,7 +1188,9 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, } case GST_EVENT_EOS: { LOG_INFO("EOS Event received in sink for " << kvssink->stream_name); - // This is necessary for application to get eos message. + /* "The downstream element should forward the EOS event to its downstream peer elements. + This way the event will eventually reach the sinks which should then post an EOS message + on the bus when in PLAYING." - GStreamer, Events, EOS */ GstMessage * message = gst_message_new_eos (GST_OBJECT_CAST (kvssink)); gst_element_post_message (GST_ELEMENT_CAST (kvssink), message); break; @@ -1260,27 +1262,6 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, STATUS put_frame_status = STATUS_SUCCESS; info.data = NULL; - // eos reached - // if (buf == NULL && track_data == NULL) { - // LOG_INFO("Received event for " << kvssink->stream_name); - // // Need this check in case pipeline is already being set to NULL and - // // stream is being or/already stopped. Although stopSync() is an idempotent call, - // // we want to avoid an extra call. It is not possible for this callback to be invoked - // // after stopSync() since we stop collecting on pads before invoking. But having this - // // check anyways in case it happens - // if(!data->streamingStopped.load()) { - // // data->kinesis_video_stream->stopSync(); - // // data->streamingStopped.store(true); - // // LOG_INFO("Sending eos for " << kvssink->stream_name); - // } - - // // send out eos message to gstreamer bus - // message = gst_message_new_eos (GST_OBJECT_CAST (kvssink)); - // gst_element_post_message (GST_ELEMENT_CAST (kvssink), message); - - // ret = GST_FLOW_EOS; - // goto CleanUp; - // } if (STATUS_FAILED(stream_status)) { // in offline case, we cant tell the pipeline to restream the file again in case of network outage. @@ -1300,9 +1281,6 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, } if(buf != NULL) { - - // GST_BUFFER_FLAG_IS_SET uses bitwise & to compare, 1536 = GST_BUFFER_FLAG_MARKER (512) + GST_BUFFER_FLAG_HEADER (1024) - isDroppable = GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_CORRUPTED) || GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DECODE_ONLY) || (GST_BUFFER_FLAGS(buf) == GST_BUFFER_FLAG_DISCONT) || @@ -1370,9 +1348,9 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, } } - put_frame(kvssink->data, info.data, info.size, - std::chrono::nanoseconds(buf->pts), - std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count); + put_frame_status = put_frame(data, info.data, info.size, + std::chrono::nanoseconds(buf->pts), + std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count); data->frame_count++; } else { @@ -1388,6 +1366,11 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, gst_buffer_unref (buf); } + if (STATUS_FAILED(put_frame_status)) { + GST_ELEMENT_WARNING (kvssink, RESOURCE, WRITE, (NULL), + ("put frame error occurred. Status: 0x%08x", put_frame_status)); + } + return ret; } From 76f6d127130b60264729b432c63c9676cd699bd1 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 11:00:09 -0700 Subject: [PATCH 13/27] Cleanup intermittent sample and kvssink --- samples/kvssink_intermittent_sample.cpp | 68 ++++++++++++------------- src/gstreamer/gstkvssink.cpp | 12 ++--- 2 files changed, 35 insertions(+), 45 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index db403ad9..ca8f82e8 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -1,7 +1,7 @@ -#include // std::chrono::seconds -#include // std::thread -#include // std::mutex, std::unique_lock -#include // std::condition_variable, std::cv_status +#include +#include +#include +#include #include #include @@ -11,11 +11,12 @@ using namespace com::amazonaws::kinesis::video; using namespace log4cplus; -LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); - #define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 120 #define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 120 + +LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); + GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); std::atomic terminated(FALSE); std::atomic eosHandled(FALSE); @@ -34,7 +35,6 @@ typedef struct _CustomData { GMainLoop *main_loop; GstElement *pipeline; - } CustomData; void sigint_handler(int sigint){ @@ -55,7 +55,7 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_EOS: { - LOG_DEBUG("Received EOS"); + LOG_DEBUG("[KVS sample] Received EOS message"); if(!terminated) { LOG_DEBUG("[TESTING] HANDLING EOS"); // Flushing to remove EOS status @@ -74,7 +74,7 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) gst_message_parse_error (msg, &error, &debug); g_free (debug); - g_printerr ("Error: %s\n", error->message); + LOG_ERROR("[KVS sample] GStreamer error: %s", error->message); g_error_free (error); g_main_loop_quit (loop); @@ -120,7 +120,7 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { } } -// This function handles the starting and stopping of the stream. +// This function handles the intermittent starting and stopping of the stream in a loop. void stopStartLoop(GstElement *pipeline) { std::mutex cv_m; std::unique_lock lck(cv_m); @@ -131,21 +131,18 @@ void stopStartLoop(GstElement *pipeline) { break; } - LOG_DEBUG("Pausing..."); - // EOS event is necessary to push frames buffered by the h264enc element - GstEvent* eos; - eos = gst_event_new_eos(); - eosHandled = FALSE; + LOG_INFO("[KVS sample] Stopping stream to KVS for %d seconds", KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS); - LOG_DEBUG("[TESTING] SENDING EOS"); + // EOS event pushes frames buffered by the h264enc element down to kvssink. + GstEvent* eos = gst_event_new_eos(); + eosHandled = FALSE; gst_element_send_event(pipeline, eos); - // Wait for the EOS event to return to bus. - LOG_DEBUG("[TESTING] WAITING FOR EOS"); + // Wait for the EOS event to return from kvssink to the bus which means all elements are done handling the EOS. + // We don't want to flush until the EOS is done to ensure all frames buffered in the pipeline have been processed. eosHandled.wait(FALSE); - LOG_DEBUG("[TESTING] EOS RETURNED"); - // Flushing to remove EOS status + // Flushing to remove EOS status. GstEvent* flush_start = gst_event_new_flush_start(); gst_element_send_event(pipeline, flush_start); @@ -154,11 +151,11 @@ void stopStartLoop(GstElement *pipeline) { break; } - LOG_DEBUG("Playing..."); + LOG_INFO("[KVS sample] Starting stream to KVS for %d seconds", KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS); GstEvent* flush_stop = gst_event_new_flush_stop(true); gst_element_send_event(pipeline, flush_stop); } - LOG_DEBUG("Exited stopStartLoop"); + LOG_DEBUG("[KVS sample] Exited stopStartLoop"); } int main (int argc, char *argv[]) @@ -173,14 +170,14 @@ int main (int argc, char *argv[]) StreamSource source_type; char stream_name[MAX_STREAM_NAME_LEN + 1]; - /* GStreamer Initialisation */ + /* GStreamer Initialization */ gst_init (&argc, &argv); /* Check input arguments */ if (argc > 3 || argc < 2) { - g_printerr ("Usage: %s \n", argv[0]); + LOG_ERROR("[KVS sample] Usage: %s ", argv[0]); return -1; } @@ -194,6 +191,9 @@ int main (int argc, char *argv[]) source_type = DEVICE_SOURCE; } else if (0 == STRCMPI(argv[2], "rtspsrc")) { source_type = RTSP_SOURCE; + } else { + LOG_ERROR("[KVS sample] Usage: %s ", argv[0]); + return -1; } } else { source_type = TEST_SOURCE; @@ -226,14 +226,13 @@ int main (int argc, char *argv[]) /* encoder */ encoder = gst_element_factory_make("x264enc", "encoder"); - g_object_set(G_OBJECT(encoder), "bframes", 0, NULL); // TODO: put back the zerolatency tune + g_object_set(G_OBJECT(encoder), "bframes", 0, NULL); /* sink filter */ sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); sink_caps = gst_caps_new_simple("video/x-h264", "stream-format", G_TYPE_STRING, "avc", "alignment", G_TYPE_STRING, "au", - // "framerate", GST_TYPE_FRACTION, 20, 1, NULL); g_object_set(G_OBJECT(sink_filter), "caps", sink_caps, NULL); gst_caps_unref(sink_caps); @@ -247,13 +246,12 @@ int main (int argc, char *argv[]) /* Check that GStreamer elements were all successfully created */ if (!kvssink) { - LOG_ERROR("Failed to create kvssink element"); + LOG_ERROR("[KVS sample] Failed to create kvssink element"); return -1; } if (!pipeline || !source || !video_convert || !source_filter || !encoder) { - // TODO: Gprint -> log4cplus - g_printerr ("Not all GStreamer elements could be created.\n"); + LOG_ERROR("[KVS sample] Not all GStreamer elements could be created."); return -1; } @@ -272,28 +270,26 @@ int main (int argc, char *argv[]) /* Link the elements together */ if (!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL)) { - g_printerr("Elements could not be linked.\n"); + LOG_ERROR("[KVS sample] Elements could not be linked"); gst_object_unref(pipeline); return -1; } /* Set the pipeline to "playing" state*/ - g_print ("Playing..\n"); + LOG_INFO("[KVS sample] Starting stream to KVS for %d seconds", KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS); gst_element_set_state (pipeline, GST_STATE_PLAYING); - /* Start stop/start thread for intermittent streaming */ + /* Start the stop/start thread for intermittent streaming */ std::thread stopStartThread(stopStartLoop, pipeline); - g_print ("Running main loop...\n"); + LOG_ERROR("[KVS sample] Starter GStreamer main loop"); g_main_loop_run (main_loop); stopStartThread.join(); /* Out of the main loop, clean up */ - g_print ("Returned, stopping playback\n"); + LOG_INFO("[KVS sample] Streaming terminated, cleaning up"); gst_element_set_state (pipeline, GST_STATE_NULL); - - g_print ("Deleting pipeline\n"); gst_object_unref (GST_OBJECT (pipeline)); g_source_remove (bus_watch_id); g_main_loop_unref (main_loop); diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 1cd51e19..8af36431 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -1188,6 +1188,7 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, } case GST_EVENT_EOS: { LOG_INFO("EOS Event received in sink for " << kvssink->stream_name); + /* "The downstream element should forward the EOS event to its downstream peer elements. This way the event will eventually reach the sinks which should then post an EOS message on the bus when in PLAYING." - GStreamer, Events, EOS */ @@ -1648,15 +1649,8 @@ gst_kvs_sink_change_state(GstElement *element, GstStateChange transition) { // Downward transitions switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: - // Need this check in case an EOS was received in the buffer handler and - // stream was already stopped. Although stopSync() is an idempotent call, - // we want to avoid an extra call - if(!data->streamingStopped.load()) { - data->kinesis_video_stream->stopSync(); - data->streamingStopped.store(true); - } else { - LOG_INFO("Streaming already stopped for " << kvssink->stream_name); - } + data->kinesis_video_stream->stopSync(); + data->streamingStopped.store(true); LOG_INFO("Stopped kvssink for " << kvssink->stream_name); break; case GST_STATE_CHANGE_READY_TO_NULL: From 63b619fa9585de8af599316980a4d2694469861a Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 11:19:53 -0700 Subject: [PATCH 14/27] more --- samples/kvssink_intermittent_sample.cpp | 47 +++++++++++-------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index ca8f82e8..47c35948 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -11,8 +11,9 @@ using namespace com::amazonaws::kinesis::video; using namespace log4cplus; -#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 120 -#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 120 +/* modify these values to change start/stop interval */ +#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 8 +#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 8 LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); @@ -41,7 +42,6 @@ void sigint_handler(int sigint){ LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; cv.notify_all(); - //eosHandled.notify_all(); if(main_loop != NULL){ g_main_loop_quit(main_loop); } @@ -56,12 +56,6 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_EOS: { LOG_DEBUG("[KVS sample] Received EOS message"); - if(!terminated) { - LOG_DEBUG("[TESTING] HANDLING EOS"); - // Flushing to remove EOS status - GstEvent* flush_start = gst_event_new_flush_start(); - gst_element_send_event(pipeline, flush_start); - } eosHandled = TRUE; eosHandled.notify_all(); break; @@ -74,7 +68,7 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) gst_message_parse_error (msg, &error, &debug); g_free (debug); - LOG_ERROR("[KVS sample] GStreamer error: %s", error->message); + LOG_ERROR("[KVS sample] GStreamer error: " << error->message); g_error_free (error); g_main_loop_quit (loop); @@ -101,7 +95,7 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) && nullptr != (role_alias = getenv("ROLE_ALIAS")) && nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) { - // set the IoT Credentials if provided in envvar + // Set the IoT Credentials if provided in envvar. GstStructure *iot_credentials = gst_structure_new( "iot-certificate", "iot-thing-name", G_TYPE_STRING, streamName, @@ -114,7 +108,7 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); gst_structure_free(iot_credentials); // kvssink will search for long term credentials in envvar automatically so no need to include here - // if no long credentials or IoT credentials provided will look for credential file as last resort + // if no long credentials or IoT credentials provided will look for credential file as last resort. } else if(nullptr != (credential_path = getenv("AWS_CREDENTIAL_PATH"))){ g_object_set(G_OBJECT (kvssink), "credential-path", credential_path, NULL); } @@ -131,7 +125,7 @@ void stopStartLoop(GstElement *pipeline) { break; } - LOG_INFO("[KVS sample] Stopping stream to KVS for %d seconds", KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS); + LOG_INFO("[KVS sample] Stopping stream to KVS for " << KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS << " seconds"); // EOS event pushes frames buffered by the h264enc element down to kvssink. GstEvent* eos = gst_event_new_eos(); @@ -151,7 +145,7 @@ void stopStartLoop(GstElement *pipeline) { break; } - LOG_INFO("[KVS sample] Starting stream to KVS for %d seconds", KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS); + LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds"); GstEvent* flush_stop = gst_event_new_flush_stop(true); gst_element_send_event(pipeline, flush_stop); } @@ -170,14 +164,13 @@ int main (int argc, char *argv[]) StreamSource source_type; char stream_name[MAX_STREAM_NAME_LEN + 1]; - /* GStreamer Initialization */ gst_init (&argc, &argv); /* Check input arguments */ if (argc > 3 || argc < 2) { - LOG_ERROR("[KVS sample] Usage: %s ", argv[0]); + LOG_ERROR("[KVS sample] Usage: " << argv[0] << " "); return -1; } @@ -192,7 +185,7 @@ int main (int argc, char *argv[]) } else if (0 == STRCMPI(argv[2], "rtspsrc")) { source_type = RTSP_SOURCE; } else { - LOG_ERROR("[KVS sample] Usage: %s ", argv[0]); + LOG_ERROR("[KVS sample] Usage: " << argv[0] << " "); return -1; } } else { @@ -204,7 +197,7 @@ int main (int argc, char *argv[]) pipeline = gst_pipeline_new ("kvs-pipeline"); - /* configure source */ + /* source */ if(source_type == TEST_SOURCE) { source = gst_element_factory_make ("videotestsrc", "test-source"); } else if (source_type == DEVICE_SOURCE) { @@ -255,39 +248,39 @@ int main (int argc, char *argv[]) return -1; } - /* Populate data struct */ + // Populate data struct. customData.main_loop = main_loop; customData.pipeline = pipeline; - /* Add a message handler */ + //Add a message handler. bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); bus_watch_id = gst_bus_add_watch (bus, bus_call, &customData); gst_object_unref (bus); - /* Add elements into the pipeline */ + // Add elements into the pipeline. gst_bin_add_many (GST_BIN (pipeline), source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL); - /* Link the elements together */ + // Link the elements together. if (!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL)) { LOG_ERROR("[KVS sample] Elements could not be linked"); gst_object_unref(pipeline); return -1; } - /* Set the pipeline to "playing" state*/ - LOG_INFO("[KVS sample] Starting stream to KVS for %d seconds", KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS); + // Set the pipeline to playing state. + LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds"); gst_element_set_state (pipeline, GST_STATE_PLAYING); - /* Start the stop/start thread for intermittent streaming */ + // Start the stop/start thread for intermittent streaming. std::thread stopStartThread(stopStartLoop, pipeline); - LOG_ERROR("[KVS sample] Starter GStreamer main loop"); + LOG_ERROR("[KVS sample] Starting GStreamer main loop"); g_main_loop_run (main_loop); stopStartThread.join(); - /* Out of the main loop, clean up */ + // Application terminated, cleanup. LOG_INFO("[KVS sample] Streaming terminated, cleaning up"); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (GST_OBJECT (pipeline)); From 91725e193ca6bfc24cda687d594f85d7bada7f71 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 12:54:41 -0700 Subject: [PATCH 15/27] Remove rtsp related things --- samples/kvssink_intermittent_sample.cpp | 41 +++++++++++++++---------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 47c35948..7849912f 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -12,8 +12,8 @@ using namespace com::amazonaws::kinesis::video; using namespace log4cplus; /* modify these values to change start/stop interval */ -#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 8 -#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 8 +#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 20 +#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 40 LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); @@ -25,8 +25,7 @@ std::condition_variable cv; typedef enum _StreamSource { TEST_SOURCE, - DEVICE_SOURCE, - RTSP_SOURCE + DEVICE_SOURCE } StreamSource; typedef struct _CustomData { @@ -41,7 +40,9 @@ typedef struct _CustomData { void sigint_handler(int sigint){ LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; + eosHandled = TRUE; cv.notify_all(); + eosHandled.notify_all(); if(main_loop != NULL){ g_main_loop_quit(main_loop); } @@ -167,28 +168,34 @@ int main (int argc, char *argv[]) gst_init (&argc, &argv); - /* Check input arguments */ + /* Parse input arguments */ + // Check for invalid argument count. if (argc > 3 || argc < 2) { - LOG_ERROR("[KVS sample] Usage: " << argv[0] << " "); + LOG_ERROR("[KVS sample] Invalid argument count"); + LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); return -1; } + // Get stream name. STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN); stream_name[MAX_STREAM_NAME_LEN] = '\0'; + // Get source type. if(argc > 2) { if(0 == STRCMPI(argv[2], "testsrc")) { + LOG_INFO("[KVS sample] Using test source (videotestsrc)"); source_type = TEST_SOURCE; } else if (0 == STRCMPI(argv[2], "devicesrc")) { + LOG_INFO("[KVS sample] Using device source (autovideosrc)"); source_type = DEVICE_SOURCE; - } else if (0 == STRCMPI(argv[2], "rtspsrc")) { - source_type = RTSP_SOURCE; } else { - LOG_ERROR("[KVS sample] Usage: " << argv[0] << " "); + LOG_ERROR("[KVS sample] Invalid source type"); + LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); return -1; } } else { + LOG_ERROR("[KVS sample] No source specified, defualting to test source (videotestsrc)"); source_type = TEST_SOURCE; } @@ -199,11 +206,11 @@ int main (int argc, char *argv[]) /* source */ if(source_type == TEST_SOURCE) { - source = gst_element_factory_make ("videotestsrc", "test-source"); + source = gst_element_factory_make ("videotestsrc", "test-source"); + g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); } else if (source_type == DEVICE_SOURCE) { - source = gst_element_factory_make ("autovideosrc", "device-source"); - } else { // RTSP_SOURCE - } + source = gst_element_factory_make ("autovideosrc", "device-source"); + } /* clock overlay */ clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay"); @@ -243,7 +250,7 @@ int main (int argc, char *argv[]) return -1; } - if (!pipeline || !source || !video_convert || !source_filter || !encoder) { + if (!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) { LOG_ERROR("[KVS sample] Not all GStreamer elements could be created."); return -1; } @@ -259,10 +266,10 @@ int main (int argc, char *argv[]) // Add elements into the pipeline. gst_bin_add_many (GST_BIN (pipeline), - source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL); + source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL); // Link the elements together. - if (!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, kvssink, NULL)) { + if (!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL)) { LOG_ERROR("[KVS sample] Elements could not be linked"); gst_object_unref(pipeline); return -1; @@ -275,7 +282,7 @@ int main (int argc, char *argv[]) // Start the stop/start thread for intermittent streaming. std::thread stopStartThread(stopStartLoop, pipeline); - LOG_ERROR("[KVS sample] Starting GStreamer main loop"); + LOG_INFO("[KVS sample] Starting GStreamer main loop"); g_main_loop_run (main_loop); stopStartThread.join(); From 7dafff883fdd8dcfd53787b42643fe2d60790425 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Mar 2024 15:31:46 -0700 Subject: [PATCH 16/27] Switch to cv wait for all waits --- samples/kvssink_intermittent_sample.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 7849912f..e4b042ce 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -20,7 +20,6 @@ LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); std::atomic terminated(FALSE); -std::atomic eosHandled(FALSE); std::condition_variable cv; typedef enum _StreamSource { @@ -40,9 +39,7 @@ typedef struct _CustomData { void sigint_handler(int sigint){ LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; - eosHandled = TRUE; cv.notify_all(); - eosHandled.notify_all(); if(main_loop != NULL){ g_main_loop_quit(main_loop); } @@ -57,8 +54,7 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_EOS: { LOG_DEBUG("[KVS sample] Received EOS message"); - eosHandled = TRUE; - eosHandled.notify_all(); + cv.notify_all(); break; } @@ -130,12 +126,11 @@ void stopStartLoop(GstElement *pipeline) { // EOS event pushes frames buffered by the h264enc element down to kvssink. GstEvent* eos = gst_event_new_eos(); - eosHandled = FALSE; gst_element_send_event(pipeline, eos); // Wait for the EOS event to return from kvssink to the bus which means all elements are done handling the EOS. // We don't want to flush until the EOS is done to ensure all frames buffered in the pipeline have been processed. - eosHandled.wait(FALSE); + cv.wait(lck); // Flushing to remove EOS status. GstEvent* flush_start = gst_event_new_flush_start(); From 39817bc61e924dd6b7db21e796fb49e15c8f2862 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 12:28:55 -0700 Subject: [PATCH 17/27] Address some PR comments --- CMakeLists.txt | 6 +- samples/kvssink_intermittent_sample.cpp | 94 +++++++++++++------------ 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2b5dcbb9..cdba2316 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -243,10 +243,10 @@ if(BUILD_GSTREAMER_PLUGIN) target_link_libraries(gstkvssink PRIVATE ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} KinesisVideoProducer) add_executable(kvssink_gstreamer_sample samples/kvssink_gstreamer_sample.cpp) - target_link_libraries(kvssink_gstreamer_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} kvspic) + target_link_libraries(kvssink_gstreamer_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} kvspicUtils) add_executable(kvs_gstreamer_sample samples/kvs_gstreamer_sample.cpp) - target_link_libraries(kvs_gstreamer_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} KinesisVideoProducer kvspic) + target_link_libraries(kvs_gstreamer_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} KinesisVideoProducer kvspicUtils) add_executable(kvs_gstreamer_multistream_sample samples/kvs_gstreamer_multistream_sample.cpp) target_link_libraries(kvs_gstreamer_multistream_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} KinesisVideoProducer) @@ -258,7 +258,7 @@ if(BUILD_GSTREAMER_PLUGIN) target_link_libraries(kvs_gstreamer_file_uploader_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES}) add_executable(kvssink_intermittent_sample samples/kvssink_intermittent_sample.cpp) - target_link_libraries(kvssink_intermittent_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} kvspic) + target_link_libraries(kvssink_intermittent_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES}) install( TARGETS gstkvssink diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index e4b042ce..804df42b 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -11,14 +11,14 @@ using namespace com::amazonaws::kinesis::video; using namespace log4cplus; -/* modify these values to change start/stop interval */ +/* Modify these values to change start/stop interval. */ #define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 20 #define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 40 LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); -GMainLoop *main_loop = g_main_loop_new (NULL, FALSE); +GMainLoop *main_loop = g_main_loop_new(NULL, FALSE); std::atomic terminated(FALSE); std::condition_variable cv; @@ -46,12 +46,12 @@ void sigint_handler(int sigint){ } static gboolean -bus_call (GstBus *bus, GstMessage *msg, gpointer data) +bus_call(GstBus *bus, GstMessage *msg, gpointer data) { GMainLoop *loop = (GMainLoop *) ((CustomData *)data)->main_loop; GstElement *pipeline = (GstElement *) ((CustomData *)data)->pipeline; - switch (GST_MESSAGE_TYPE (msg)) { + switch(GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_EOS: { LOG_DEBUG("[KVS sample] Received EOS message"); cv.notify_all(); @@ -62,13 +62,13 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data) gchar *debug; GError *error; - gst_message_parse_error (msg, &error, &debug); - g_free (debug); + gst_message_parse_error(msg, &error, &debug); + g_free(debug); LOG_ERROR("[KVS sample] GStreamer error: " << error->message); - g_error_free (error); + g_error_free(error); - g_main_loop_quit (loop); + g_main_loop_quit(loop); break; } @@ -87,11 +87,11 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { char const *role_alias; char const *ca_cert_path; char const *credential_path; - if (nullptr != (iot_credential_endpoint = getenv("IOT_GET_CREDENTIAL_ENDPOINT")) && - nullptr != (cert_path = getenv("CERT_PATH")) && - nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) && - nullptr != (role_alias = getenv("ROLE_ALIAS")) && - nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) { + if(nullptr != (iot_credential_endpoint = GETENV("IOT_GET_CREDENTIAL_ENDPOINT")) && + nullptr != (cert_path = GETENV("CERT_PATH")) && + nullptr != (private_key_path = GETENV("PRIVATE_KEY_PATH")) && + nullptr != (role_alias = GETENV("ROLE_ALIAS")) && + nullptr != (ca_cert_path = GETENV("CA_CERT_PATH"))) { // Set the IoT Credentials if provided in envvar. GstStructure *iot_credentials = gst_structure_new( "iot-certificate", @@ -102,12 +102,12 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { "ca-path", G_TYPE_STRING, ca_cert_path, "role-aliases", G_TYPE_STRING, role_alias, NULL); - g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); + g_object_set(G_OBJECT(kvssink), "iot-certificate", iot_credentials, NULL); gst_structure_free(iot_credentials); // kvssink will search for long term credentials in envvar automatically so no need to include here // if no long credentials or IoT credentials provided will look for credential file as last resort. - } else if(nullptr != (credential_path = getenv("AWS_CREDENTIAL_PATH"))){ - g_object_set(G_OBJECT (kvssink), "credential-path", credential_path, NULL); + } else if(nullptr != (credential_path = GETENV("AWS_CREDENTIAL_PATH"))){ + g_object_set(G_OBJECT(kvssink), "credential-path", credential_path, NULL); } } @@ -116,9 +116,9 @@ void stopStartLoop(GstElement *pipeline) { std::mutex cv_m; std::unique_lock lck(cv_m); - while (!terminated) { + while(!terminated) { // Using cv.wait_for to break sleep early upon signal interrupt. - if (cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) { + if(cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) { break; } @@ -137,7 +137,7 @@ void stopStartLoop(GstElement *pipeline) { gst_element_send_event(pipeline, flush_start); // Using cv.wait_for to break sleep early upon signal interrupt. Checking for termination again before waiting. - if (terminated || cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { + if(terminated || cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { break; } @@ -148,7 +148,7 @@ void stopStartLoop(GstElement *pipeline) { LOG_DEBUG("[KVS sample] Exited stopStartLoop"); } -int main (int argc, char *argv[]) +int main(int argc, char *argv[]) { signal(SIGINT, sigint_handler); @@ -160,14 +160,18 @@ int main (int argc, char *argv[]) StreamSource source_type; char stream_name[MAX_STREAM_NAME_LEN + 1]; - gst_init (&argc, &argv); + gst_init(&argc, &argv); /* Parse input arguments */ // Check for invalid argument count. - if (argc > 3 || argc < 2) { - LOG_ERROR("[KVS sample] Invalid argument count"); + if(argc > 3) { + LOG_ERROR("[KVS sample] Invalid argument count, too many arguments."); + LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); + return -1; + } else if(argc < 2) { + LOG_ERROR("[KVS sample] Invalid argument count, not enough arguments."); LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); return -1; } @@ -181,7 +185,7 @@ int main (int argc, char *argv[]) if(0 == STRCMPI(argv[2], "testsrc")) { LOG_INFO("[KVS sample] Using test source (videotestsrc)"); source_type = TEST_SOURCE; - } else if (0 == STRCMPI(argv[2], "devicesrc")) { + } else if(0 == STRCMPI(argv[2], "devicesrc")) { LOG_INFO("[KVS sample] Using device source (autovideosrc)"); source_type = DEVICE_SOURCE; } else { @@ -197,14 +201,14 @@ int main (int argc, char *argv[]) /* Create GStreamer elements */ - pipeline = gst_pipeline_new ("kvs-pipeline"); + pipeline = gst_pipeline_new("kvs-pipeline"); /* source */ if(source_type == TEST_SOURCE) { - source = gst_element_factory_make ("videotestsrc", "test-source"); + source = gst_element_factory_make("videotestsrc", "test-source"); g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); - } else if (source_type == DEVICE_SOURCE) { - source = gst_element_factory_make ("autovideosrc", "device-source"); + } else if(source_type == DEVICE_SOURCE) { + source = gst_element_factory_make("autovideosrc", "device-source"); } /* clock overlay */ @@ -240,12 +244,12 @@ int main (int argc, char *argv[]) /* Check that GStreamer elements were all successfully created */ - if (!kvssink) { + if(!kvssink) { LOG_ERROR("[KVS sample] Failed to create kvssink element"); return -1; } - if (!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) { + if(!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) { LOG_ERROR("[KVS sample] Not all GStreamer elements could be created."); return -1; } @@ -254,40 +258,40 @@ int main (int argc, char *argv[]) customData.main_loop = main_loop; customData.pipeline = pipeline; - //Add a message handler. - bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); - bus_watch_id = gst_bus_add_watch (bus, bus_call, &customData); - gst_object_unref (bus); + // Add a message handler. + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + bus_watch_id = gst_bus_add_watch(bus, bus_call, &customData); + gst_object_unref(bus); // Add elements into the pipeline. - gst_bin_add_many (GST_BIN (pipeline), + gst_bin_add_many(GST_BIN(pipeline), source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL); // Link the elements together. - if (!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL)) { - LOG_ERROR("[KVS sample] Elements could not be linked"); - gst_object_unref(pipeline); - return -1; + if(!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL)) { + LOG_ERROR("[KVS sample] Elements could not be linked"); + gst_object_unref(pipeline); + return -1; } // Set the pipeline to playing state. LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds"); - gst_element_set_state (pipeline, GST_STATE_PLAYING); + gst_element_set_state(pipeline, GST_STATE_PLAYING); // Start the stop/start thread for intermittent streaming. std::thread stopStartThread(stopStartLoop, pipeline); LOG_INFO("[KVS sample] Starting GStreamer main loop"); - g_main_loop_run (main_loop); + g_main_loop_run(main_loop); stopStartThread.join(); // Application terminated, cleanup. LOG_INFO("[KVS sample] Streaming terminated, cleaning up"); - gst_element_set_state (pipeline, GST_STATE_NULL); - gst_object_unref (GST_OBJECT (pipeline)); - g_source_remove (bus_watch_id); - g_main_loop_unref (main_loop); + gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(GST_OBJECT(pipeline)); + g_source_remove(bus_watch_id); + g_main_loop_unref(main_loop); return 0; } From 533da5b7d61aa321d7cf317d098194c7311c534b Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 12:50:10 -0700 Subject: [PATCH 18/27] Link with kvspicUtils --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index cdba2316..d31b8cc2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -257,8 +257,8 @@ if(BUILD_GSTREAMER_PLUGIN) add_executable(kvs_gstreamer_file_uploader_sample samples/kvs_gstreamer_file_uploader_sample.cpp) target_link_libraries(kvs_gstreamer_file_uploader_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES}) - add_executable(kvssink_intermittent_sample samples/kvssink_intermittent_sample.cpp) - target_link_libraries(kvssink_intermittent_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES}) + add_executable(kvssink_intermittent_sample samples/kvssink_intermittent_sample.cpp ) + target_link_libraries(kvssink_intermittent_sample ${LOG4CPLUS_LIBRARIES} ${GST_APP_LIBRARIES} kvspicUtils) install( TARGETS gstkvssink From 19794659db31b782a486d2a6f267484ea98b4494 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 16:24:48 -0700 Subject: [PATCH 19/27] kvssink to send eofr upon eos, fix testsrc not stopping issue --- samples/kvssink_intermittent_sample.cpp | 36 ++++++++++++++++++------- src/gstreamer/gstkvssink.cpp | 17 +++++++++--- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 804df42b..ee173cd8 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -15,6 +15,9 @@ using namespace log4cplus; #define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 20 #define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 40 +#define KVS_GST_TEST_SOURCE_NAME "test-source" +#define KVS_GST_DEVICE_SOURCE_NAME "device-source" + LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); @@ -36,11 +39,11 @@ typedef struct _CustomData { GstElement *pipeline; } CustomData; -void sigint_handler(int sigint){ +void sigint_handler(int sigint) { LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; cv.notify_all(); - if(main_loop != NULL){ + if(main_loop != NULL) { g_main_loop_quit(main_loop); } } @@ -106,13 +109,13 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { gst_structure_free(iot_credentials); // kvssink will search for long term credentials in envvar automatically so no need to include here // if no long credentials or IoT credentials provided will look for credential file as last resort. - } else if(nullptr != (credential_path = GETENV("AWS_CREDENTIAL_PATH"))){ + } else if(nullptr != (credential_path = GETENV("AWS_CREDENTIAL_PATH"))) { g_object_set(G_OBJECT(kvssink), "credential-path", credential_path, NULL); } } // This function handles the intermittent starting and stopping of the stream in a loop. -void stopStartLoop(GstElement *pipeline) { +void stopStartLoop(GstElement *pipeline, GstElement *source) { std::mutex cv_m; std::unique_lock lck(cv_m); @@ -132,6 +135,12 @@ void stopStartLoop(GstElement *pipeline) { // We don't want to flush until the EOS is done to ensure all frames buffered in the pipeline have been processed. cv.wait(lck); + // Set videotestsrc to paused state because it does not stop producing frames upon EOS, + // and the frames are not cleared upon flushing. + if(strcmp(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { + gst_element_set_state(source, GST_STATE_PAUSED); + } + // Flushing to remove EOS status. GstEvent* flush_start = gst_event_new_flush_start(); gst_element_send_event(pipeline, flush_start); @@ -142,8 +151,15 @@ void stopStartLoop(GstElement *pipeline) { } LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds"); + + // Stop the flush now that we are resuming streaming. GstEvent* flush_stop = gst_event_new_flush_stop(true); gst_element_send_event(pipeline, flush_stop); + + // Set videotestsrc back to playing state. + if(strcmp(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { + gst_element_set_state(source, GST_STATE_PLAYING); + } } LOG_DEBUG("[KVS sample] Exited stopStartLoop"); } @@ -205,11 +221,11 @@ int main(int argc, char *argv[]) /* source */ if(source_type == TEST_SOURCE) { - source = gst_element_factory_make("videotestsrc", "test-source"); - g_object_set(G_OBJECT(source), "is-live", TRUE, NULL); + source = gst_element_factory_make("videotestsrc", KVS_GST_TEST_SOURCE_NAME); + g_object_set(G_OBJECT(source), "is-live", TRUE, "pattern", 18, "background-color", 0xff003181, "foreground-color", 0xffff9900, NULL); } else if(source_type == DEVICE_SOURCE) { - source = gst_element_factory_make("autovideosrc", "device-source"); - } + source = gst_element_factory_make("autovideosrc", KVS_GST_DEVICE_SOURCE_NAME); + } /* clock overlay */ clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay"); @@ -225,7 +241,7 @@ int main(int argc, char *argv[]) /* encoder */ encoder = gst_element_factory_make("x264enc", "encoder"); - g_object_set(G_OBJECT(encoder), "bframes", 0, NULL); + g_object_set(G_OBJECT(encoder), "bframes", 0, "key-int-max", 100, NULL); /* sink filter */ sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); @@ -279,7 +295,7 @@ int main(int argc, char *argv[]) gst_element_set_state(pipeline, GST_STATE_PLAYING); // Start the stop/start thread for intermittent streaming. - std::thread stopStartThread(stopStartLoop, pipeline); + std::thread stopStartThread(stopStartLoop, pipeline, source); LOG_INFO("[KVS sample] Starting GStreamer main loop"); g_main_loop_run(main_loop); diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 8af36431..ebc6a5bc 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -1188,12 +1188,23 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, } case GST_EVENT_EOS: { LOG_INFO("EOS Event received in sink for " << kvssink->stream_name); + + if(data && data->kinesis_video_stream) { + Frame eofr = EOFR_FRAME_INITIALIZER; + STATUS put_eofr_status = data->kinesis_video_stream->putFrame(eofr); + if(STATUS_FAILED(put_eofr_status)) { + LOG_WARN("Failed to put EOFR for " << kvssink->stream_name); + } + } else { + LOG_WARN("NULL pointer! Failed to put EOFR for " << kvssink->stream_name); + } + /* "The downstream element should forward the EOS event to its downstream peer elements. This way the event will eventually reach the sinks which should then post an EOS message - on the bus when in PLAYING." - GStreamer, Events, EOS */ - GstMessage * message = gst_message_new_eos (GST_OBJECT_CAST (kvssink)); - gst_element_post_message (GST_ELEMENT_CAST (kvssink), message); + on the bus when in PLAYING." - GStreamerDocs->Events->EOS */ + GstMessage * message = gst_message_new_eos(GST_OBJECT_CAST (kvssink)); + gst_element_post_message (GST_ELEMENT_CAST(kvssink), message); break; } default: From f993930f740d5ae6a439bf1e7a340f9fbb207ec3 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 16:50:41 -0700 Subject: [PATCH 20/27] Address review comments --- samples/kvssink_intermittent_sample.cpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index ee173cd8..ab2947b7 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -95,6 +95,7 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { nullptr != (private_key_path = GETENV("PRIVATE_KEY_PATH")) && nullptr != (role_alias = GETENV("ROLE_ALIAS")) && nullptr != (ca_cert_path = GETENV("CA_CERT_PATH"))) { + LOG_DEBUG("[KVS sample] Using IoT credentials."); // Set the IoT Credentials if provided in envvar. GstStructure *iot_credentials = gst_structure_new( "iot-certificate", @@ -110,7 +111,10 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) { // kvssink will search for long term credentials in envvar automatically so no need to include here // if no long credentials or IoT credentials provided will look for credential file as last resort. } else if(nullptr != (credential_path = GETENV("AWS_CREDENTIAL_PATH"))) { + LOG_DEBUG("[KVS sample] Using AWS_CREDENTIAL_PATH long term credentials."); g_object_set(G_OBJECT(kvssink), "credential-path", credential_path, NULL); + } else { + LOG_DEBUG("[KVS sample] Using credentials set by AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars."); } } @@ -222,13 +226,18 @@ int main(int argc, char *argv[]) /* source */ if(source_type == TEST_SOURCE) { source = gst_element_factory_make("videotestsrc", KVS_GST_TEST_SOURCE_NAME); - g_object_set(G_OBJECT(source), "is-live", TRUE, "pattern", 18, "background-color", 0xff003181, "foreground-color", 0xffff9900, NULL); + g_object_set(G_OBJECT(source), + "is-live", TRUE, + "pattern", 18, + "background-color", 0xff003181, + "foreground-color", 0xffff9900, NULL); } else if(source_type == DEVICE_SOURCE) { source = gst_element_factory_make("autovideosrc", KVS_GST_DEVICE_SOURCE_NAME); } /* clock overlay */ clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay"); + g_object_set(G_OBJECT(clock_overlay),"time-format", "%a %B %d, %Y %I:%M:%S %p", NULL); /* video convert */ video_convert = gst_element_factory_make("videoconvert", "video_convert"); @@ -241,7 +250,9 @@ int main(int argc, char *argv[]) /* encoder */ encoder = gst_element_factory_make("x264enc", "encoder"); - g_object_set(G_OBJECT(encoder), "bframes", 0, "key-int-max", 100, NULL); + g_object_set(G_OBJECT(encoder), + "bframes", 0, + "key-int-max", 120, NULL); /* sink filter */ sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); From b92ccb78ba71915adccbedeb666d9c1b2a1c9753 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 18:08:39 -0700 Subject: [PATCH 21/27] Don't require stream name arg --- samples/kvssink_intermittent_sample.cpp | 29 ++++++++++++------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index ab2947b7..20fb704e 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -141,7 +141,7 @@ void stopStartLoop(GstElement *pipeline, GstElement *source) { // Set videotestsrc to paused state because it does not stop producing frames upon EOS, // and the frames are not cleared upon flushing. - if(strcmp(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { + if(STRCMPI(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { gst_element_set_state(source, GST_STATE_PAUSED); } @@ -161,7 +161,7 @@ void stopStartLoop(GstElement *pipeline, GstElement *source) { gst_element_send_event(pipeline, flush_stop); // Set videotestsrc back to playing state. - if(strcmp(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { + if(STRCMPI(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { gst_element_set_state(source, GST_STATE_PLAYING); } } @@ -178,28 +178,23 @@ int main(int argc, char *argv[]) GstBus *bus; guint bus_watch_id; StreamSource source_type; - char stream_name[MAX_STREAM_NAME_LEN + 1]; + char stream_name[MAX_STREAM_NAME_LEN + 1] = {0}; gst_init(&argc, &argv); /* Parse input arguments */ - // Check for invalid argument count. + // Check for invalid argument count, get stream name. if(argc > 3) { LOG_ERROR("[KVS sample] Invalid argument count, too many arguments."); - LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); - return -1; - } else if(argc < 2) { - LOG_ERROR("[KVS sample] Invalid argument count, not enough arguments."); - LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); + LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); return -1; + } else if(argc > 1) { + STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN); + stream_name[MAX_STREAM_NAME_LEN] = '\0'; } - // Get stream name. - STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN); - stream_name[MAX_STREAM_NAME_LEN] = '\0'; - // Get source type. if(argc > 2) { if(0 == STRCMPI(argv[2], "testsrc")) { @@ -210,7 +205,7 @@ int main(int argc, char *argv[]) source_type = DEVICE_SOURCE; } else { LOG_ERROR("[KVS sample] Invalid source type"); - LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); + LOG_INFO("[KVS sample] Usage: " << argv[0] << " "); return -1; } } else { @@ -265,7 +260,11 @@ int main(int argc, char *argv[]) /* kvssink */ kvssink = gst_element_factory_make("kvssink", "kvssink"); - g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); + if (stream_name[0] != '\0') { + g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); + } else { + LOG_INFO("No stream name specified, using default kvssink stream name.") + } determine_aws_credentials(kvssink, stream_name); From 4b0972464e6b9bbc5978c63f0f1a277cfefec344 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 18:36:41 -0700 Subject: [PATCH 22/27] Add sample instructions to README.md --- README.md | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a2875a70..1b0b5d21 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ If the library needs to be installed, run `make install`. This will install in d ## Run ### GStreamer Plugin (kvssink) -#### Loading Element +#### Loading kvssink Element The GStreamer plugin is located in your `build` directory. To load this plugin set the following environment variables. This should be run from the root of the repo, NOT the `build` directory. @@ -180,7 +180,7 @@ No such element or plugin 'kvssink' ``` -#### Using Element +#### Using kvssink Element The kvssink element has the following required parameters: * `stream-name` -- The name of the destination Kinesis video stream. @@ -190,7 +190,25 @@ The kvssink element has the following required parameters: * `credential-path` -- A path to a file containing your credentials for accessing Kinesis Video Streams. For example credential files, see Sample Static Credential and Sample Rotating Credential. For more information on rotating credentials, see Managing Access Keys for IAM Users. You must provide either this parameter or access-key and secret-key. -For examples of common use cases you can look at [Example: Kinesis Video Streams Producer SDK GStreamer Plugin](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html) +### Running kvssink Samples +The SDK comes with two programatic GStreamer samples: `kvssink_gstreamer_sample` and `kvssink_intermittent_sample`. For more use cases, see the CLI pipeline examples at [Example: Kinesis Video Streams Producer SDK GStreamer Plugin](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html). + +The programtic samples require for the AWS region to be set with the `AWS_DEFAULT_REGION` environment variable. For example: +``` +export AWS_DEFAULT_REGION=us-west-2 +``` + +After building the SDK, loading kvssink into the GStreamer plugin path, and setting a region, the sample executables, which are located in the `build` directory, can be run. + +#### kvssink Intermittent Sample +Usage: +``` +./kvssink_intermittent_sample +``` +Setting the source to `testsrc` will use [videotestsrc](https://gstreamer.freedesktop.org/documentation/videotestsrc/?gi-language=c) and to `devicesrc` will use [autovideosrc](https://gstreamer.freedesktop.org/documentation/autodetect/autovideosrc.html?gi-language=c). By default, kvssink uses "DEFAULT_STREAM" as the stream name, and the sample uses videotestsrc as the source. If a stream with the provided or default name does not exist, the stream will automatically be created. + +The intermittent kvssink sample will stream video for 20 seconds, then pause for 40 seconds, and repeat until an inturupt signal is received. To manually adjust the streaming/paused intervals, you can change the `KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS` and `KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS` values in the *kvssink_intermittent_sample.cpp* file. + ## Running in offline mode By default, the samples run in near realtime mode. To set offline mode, set streamInfo.streamCaps.streamingType to `STREAMING_TYPE_OFFLINE`, where, `streamInfo` is of type `StreamInfo`, `streamCaps` is of type `StreamCaps` and `streamingType` is of type `STREAMING_TYPE`. From 0c32012e826b8307320508a46b61d6c472b7e3fb Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 18:45:07 -0700 Subject: [PATCH 23/27] Fix typos in ReadMe, add language to code blocks --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 1b0b5d21..74035ea7 100644 --- a/README.md +++ b/README.md @@ -191,10 +191,10 @@ The kvssink element has the following required parameters: ### Running kvssink Samples -The SDK comes with two programatic GStreamer samples: `kvssink_gstreamer_sample` and `kvssink_intermittent_sample`. For more use cases, see the CLI pipeline examples at [Example: Kinesis Video Streams Producer SDK GStreamer Plugin](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html). +The SDK comes with two programmatic GStreamer samples: `kvssink_gstreamer_sample` and `kvssink_intermittent_sample`. For more use cases, see the CLI pipeline examples at [Example: Kinesis Video Streams Producer SDK GStreamer Plugin](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html). -The programtic samples require for the AWS region to be set with the `AWS_DEFAULT_REGION` environment variable. For example: -``` +The programmatic samples require the AWS region to be set with the `AWS_DEFAULT_REGION` environment variable. For example: +```bash export AWS_DEFAULT_REGION=us-west-2 ``` @@ -202,12 +202,12 @@ After building the SDK, loading kvssink into the GStreamer plugin path, and sett #### kvssink Intermittent Sample Usage: -``` +```bash ./kvssink_intermittent_sample ``` Setting the source to `testsrc` will use [videotestsrc](https://gstreamer.freedesktop.org/documentation/videotestsrc/?gi-language=c) and to `devicesrc` will use [autovideosrc](https://gstreamer.freedesktop.org/documentation/autodetect/autovideosrc.html?gi-language=c). By default, kvssink uses "DEFAULT_STREAM" as the stream name, and the sample uses videotestsrc as the source. If a stream with the provided or default name does not exist, the stream will automatically be created. -The intermittent kvssink sample will stream video for 20 seconds, then pause for 40 seconds, and repeat until an inturupt signal is received. To manually adjust the streaming/paused intervals, you can change the `KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS` and `KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS` values in the *kvssink_intermittent_sample.cpp* file. +The intermittent kvssink sample will stream video for 20 seconds, then pause for 40 seconds, and repeat until an interrupt signal is received. To manually adjust the streaming and paused intervals, you can change the `KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS` and `KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS` values in the *kvssink_intermittent_sample.cpp* file. ## Running in offline mode From 0ad5deb4be0486ac42480d1812998b12b532780d Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 18 Apr 2024 21:52:43 -0700 Subject: [PATCH 24/27] Install pkgconfiglite --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 27442f27..cfaed44b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -321,6 +321,7 @@ jobs: Move-Item -Path "D:\a\amazon-kinesis-video-streams-producer-sdk-cpp\amazon-kinesis-video-streams-producer-sdk-cpp\*" -Destination "C:\amazon-kinesis-video-streams-producer-sdk-cpp" - name: Install dependencies run: | + choco install pkgconfiglite choco install nasm strawberryperl choco install gstreamer --version=1.22.8 choco install gstreamer-devel --version=1.22.8 From 566ad7cbb28b9ed7fff1a180596f43639a05c761 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 23 Apr 2024 10:54:20 -0700 Subject: [PATCH 25/27] Address comments --- samples/kvssink_intermittent_sample.cpp | 7 +++---- src/gstreamer/gstkvssink.cpp | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 20fb704e..9d76c2b1 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -192,7 +192,6 @@ int main(int argc, char *argv[]) return -1; } else if(argc > 1) { STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN); - stream_name[MAX_STREAM_NAME_LEN] = '\0'; } // Get source type. @@ -260,10 +259,10 @@ int main(int argc, char *argv[]) /* kvssink */ kvssink = gst_element_factory_make("kvssink", "kvssink"); - if (stream_name[0] != '\0') { - g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); - } else { + if (IS_EMPTY_STRING(stream_name)) { LOG_INFO("No stream name specified, using default kvssink stream name.") + } else { + g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); } determine_aws_credentials(kvssink, stream_name); diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index ebc6a5bc..27ef24b7 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -1191,12 +1191,13 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, if(data && data->kinesis_video_stream) { Frame eofr = EOFR_FRAME_INITIALIZER; + LOG_INFO("Sending EOFR for " << kvssink->stream_name); STATUS put_eofr_status = data->kinesis_video_stream->putFrame(eofr); if(STATUS_FAILED(put_eofr_status)) { LOG_WARN("Failed to put EOFR for " << kvssink->stream_name); } } else { - LOG_WARN("NULL pointer! Failed to put EOFR for " << kvssink->stream_name); + LOG_WARN("Null argument, failed to put EOFR for " << kvssink->stream_name); } From 2fcc6bfec8b64cf6260339732e5162908838d622 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 24 Apr 2024 14:22:06 -0700 Subject: [PATCH 26/27] Fix double space typo --- samples/kvssink_intermittent_sample.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/kvssink_intermittent_sample.cpp b/samples/kvssink_intermittent_sample.cpp index 9d76c2b1..21112ced 100644 --- a/samples/kvssink_intermittent_sample.cpp +++ b/samples/kvssink_intermittent_sample.cpp @@ -40,7 +40,7 @@ typedef struct _CustomData { } CustomData; void sigint_handler(int sigint) { - LOG_DEBUG("SIGINT received. Exiting..."); + LOG_DEBUG("SIGINT received. Exiting..."); terminated = TRUE; cv.notify_all(); if(main_loop != NULL) { From 0df1386cee3162def50669dd81ced449577645ba Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:27:46 -0700 Subject: [PATCH 27/27] nit ReadMe change --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 74035ea7..3b95114d 100644 --- a/README.md +++ b/README.md @@ -191,7 +191,7 @@ The kvssink element has the following required parameters: ### Running kvssink Samples -The SDK comes with two programmatic GStreamer samples: `kvssink_gstreamer_sample` and `kvssink_intermittent_sample`. For more use cases, see the CLI pipeline examples at [Example: Kinesis Video Streams Producer SDK GStreamer Plugin](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html). +The SDK comes with two programmatic GStreamer kvssink samples: `kvssink_gstreamer_sample` and `kvssink_intermittent_sample`. For more use cases, see the CLI pipeline examples at [Example: Kinesis Video Streams Producer SDK GStreamer Plugin](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html). The programmatic samples require the AWS region to be set with the `AWS_DEFAULT_REGION` environment variable. For example: ```bash