diff --git a/libavdevice/decklink_common.cpp b/libavdevice/decklink_common.cpp index acd1f77e6c..74e26e986d 100644 --- a/libavdevice/decklink_common.cpp +++ b/libavdevice/decklink_common.cpp @@ -390,6 +390,121 @@ int ff_decklink_set_format(AVFormatContext *avctx, decklink_direction_t directio return ff_decklink_set_format(avctx, 0, 0, 0, 0, AV_FIELD_UNKNOWN, direction); } +void ff_decklink_packet_queue_init(AVFormatContext *avctx, DecklinkPacketQueue *q) +{ + struct decklink_cctx *ctx = (struct decklink_cctx *)avctx->priv_data; + memset(q, 0, sizeof(DecklinkPacketQueue)); + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); + q->avctx = avctx; + q->max_q_size = ctx->queue_size; +} + +void ff_decklink_packet_queue_flush(DecklinkPacketQueue *q) +{ + PacketListEntry *pkt, *pkt1; + + pthread_mutex_lock(&q->mutex); + for (pkt = q->pkt_list.head; pkt != NULL; pkt = pkt1) { + pkt1 = pkt->next; + av_packet_unref(&pkt->pkt); + av_freep(&pkt); + } + q->pkt_list.head = NULL; + q->pkt_list.tail = NULL; + q->nb_packets = 0; + q->size = 0; + pthread_mutex_unlock(&q->mutex); +} + +void ff_decklink_packet_queue_end(DecklinkPacketQueue *q) +{ + ff_decklink_packet_queue_flush(q); + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->cond); +} + +unsigned long long ff_decklink_packet_queue_size(DecklinkPacketQueue *q) +{ + unsigned long long size; + pthread_mutex_lock(&q->mutex); + size = q->size; + pthread_mutex_unlock(&q->mutex); + return size; +} + +int ff_decklink_packet_queue_put(DecklinkPacketQueue *q, AVPacket *pkt) +{ + PacketListEntry *pkt1; + + // Drop Packet if queue size is > maximum queue size + if (ff_decklink_packet_queue_size(q) > (uint64_t)q->max_q_size) { + av_packet_unref(pkt); + av_log(q->avctx, AV_LOG_WARNING, "Decklink input buffer overrun!\n"); + return -1; + } + /* ensure the packet is reference counted */ + if (av_packet_make_refcounted(pkt) < 0) { + av_packet_unref(pkt); + return -1; + } + + pkt1 = (PacketListEntry *)av_malloc(sizeof(*pkt1)); + if (!pkt1) { + av_packet_unref(pkt); + return -1; + } + av_packet_move_ref(&pkt1->pkt, pkt); + pkt1->next = NULL; + + pthread_mutex_lock(&q->mutex); + + if (!q->pkt_list.tail) { + q->pkt_list.head = pkt1; + } else { + q->pkt_list.tail->next = pkt1; + } + + q->pkt_list.tail = pkt1; + q->nb_packets++; + q->size += pkt1->pkt.size + sizeof(*pkt1); + + pthread_cond_signal(&q->cond); + + pthread_mutex_unlock(&q->mutex); + return 0; +} + +int ff_decklink_packet_queue_get(DecklinkPacketQueue *q, AVPacket *pkt, int block) +{ + int ret; + + pthread_mutex_lock(&q->mutex); + + for (;; ) { + PacketListEntry *pkt1 = q->pkt_list.head; + if (pkt1) { + q->pkt_list.head = pkt1->next; + if (!q->pkt_list.head) { + q->pkt_list.tail = NULL; + } + q->nb_packets--; + q->size -= pkt1->pkt.size + sizeof(*pkt1); + *pkt = pkt1->pkt; + av_free(pkt1); + ret = 1; + break; + } else if (!block) { + ret = 0; + break; + } else { + pthread_cond_wait(&q->cond, &q->mutex); + } + } + pthread_mutex_unlock(&q->mutex); + return ret; +} + int ff_decklink_list_devices(AVFormatContext *avctx, struct AVDeviceInfoList *device_list, int show_inputs, int show_outputs) diff --git a/libavdevice/decklink_common.h b/libavdevice/decklink_common.h index 0d33f94a6e..1cc6d9c2cc 100644 --- a/libavdevice/decklink_common.h +++ b/libavdevice/decklink_common.h @@ -78,7 +78,7 @@ static char *dup_cfstring_to_utf8(CFStringRef w) class decklink_output_callback; class decklink_input_callback; -typedef struct AVPacketQueue { +typedef struct DecklinkPacketQueue { PacketList pkt_list; int nb_packets; unsigned long long size; @@ -87,7 +87,7 @@ typedef struct AVPacketQueue { pthread_cond_t cond; AVFormatContext *avctx; int64_t max_q_size; -} AVPacketQueue; +} DecklinkPacketQueue; struct decklink_ctx { /* DeckLink SDK interfaces */ @@ -111,7 +111,7 @@ struct decklink_ctx { int supports_vanc; /* Capture buffer queue */ - AVPacketQueue queue; + DecklinkPacketQueue queue; AVCCFifo *cc_fifo; ///< closed captions @@ -235,4 +235,11 @@ int ff_decklink_list_formats(AVFormatContext *avctx, decklink_direction_t direct void ff_decklink_cleanup(AVFormatContext *avctx); int ff_decklink_init_device(AVFormatContext *avctx, const char* name); +void ff_decklink_packet_queue_init(AVFormatContext *avctx, DecklinkPacketQueue *q); +void ff_decklink_packet_queue_flush(DecklinkPacketQueue *q); +void ff_decklink_packet_queue_end(DecklinkPacketQueue *q); +unsigned long long ff_decklink_packet_queue_size(DecklinkPacketQueue *q); +int ff_decklink_packet_queue_put(DecklinkPacketQueue *q, AVPacket *pkt); +int ff_decklink_packet_queue_get(DecklinkPacketQueue *q, AVPacket *pkt, int block); + #endif /* AVDEVICE_DECKLINK_COMMON_H */ diff --git a/libavdevice/decklink_dec.cpp b/libavdevice/decklink_dec.cpp index 7bf5e3724c..66abee1268 100644 --- a/libavdevice/decklink_dec.cpp +++ b/libavdevice/decklink_dec.cpp @@ -471,120 +471,6 @@ skip_packet: return tgt; } -static void avpacket_queue_init(AVFormatContext *avctx, AVPacketQueue *q) -{ - struct decklink_cctx *ctx = (struct decklink_cctx *)avctx->priv_data; - memset(q, 0, sizeof(AVPacketQueue)); - pthread_mutex_init(&q->mutex, NULL); - pthread_cond_init(&q->cond, NULL); - q->avctx = avctx; - q->max_q_size = ctx->queue_size; -} - -static void avpacket_queue_flush(AVPacketQueue *q) -{ - PacketListEntry *pkt, *pkt1; - - pthread_mutex_lock(&q->mutex); - for (pkt = q->pkt_list.head; pkt != NULL; pkt = pkt1) { - pkt1 = pkt->next; - av_packet_unref(&pkt->pkt); - av_freep(&pkt); - } - q->pkt_list.head = NULL; - q->pkt_list.tail = NULL; - q->nb_packets = 0; - q->size = 0; - pthread_mutex_unlock(&q->mutex); -} - -static void avpacket_queue_end(AVPacketQueue *q) -{ - avpacket_queue_flush(q); - pthread_mutex_destroy(&q->mutex); - pthread_cond_destroy(&q->cond); -} - -static unsigned long long avpacket_queue_size(AVPacketQueue *q) -{ - unsigned long long size; - pthread_mutex_lock(&q->mutex); - size = q->size; - pthread_mutex_unlock(&q->mutex); - return size; -} - -static int avpacket_queue_put(AVPacketQueue *q, AVPacket *pkt) -{ - PacketListEntry *pkt1; - - // Drop Packet if queue size is > maximum queue size - if (avpacket_queue_size(q) > (uint64_t)q->max_q_size) { - av_packet_unref(pkt); - av_log(q->avctx, AV_LOG_WARNING, "Decklink input buffer overrun!\n"); - return -1; - } - /* ensure the packet is reference counted */ - if (av_packet_make_refcounted(pkt) < 0) { - av_packet_unref(pkt); - return -1; - } - - pkt1 = (PacketListEntry *)av_malloc(sizeof(*pkt1)); - if (!pkt1) { - av_packet_unref(pkt); - return -1; - } - av_packet_move_ref(&pkt1->pkt, pkt); - pkt1->next = NULL; - - pthread_mutex_lock(&q->mutex); - - if (!q->pkt_list.tail) { - q->pkt_list.head = pkt1; - } else { - q->pkt_list.tail->next = pkt1; - } - - q->pkt_list.tail = pkt1; - q->nb_packets++; - q->size += pkt1->pkt.size + sizeof(*pkt1); - - pthread_cond_signal(&q->cond); - - pthread_mutex_unlock(&q->mutex); - return 0; -} - -static int avpacket_queue_get(AVPacketQueue *q, AVPacket *pkt, int block) -{ - int ret; - - pthread_mutex_lock(&q->mutex); - - for (;; ) { - PacketListEntry *pkt1 = q->pkt_list.head; - if (pkt1) { - q->pkt_list.head = pkt1->next; - if (!q->pkt_list.head) { - q->pkt_list.tail = NULL; - } - q->nb_packets--; - q->size -= pkt1->pkt.size + sizeof(*pkt1); - *pkt = pkt1->pkt; - av_free(pkt1); - ret = 1; - break; - } else if (!block) { - ret = 0; - break; - } else { - pthread_cond_wait(&q->cond, &q->mutex); - } - } - pthread_mutex_unlock(&q->mutex); - return ret; -} static void handle_klv(AVFormatContext *avctx, decklink_ctx *ctx, IDeckLinkVideoInputFrame *videoFrame, int64_t pts) { @@ -682,7 +568,7 @@ static void handle_klv(AVFormatContext *avctx, decklink_ctx *ctx, IDeckLinkVideo klv_packet.data = klv.data(); klv_packet.size = klv.size(); - if (avpacket_queue_put(&ctx->queue, &klv_packet) < 0) { + if (ff_decklink_packet_queue_put(&ctx->queue, &klv_packet) < 0) { ++ctx->dropped; } } @@ -874,7 +760,7 @@ HRESULT decklink_input_callback::VideoInputFrameArrived( if (videoFrame) { AVPacket pkt = { 0 }; if (ctx->frameCount % 25 == 0) { - unsigned long long qsize = avpacket_queue_size(&ctx->queue); + unsigned long long qsize = ff_decklink_packet_queue_size(&ctx->queue); av_log(avctx, AV_LOG_DEBUG, "Frame received (#%lu) - Valid (%liB) - QSize %fMB\n", ctx->frameCount, @@ -1038,7 +924,7 @@ HRESULT decklink_input_callback::VideoInputFrameArrived( txt_pkt.stream_index = ctx->teletext_st->index; txt_pkt.data = txt_buf0; txt_pkt.size = txt_buf - txt_buf0; - if (avpacket_queue_put(&ctx->queue, &txt_pkt) < 0) { + if (ff_decklink_packet_queue_put(&ctx->queue, &txt_pkt) < 0) { ++ctx->dropped; } } @@ -1049,7 +935,7 @@ HRESULT decklink_input_callback::VideoInputFrameArrived( if (pkt.buf) videoFrame->AddRef(); - if (avpacket_queue_put(&ctx->queue, &pkt) < 0) { + if (ff_decklink_packet_queue_put(&ctx->queue, &pkt) < 0) { ++ctx->dropped; } } @@ -1071,7 +957,7 @@ HRESULT decklink_input_callback::VideoInputFrameArrived( pkt.stream_index = ctx->audio_st->index; pkt.data = (uint8_t *)audioFrameBytes; - if (avpacket_queue_put(&ctx->queue, &pkt) < 0) { + if (ff_decklink_packet_queue_put(&ctx->queue, &pkt) < 0) { ++ctx->dropped; } } @@ -1153,7 +1039,7 @@ av_cold int ff_decklink_read_close(AVFormatContext *avctx) } ff_decklink_cleanup(avctx); - avpacket_queue_end(&ctx->queue); + ff_decklink_packet_queue_end(&ctx->queue); av_freep(&cctx->ctx); @@ -1411,7 +1297,7 @@ av_cold int ff_decklink_read_header(AVFormatContext *avctx) goto error; } - avpacket_queue_init (avctx, &ctx->queue); + ff_decklink_packet_queue_init(avctx, &ctx->queue); if (ctx->dli->StartStreams() != S_OK) { av_log(avctx, AV_LOG_ERROR, "Cannot start input stream\n"); @@ -1431,7 +1317,7 @@ int ff_decklink_read_packet(AVFormatContext *avctx, AVPacket *pkt) struct decklink_cctx *cctx = (struct decklink_cctx *)avctx->priv_data; struct decklink_ctx *ctx = (struct decklink_ctx *)cctx->ctx; - avpacket_queue_get(&ctx->queue, pkt, 1); + ff_decklink_packet_queue_get(&ctx->queue, pkt, 1); if (ctx->tc_format && !(av_dict_get(ctx->video_st->metadata, "timecode", NULL, 0))) { size_t size;