19#ifndef MTCORE_THREAD_PUB_SUB_HPP
20#define MTCORE_THREAD_PUB_SUB_HPP
57 std::condition_variable_any readSignal;
58 std::condition_variable_any senderSignal;
60 bool hasReservedWriter =
false;
62 Broker(Allocator *alloc,
size_t queueSize) : alloc(alloc), queueSize(queueSize) {
ensure(alloc); }
64 GenList<Queue<T>> inboxes;
67 auto lock = std::unique_lock(mux);
68 auto iter = inboxes.ptr_iter();
70 while (iter.next().copy_if_present(q)) {
71 print_debug(
"%s",
"NOT ALL BROKER SUBSCRIPTIONS WERE CLEANED UP! CLEANING UP A "
72 "SUBSCRIPTION IN Arc DESTRUCTOR!");
76 inboxes.deinit(*alloc);
112 if (!broker.valid()) {
116 auto lock = std::unique_lock(broker->mux, std::try_to_lock_t{});
117 if (!lock.owns_lock()) {
121 if (!broker->inboxes.is_handle_valid(inboxHandle)) {
125 if (
auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
126 broker->senderSignal.notify_all();
130 if (broker->closed) {
143 if (!broker.valid()) {
147 auto lock = std::unique_lock(broker->mux);
149 if (!broker->inboxes.is_handle_valid(inboxHandle)) {
154 if (
auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
155 broker->senderSignal.notify_all();
159 if (broker->closed) {
162 broker->readSignal.wait(lock);
171 template<
typename Clock,
typename Duration>
174 if (!broker.valid()) {
178 auto lock = std::unique_lock(broker->mux, timePoint);
179 if (!lock.owns_lock()) {
183 if (!broker->inboxes.is_handle_valid(inboxHandle)) {
188 if (
auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
189 broker->senderSignal.notify_all();
193 if (broker->closed) {
197 if (broker->readSignal.wait_until(lock, timePoint) == std::cv_status::timeout) {
208 if (!broker.valid()) {
212 auto lock = std::unique_lock(broker->mux);
213 broker->inboxes[inboxHandle].deinit(*broker->alloc);
214 broker->inboxes.remove(inboxHandle);
215 broker->senderSignal.notify_all();
217 broker.release(alloc);
228 Handle inboxHandle = {0, 0};
266 if (
auto brokerRes =
broker.init(alloc, &alloc, queueSize); brokerRes.is_error()) {
267 return brokerRes.error();
269 if (
auto res =
broker->inboxes.init(alloc); res.is_error()) {
281 auto lock = std::unique_lock(
broker->mux);
285 broker->senderSignal.notify_all();
286 broker->readSignal.notify_all();
309 auto lock = std::unique_lock(
broker->mux);
315 auto handleRes =
broker->inboxes.add(q);
316 if (handleRes.is_error()) {
322 if (subRef.is_error()) {
323 broker->inboxes.remove(handleRes.value());
339 auto lock = std::unique_lock(
broker->mux);
351 while (
iter.next().copy_if_present(cur)) {
357 broker->senderSignal.wait(lock);
365 while (
iter.next().copy_if_present(cur)) {
366 ensure(cur->
push(message).is_success(),
"PRE CHECK FAILED");
368 broker->readSignal.notify_all();
384 auto lock = std::unique_lock(
broker->mux);
392 bool allocated =
false;
393 bool sentSomething =
false;
395 while (
iter.next().copy_if_present(cur)) {
397 if (!cur->
push(alloc, message)) {
401 sentSomething =
true;
405 if (res.
is_error() && sentSomething) {
410 broker->readSignal.notify_all();
412 broker->senderSignal.notify_all();
429 template<
typename Clock,
typename Duration>
431 const std::chrono::time_point<Clock, Duration> &timePoint) {
433 auto lock = std::unique_lock(
broker->mux, timePoint);
434 if (!lock.owns_lock()) {
448 while (
iter.next().copy_if_present(cur)) {
454 if (
broker->senderSignal.wait_until(lock, timePoint) == std::cv_status::timeout) {
463 while (
iter.next().copy_if_present(cur)) {
464 ensure(cur->
push(message).is_success(),
"PRE CHECK FAILED");
466 broker->readSignal.notify_all();
483 template<
typename Clock,
typename Duration>
485 const std::chrono::time_point<Clock, Duration> &timePoint) {
487 auto lock = std::unique_lock(
broker->mux, timePoint);
488 if (!lock.owns_lock()) {
498 bool allocated =
false;
499 bool sentSomething =
false;
501 while (
iter.next().copy_if_present(cur)) {
503 if (!cur->
push(alloc, message)) {
507 sentSomething =
true;
511 if (res.
is_error() && sentSomething) {
517 broker->readSignal.notify_all();
520 broker->senderSignal.notify_all();
535 auto lock = std::unique_lock(
broker->mux, std::try_to_lock_t{});
536 if (!lock.owns_lock()) {
547 while (
iter.next().copy_if_present(cur)) {
557 while (
iter.next().copy_if_present(cur)) {
558 ensure(cur->
push(message).is_success(),
"PRE CHECK FAILED");
560 broker->readSignal.notify_all();
576 auto lock = std::unique_lock(
broker->mux, std::try_to_lock_t{});
577 if (!lock.owns_lock()) {
586 bool allocated =
false;
587 bool sentSomething =
false;
590 allocated |= cur.is_full();
591 if (
auto push = cur.push(alloc, message); push.is_error()) {
595 sentSomething =
true;
599 if (res.
is_error() && sentSomething) {
604 broker->readSignal.notify_all();
606 broker->senderSignal.notify_all();
#define print_debug(fmt,...)
Prints a debug message when in debug mode.
constexpr auto nullopt
Placeholder value for an empty Optional.
ChannelReceiveBeforeErrors
Errors for when timed receives fail Note: If the channel is closed, a nullopt will be returned instea...
PubSubSendBeforeGrowsError
Errors for when timed blocking publishing fails.
ChannelTryReceiveErrors
Errors for when non-blocking receives fail Note: If the channel is closed, a nullopt will be returned...
AllocationError
Error indicating failed allocation.
PubSubSendBlockGrowErrors
Errors for when blocking publishing fails.
PubSubTrySendGrowsError
Errors for when non-blocking publishing fails.
#define ensure(check,...)
Ensures that a check holds true, aborts the program if not true Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Error< Underlying > error(Underlying err)
Creates an error.
constexpr bool is_receiver_block
Checks if a type can be received from (blocking)
constexpr bool is_sender_before_grow
Checks if a type can be sent to (blocking, timeout) with growing.
constexpr bool is_receiver
Checks if a type can be received from.
constexpr bool is_sender
Checks if a type can be sent to.
constexpr bool has_closed_error
Checks if an error type has a recognized closed error.
constexpr bool is_sender_before
Checks if a type can be sent to (blocking, timeout)
constexpr bool is_sender_block
Checks if a type can be sent to (blocking)
constexpr bool is_sender_block_grow
Checks if a type can be sent to (blocking) with growing.
constexpr bool is_closeable
Checks if a type is closeable.
constexpr bool is_receiver_before
Checks if a type can be received from (blocking, timeout)
constexpr bool is_sender_grow
Checks if a type can be sent to with growing.
Generic iterator defaults built on common contracts Does not guarantee performance of iterators Actua...
Thread-related namespace The methods and classes provided by this class are thread-safe Classes and m...
Represents a memory allocator Exact behavior depends on the underlying VTable used Should use the a_*...
Handle to an item in the list.
Represents a value that may or may not exist (an "Optional" value) Similar concept to std::optional,...
FIFO Queue (First In, First Out) Dynamically allocated, can be resized.
bool is_full() const noexcept
Checks if ring buffer is full.
Result< void, AllocationError > init(Allocator &alloc, size_t capacity=10)
Initializes Queue with capacity.
Result< void, CollectionAddNoAllocationError > push(const T &elem) noexcept
Tries to add an element to the Queue Fails if Queue is empty.
void deinit(Allocator &alloc)
Cleans up allocated memory.
Represents a Result that may have an error (error code) or a success value A type of "void" means the...
bool is_error() const noexcept
Checks if is an error Result.
Represents a pub/sub publisher which can publish messages of type T Supports blocking and non-blockin...
Result< void, ChannelSendBeforeErrors > send_before(const T &message, const std::chrono::time_point< Clock, Duration > &timePoint)
Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantee...
Result< void, ChannelSendBlockErrors > send_block(const T &message)
Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantee...
Result< void, PubSubSendBlockGrowErrors > send_block(Allocator &alloc, const T &message)
Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough...
void deinit(Allocator &alloc)
Closes a pub/sub channel and dereferences broker Broker will be cleaned up once all subscribers are c...
Result< void, PubSubTrySendGrowsError > try_send(Allocator &alloc, const T &message)
Sends a message to all subscribers Will not block.
Result< Subscription< T >, AllocationError > subscribe()
Creates a new Subscription to pub/sub channel.
void close()
Closes a pub/sub channel All subscribers will sbe notified of closure.
Result< void, AllocationError > init(Allocator &alloc, size_t queueSize=10)
Initializes a publisher.
Result< void, PubSubSendBeforeGrowsError > send_before(Allocator &alloc, const T &message, const std::chrono::time_point< Clock, Duration > &timePoint)
Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough...
Arc< impl::Broker< T > > broker
Result< void, PubSubTrySendGrowsError > try_send(const T &message)
Sends a message to all subscribers Will not block.
Represents a pub/sub subscription which can receive messages of type T Under the hood,...
Optional< T > receive_block()
Blocks until a message is ready.
Result< Optional< T >, ChannelReceiveBeforeErrors > receive_before(const std::chrono::time_point< Clock, Duration > &timePoint)
Blocks until a message is ready, or erros if a timeout occurs.
void deinit(Allocator &alloc)
Deinitializes and cancels Subscription Note: This will cancel all copies of this particular Subscript...
Result< Optional< T >, ChannelTryReceiveErrors > try_receive()
Tries to receive a message without blocking.
Subscription(const Arc< impl::Broker< T > > &broker, const Handle &handle)