// NOTE(review): the line below is kept verbatim. It looks truncated in this view: the second
// #include has no target and the typedef is cut off inside a trailing comment, so the struct
// definitions that the functions below rely on are not visible here. Restore from version control.
#include "pipe.h" #include /** * Data shared between both sides. */ typedef struct { FuriSemaphore* instance_count; // role; }

/**
 * Report the pipe state as seen from this side.
 *
 * Reads the current count of the shared `instance_count` semaphore: a count
 * of exactly 1 is reported as PipeStateOpen, any other count as
 * PipeStateBroken.
 *
 * @param pipe pipe side to query; checked to be non-NULL with furi_check
 * @return PipeStateOpen or PipeStateBroken
 */
PipeState pipe_state(PipeSide* pipe) {
    furi_check(pipe);
    uint32_t count = furi_semaphore_get_count(pipe->shared->instance_count);
    return (count == 1) ? PipeStateOpen : PipeStateBroken;
}

/**
 * Free one side of the pipe.
 *
 * The side must not be attached to an event loop (`pipe->event_loop` is
 * checked to be NULL). The shared `state_transition` mutex is held while the
 * function decides, via a non-waiting acquire of `instance_count`, whether
 * only this side's structure or the whole set of shared objects is released.
 *
 * @param pipe pipe side to free; checked to be non-NULL with furi_check
 */
void pipe_free(PipeSide* pipe) {
    furi_check(pipe);
    furi_check(!pipe->event_loop);
    furi_mutex_acquire(pipe->shared->state_transition, FuriWaitForever);
    // Timeout 0: take the semaphore only if it is available right now.
    FuriStatus status = furi_semaphore_acquire(pipe->shared->instance_count, 0);
    if(status == FuriStatusOk) {
        // The other side is still intact: release the mutex and free only this side's
        // structure; the stream buffers and the shared data are left alone.
        furi_mutex_release(pipe->shared->state_transition);
        free(pipe);
    } else {
        // The other side is gone too: free both stream buffers, the semaphore, the mutex
        // (still held by this call), the shared structure, and finally this side.
        furi_stream_buffer_free(pipe->sending);
        furi_stream_buffer_free(pipe->receiving);
        furi_semaphore_free(pipe->shared->instance_count);
        furi_mutex_free(pipe->shared->state_transition);
        free(pipe->shared);
        free(pipe);
    }
}

/**
 * Stdout callback that forwards written data into the pipe.
 *
 * Calls pipe_send() repeatedly with `pipe->stdout_timeout` until all `size`
 * bytes have been sent or one call sends nothing, in which case the remaining
 * data is dropped.
 *
 * @param data    bytes to send
 * @param size    number of bytes in `data`
 * @param context the PipeSide to send through; must not be NULL (furi_assert)
 */
static void _pipe_stdout_cb(const char* data, size_t size, void* context) {
    furi_assert(context);
    PipeSide* pipe = context;
    while(size) {
        size_t sent = pipe_send(pipe, data, size, pipe->stdout_timeout);
        // Nothing was sent within the timeout: give up on the remaining bytes.
        if(!sent) break;
        data += sent;
        size -= sent;
    }
}

/**
 * Stdin callback that reads from the pipe.
 *
 * @param data    destination buffer
 * @param size    maximum number of bytes to read
 * @param timeout timeout forwarded unchanged to pipe_receive()
 * @param context the PipeSide to read from; must not be NULL (furi_assert)
 * @return whatever pipe_receive() returns
 */
static size_t _pipe_stdin_cb(char* data, size_t size, FuriWait timeout, void* context) {
    furi_assert(context);
    PipeSide* pipe = context;
    return pipe_receive(pipe, data, size, timeout);
}

/**
 * Register this pipe side as the stdout and stdin handler.
 *
 * Installs _pipe_stdout_cb and _pipe_stdin_cb through
 * furi_thread_set_stdout_callback() and furi_thread_set_stdin_callback(),
 * passing `pipe` as their context.
 * NOTE(review): presumably this affects the calling thread only and `pipe`
 * must stay valid while the callbacks are installed; confirm against the
 * furi_thread API.
 *
 * @param pipe pipe side to use; checked to be non-NULL with furi_check
 */
void pipe_install_as_stdio(PipeSide* pipe) {
    furi_check(pipe);
    furi_thread_set_stdout_callback(_pipe_stdout_cb, pipe);
    furi_thread_set_stdin_callback(_pipe_stdin_cb, pipe);
}

/**
 * Set the timeout that _pipe_stdout_cb passes to each pipe_send() call.
 *
 * @param pipe    pipe side to configure; checked to be non-NULL with furi_check
 * @param timeout new value for `pipe->stdout_timeout`
 */
void pipe_set_stdout_timeout(PipeSide* pipe, FuriWait timeout) {
    furi_check(pipe);
    pipe->stdout_timeout = timeout;
}

/**
 * Receive data from this side's `receiving` stream buffer.
 *
 * @param pipe    pipe side to read from; checked to be non-NULL with furi_check
 * @param data    destination buffer
 * @param length  length forwarded to furi_stream_buffer_receive()
 * @param timeout timeout forwarded to furi_stream_buffer_receive()
 * @return the value returned by furi_stream_buffer_receive()
 */
size_t pipe_receive(PipeSide* pipe, void* data, size_t length, FuriWait timeout) {
    furi_check(pipe);
    return furi_stream_buffer_receive(pipe->receiving, data, length, timeout);
}

/**
 * Send data into this side's `sending` stream buffer.
 *
 * @param pipe    pipe side to write to; checked to be non-NULL with furi_check
 * @param data    bytes to send
 * @param length  length forwarded to furi_stream_buffer_send()
 * @param timeout timeout forwarded to furi_stream_buffer_send()
 * @return the value returned by furi_stream_buffer_send()
 */
size_t pipe_send(PipeSide* pipe, const void* data, size_t length, FuriWait timeout) {
    furi_check(pipe);
    return furi_stream_buffer_send(pipe->sending, data, length,
timeout); } size_t pipe_bytes_available(PipeSide* pipe) { furi_check(pipe); return furi_stream_buffer_bytes_available(pipe->receiving); } size_t pipe_spaces_available(PipeSide* pipe) { furi_check(pipe); return furi_stream_buffer_spaces_available(pipe->sending); } static void pipe_receiving_buffer_callback(FuriEventLoopObject* buffer, void* context) { UNUSED(buffer); PipeSide* pipe = context; furi_assert(pipe); if(pipe->on_space_freed) pipe->on_data_arrived(pipe, pipe->callback_context); } static void pipe_sending_buffer_callback(FuriEventLoopObject* buffer, void* context) { UNUSED(buffer); PipeSide* pipe = context; furi_assert(pipe); if(pipe->on_data_arrived) pipe->on_space_freed(pipe, pipe->callback_context); } static void pipe_semaphore_callback(FuriEventLoopObject* semaphore, void* context) { UNUSED(semaphore); PipeSide* pipe = context; furi_assert(pipe); if(pipe->on_pipe_broken) pipe->on_pipe_broken(pipe, pipe->callback_context); } void pipe_attach_to_event_loop(PipeSide* pipe, FuriEventLoop* event_loop) { furi_check(pipe); furi_check(event_loop); furi_check(!pipe->event_loop); pipe->event_loop = event_loop; } void pipe_detach_from_event_loop(PipeSide* pipe) { furi_check(pipe); furi_check(pipe->event_loop); furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->receiving); furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->sending); furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->shared->instance_count); pipe->event_loop = NULL; } void pipe_set_callback_context(PipeSide* pipe, void* context) { furi_check(pipe); pipe->callback_context = context; } void pipe_set_data_arrived_callback( PipeSide* pipe, PipeSideDataArrivedCallback callback, FuriEventLoopEvent event) { furi_check(pipe); furi_check(pipe->event_loop); furi_check((event & FuriEventLoopEventMask) == 0); furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->receiving); pipe->on_data_arrived = callback; if(callback) furi_event_loop_subscribe_stream_buffer( pipe->event_loop, 
pipe->receiving, FuriEventLoopEventIn | event, pipe_receiving_buffer_callback, pipe); } void pipe_set_space_freed_callback( PipeSide* pipe, PipeSideSpaceFreedCallback callback, FuriEventLoopEvent event) { furi_check(pipe); furi_check(pipe->event_loop); furi_check((event & FuriEventLoopEventMask) == 0); furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->sending); pipe->on_space_freed = callback; if(callback) furi_event_loop_subscribe_stream_buffer( pipe->event_loop, pipe->sending, FuriEventLoopEventOut | event, pipe_sending_buffer_callback, pipe); } void pipe_set_broken_callback( PipeSide* pipe, PipeSideBrokenCallback callback, FuriEventLoopEvent event) { furi_check(pipe); furi_check(pipe->event_loop); furi_check((event & FuriEventLoopEventMask) == 0); furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->shared->instance_count); pipe->on_pipe_broken = callback; if(callback) furi_event_loop_subscribe_semaphore( pipe->event_loop, pipe->shared->instance_count, FuriEventLoopEventOut | event, pipe_semaphore_callback, pipe); }