udp: Replace double select() by select+mutex+cond.
When no data was available both the buffer thread as well as the main thread would block in select(), when data becomes available both should move forward and as data is read in the buffer thread the main thread would block in select() later the read data was put in the fifo but the main thread still would be blocked in select() until either the timeout or another packet would come in. This is solved in this commit by using a mutex and a condition variable Signed-off-by: Michael Niedermayer <michaelni@gmx.at>
This commit is contained in:
parent
3dcbafc777
commit
bc900501e0
@ -69,6 +69,8 @@ typedef struct {
|
|||||||
int circular_buffer_error;
|
int circular_buffer_error;
|
||||||
#if HAVE_PTHREADS
|
#if HAVE_PTHREADS
|
||||||
pthread_t circular_buffer_thread;
|
pthread_t circular_buffer_thread;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
pthread_cond_t cond;
|
||||||
#endif
|
#endif
|
||||||
uint8_t tmp[UDP_MAX_PKT_SIZE+4];
|
uint8_t tmp[UDP_MAX_PKT_SIZE+4];
|
||||||
int remaining_in_dg;
|
int remaining_in_dg;
|
||||||
@ -317,6 +319,7 @@ static int udp_get_file_handle(URLContext *h)
|
|||||||
return s->udp_fd;
|
return s->udp_fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if HAVE_PTHREADS
|
||||||
static void *circular_buffer_task( void *_URLContext)
|
static void *circular_buffer_task( void *_URLContext)
|
||||||
{
|
{
|
||||||
URLContext *h = _URLContext;
|
URLContext *h = _URLContext;
|
||||||
@ -331,7 +334,7 @@ static void *circular_buffer_task( void *_URLContext)
|
|||||||
|
|
||||||
if (ff_check_interrupt(&h->interrupt_callback)) {
|
if (ff_check_interrupt(&h->interrupt_callback)) {
|
||||||
s->circular_buffer_error = EINTR;
|
s->circular_buffer_error = EINTR;
|
||||||
return NULL;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
FD_ZERO(&rfds);
|
FD_ZERO(&rfds);
|
||||||
@ -343,7 +346,7 @@ static void *circular_buffer_task( void *_URLContext)
|
|||||||
if (ff_neterrno() == AVERROR(EINTR))
|
if (ff_neterrno() == AVERROR(EINTR))
|
||||||
continue;
|
continue;
|
||||||
s->circular_buffer_error = EIO;
|
s->circular_buffer_error = EIO;
|
||||||
return NULL;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
|
if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
|
||||||
@ -357,23 +360,31 @@ static void *circular_buffer_task( void *_URLContext)
|
|||||||
if(left < UDP_MAX_PKT_SIZE + 4) {
|
if(left < UDP_MAX_PKT_SIZE + 4) {
|
||||||
av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
|
av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
|
||||||
s->circular_buffer_error = EIO;
|
s->circular_buffer_error = EIO;
|
||||||
return NULL;
|
goto end;
|
||||||
}
|
}
|
||||||
left = FFMIN(left, s->fifo->end - s->fifo->wptr);
|
left = FFMIN(left, s->fifo->end - s->fifo->wptr);
|
||||||
len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
|
len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
|
if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
|
||||||
s->circular_buffer_error = EIO;
|
s->circular_buffer_error = EIO;
|
||||||
return NULL;
|
goto end;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
AV_WL32(s->tmp, len);
|
AV_WL32(s->tmp, len);
|
||||||
|
pthread_mutex_lock(&s->mutex);
|
||||||
av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
|
av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
|
||||||
|
pthread_cond_signal(&s->cond);
|
||||||
|
pthread_mutex_unlock(&s->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
pthread_mutex_lock(&s->mutex);
|
||||||
|
pthread_cond_signal(&s->cond);
|
||||||
|
pthread_mutex_unlock(&s->mutex);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* put it in UDP context */
|
/* put it in UDP context */
|
||||||
/* return non zero if error */
|
/* return non zero if error */
|
||||||
@ -516,6 +527,8 @@ static int udp_open(URLContext *h, const char *uri, int flags)
|
|||||||
if (!is_output && s->circular_buffer_size) {
|
if (!is_output && s->circular_buffer_size) {
|
||||||
/* start the task going */
|
/* start the task going */
|
||||||
s->fifo = av_fifo_alloc(s->circular_buffer_size);
|
s->fifo = av_fifo_alloc(s->circular_buffer_size);
|
||||||
|
pthread_mutex_init(&s->mutex, NULL);
|
||||||
|
pthread_cond_init(&s->cond, NULL);
|
||||||
if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
|
if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
|
||||||
av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
|
av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
|
||||||
goto fail;
|
goto fail;
|
||||||
@ -536,15 +549,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
|
|||||||
UDPContext *s = h->priv_data;
|
UDPContext *s = h->priv_data;
|
||||||
int ret;
|
int ret;
|
||||||
int avail;
|
int avail;
|
||||||
fd_set rfds;
|
|
||||||
struct timeval tv;
|
|
||||||
|
|
||||||
|
#if HAVE_PTHREADS
|
||||||
if (s->fifo) {
|
if (s->fifo) {
|
||||||
|
pthread_mutex_lock(&s->mutex);
|
||||||
do {
|
do {
|
||||||
avail = av_fifo_size(s->fifo);
|
avail = av_fifo_size(s->fifo);
|
||||||
if (avail) { // >=size) {
|
if (avail) { // >=size) {
|
||||||
uint8_t tmp[4];
|
uint8_t tmp[4];
|
||||||
|
pthread_mutex_unlock(&s->mutex);
|
||||||
|
|
||||||
av_fifo_generic_read(s->fifo, tmp, 4, NULL);
|
av_fifo_generic_read(s->fifo, tmp, 4, NULL);
|
||||||
avail= AV_RL32(tmp);
|
avail= AV_RL32(tmp);
|
||||||
@ -557,19 +570,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
|
|||||||
av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
|
av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
|
||||||
return avail;
|
return avail;
|
||||||
} else if(s->circular_buffer_error){
|
} else if(s->circular_buffer_error){
|
||||||
|
pthread_mutex_unlock(&s->mutex);
|
||||||
return s->circular_buffer_error;
|
return s->circular_buffer_error;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
FD_ZERO(&rfds);
|
pthread_cond_wait(&s->cond, &s->mutex);
|
||||||
FD_SET(s->udp_fd, &rfds);
|
|
||||||
tv.tv_sec = 1;
|
|
||||||
tv.tv_usec = 0;
|
|
||||||
ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
|
|
||||||
if (ret<0)
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
} while( 1);
|
} while( 1);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
|
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
|
||||||
ret = ff_network_wait_fd(s->udp_fd, 0);
|
ret = ff_network_wait_fd(s->udp_fd, 0);
|
||||||
@ -610,6 +619,10 @@ static int udp_close(URLContext *h)
|
|||||||
udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
|
udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
|
||||||
closesocket(s->udp_fd);
|
closesocket(s->udp_fd);
|
||||||
av_fifo_free(s->fifo);
|
av_fifo_free(s->fifo);
|
||||||
|
#if HAVE_PTHREADS
|
||||||
|
pthread_mutex_destroy(&s->mutex);
|
||||||
|
pthread_cond_destroy(&s->cond);
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user