diff --git a/libavformat/udp.c b/libavformat/udp.c index 70dc98e4de..58e7498845 100644 --- a/libavformat/udp.c +++ b/libavformat/udp.c @@ -29,6 +29,7 @@ #include "avformat.h" #include "avio_internal.h" +#include "libavutil/avassert.h" #include "libavutil/parseutils.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" @@ -93,6 +94,7 @@ typedef struct UDPContext { AVFifoBuffer *fifo; int circular_buffer_error; int64_t packet_gap; /* delay between transmitted packets */ + int close_req; #if HAVE_PTHREAD_CANCEL pthread_t circular_buffer_thread; pthread_mutex_t mutex; @@ -545,30 +547,6 @@ end: return NULL; } -static void do_udp_write(void *arg, void *buf, int size) { - URLContext *h = arg; - UDPContext *s = h->priv_data; - - int ret; - - if (!(h->flags & AVIO_FLAG_NONBLOCK)) { - ret = ff_network_wait_fd(s->udp_fd, 1); - if (ret < 0) { - s->circular_buffer_error = ret; - return; - } - } - - if (!s->is_connected) { - ret = sendto (s->udp_fd, buf, size, 0, - (struct sockaddr *) &s->dest_addr, - s->dest_addr_len); - } else - ret = send(s->udp_fd, buf, size, 0); - - s->circular_buffer_error=ret; -} - static void *circular_buffer_task_tx( void *_URLContext) { URLContext *h = _URLContext; @@ -576,41 +554,67 @@ static void *circular_buffer_task_tx( void *_URLContext) int old_cancelstate; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + pthread_mutex_lock(&s->mutex); + + if (ff_socket_nonblock(s->udp_fd, 0) < 0) { + av_log(h, AV_LOG_ERROR, "Failed to set blocking mode"); + s->circular_buffer_error = AVERROR(EIO); + goto end; + } for(;;) { int len; + const uint8_t *p; uint8_t tmp[4]; - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); - - av_usleep(s->packet_gap); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); - - pthread_mutex_lock(&s->mutex); - len=av_fifo_size(s->fifo); while (len<4) { + if (s->close_req) + goto end; if (pthread_cond_wait(&s->cond, &s->mutex) < 0) { goto end; } len=av_fifo_size(s->fifo); } - av_fifo_generic_peek(s->fifo, tmp, 4, NULL); + av_fifo_generic_read(s->fifo, tmp, 4, NULL); len=AV_RL32(tmp); - if (len>0 && av_fifo_size(s->fifo)>=len+4) { - av_fifo_drain(s->fifo, 4); /* skip packet length */ - av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */ - if (s->circular_buffer_error == len) { - /* all ok - reset error */ - s->circular_buffer_error=0; + av_assert0(len >= 0); + av_assert0(len <= sizeof(s->tmp)); + + av_fifo_generic_read(s->fifo, s->tmp, len, NULL); + + pthread_mutex_unlock(&s->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); + + p = s->tmp; + while (len) { + int ret; + av_assert0(len > 0); + if (!s->is_connected) { + ret = sendto (s->udp_fd, p, len, 0, + (struct sockaddr *) &s->dest_addr, + s->dest_addr_len); + } else + ret = send(s->udp_fd, p, len, 0); + if (ret >= 0) { + len -= ret; + p += ret; + } else { + ret = ff_neterrno(); + if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) { + s->circular_buffer_error = ret; + return NULL; + } } } - pthread_mutex_unlock(&s->mutex); + av_usleep(s->packet_gap); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + pthread_mutex_lock(&s->mutex); } end: @@ -1055,7 +1059,6 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size) */ if (s->circular_buffer_error<0) { int err=s->circular_buffer_error; - s->circular_buffer_error=0; pthread_mutex_unlock(&s->mutex); return err; } @@ -1093,13 +1096,26 @@ static int udp_close(URLContext *h) { UDPContext *s = h->priv_data; +#if HAVE_PTHREAD_CANCEL + // Request close once writing is finished + if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) { + int ret; + pthread_mutex_lock(&s->mutex); + s->close_req = 1; + pthread_cond_signal(&s->cond); + pthread_mutex_unlock(&s->mutex); + } +#endif + if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage); closesocket(s->udp_fd); #if HAVE_PTHREAD_CANCEL if (s->thread_started) { int ret; - pthread_cancel(s->circular_buffer_thread); + // Cancel only read, as write has been signaled as success to the user + if (h->flags & AVIO_FLAG_READ) + pthread_cancel(s->circular_buffer_thread); ret = pthread_join(s->circular_buffer_thread, NULL); if (ret != 0) av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));