MT Core (C++)
Core library for replacing the C++ standard library in project usage
Loading...
Searching...
No Matches
pub_sub.hpp
Go to the documentation of this file.
1/*
2
3Copyright 2025 Matthew Tolman
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17*/
18
19#ifndef MTCORE_THREAD_PUB_SUB_HPP
20#define MTCORE_THREAD_PUB_SUB_HPP
21
23#include "mtcore_thread/arc.hpp"
25#include "mtcore/alloc.hpp"
27#include <memory>
28
29namespace mtcore::thread {
30
48 template<typename T>
49 struct Subscription;
50
51 namespace impl {
52 template<typename T>
53 struct Broker {
54 Allocator *alloc;
55 size_t queueSize;
56 std::timed_mutex mux;
57 std::condition_variable_any readSignal;
58 std::condition_variable_any senderSignal;
59 bool closed = false;
60 bool hasReservedWriter = false;
61
62 Broker(Allocator *alloc, size_t queueSize) : alloc(alloc), queueSize(queueSize) { ensure(alloc); }
63
64 GenList<Queue<T>> inboxes;
65
66 ~Broker() {
67 auto lock = std::unique_lock(mux);
68 auto iter = inboxes.ptr_iter();
69 Queue<T> *q;
70 while (iter.next().copy_if_present(q)) {
71 print_debug("%s", "NOT ALL BROKER SUBSCRIPTIONS WERE CLEANED UP! CLEANING UP A "
72 "SUBSCRIPTION IN Arc DESTRUCTOR!");
73 q->deinit(*alloc);
74 }
75
76 inboxes.deinit(*alloc);
77 }
78 };
79 } // namespace impl
80
87
94
101
102 template<typename T>
104 using Message = T;
105
112 if (!broker.valid()) {
113 return success(Optional<T>{nullopt});
114 }
115
116 auto lock = std::unique_lock(broker->mux, std::try_to_lock_t{});
117 if (!lock.owns_lock()) {
119 }
120
121 if (!broker->inboxes.is_handle_valid(inboxHandle)) {
122 return success(Optional<T>{nullopt});
123 }
124
125 if (auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
126 broker->senderSignal.notify_all();
127 return res;
128 }
129
130 if (broker->closed) {
131 return success(Optional<T>{nullopt});
132 }
133
135 }
136
143 if (!broker.valid()) {
144 return Optional<T>{nullopt};
145 }
146
147 auto lock = std::unique_lock(broker->mux);
148
149 if (!broker->inboxes.is_handle_valid(inboxHandle)) {
150 return Optional<T>{nullopt};
151 }
152
153 do {
154 if (auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
155 broker->senderSignal.notify_all();
156 return res;
157 }
158
159 if (broker->closed) {
160 return Optional<T>{nullopt};
161 }
162 broker->readSignal.wait(lock);
163 } while (true);
164 }
165
171 template<typename Clock, typename Duration>
173 receive_before(const std::chrono::time_point<Clock, Duration> &timePoint) {
174 if (!broker.valid()) {
175 return success(Optional<T>{nullopt});
176 }
177
178 auto lock = std::unique_lock(broker->mux, timePoint);
179 if (!lock.owns_lock()) {
181 }
182
183 if (!broker->inboxes.is_handle_valid(inboxHandle)) {
184 return success(Optional<T>{nullopt});
185 }
186
187 do {
188 if (auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
189 broker->senderSignal.notify_all();
190 return res;
191 }
192
193 if (broker->closed) {
194 return success(Optional<T>{nullopt});
195 }
196
197 if (broker->readSignal.wait_until(lock, timePoint) == std::cv_status::timeout) {
199 }
200 } while (true);
201 }
202
207 void deinit(Allocator &alloc) {
208 if (!broker.valid()) {
209 return;
210 }
211 {
212 auto lock = std::unique_lock(broker->mux);
213 broker->inboxes[inboxHandle].deinit(*broker->alloc);
214 broker->inboxes.remove(inboxHandle);
215 broker->senderSignal.notify_all();
216 }
217 broker.release(alloc);
218 }
219
220 void close() { deinit(); }
221
222 Subscription() = default;
223
224 Subscription(const Arc<impl::Broker<T>> &broker, const Handle &handle) : broker(broker), inboxHandle(handle) {}
225
226 private:
227 Arc<impl::Broker<T>> broker;
228 Handle inboxHandle = {0, 0};
229 };
230
231 static_assert(is_receiver<Subscription<int>>, "SUBSCRIPTION ISN'T A RECEIVER");
232 static_assert(is_receiver_block<Subscription<int>>, "SUBSCRIPTION ISN'T A BLOCK RECEIVER");
233 static_assert(is_receiver_before<Subscription<int>>, "SUBSCRIPTION ISN'T A BEFORE RECEIVER");
234 static_assert(is_closeable<Subscription<int>>, "SUBSCRIPTION ISN'T A CLOSEABLE");
235
252 template<typename T>
253 struct Publisher {
254 using Message = T;
255
257
264 Result<void, AllocationError> init(Allocator &alloc, size_t queueSize = 10) {
265 ensure(!broker.valid(), "ALREADY INITIALIZED");
266 if (auto brokerRes = broker.init(alloc, &alloc, queueSize); brokerRes.is_error()) {
267 return brokerRes.error();
268 }
269 if (auto res = broker->inboxes.init(alloc); res.is_error()) {
270 broker.deinit(alloc);
271 return res.error();
272 }
273 return success();
274 }
275
280 void close() {
281 auto lock = std::unique_lock(broker->mux);
282
283 // close out all our inboxes
284 broker->closed = true;
285 broker->senderSignal.notify_all();
286 broker->readSignal.notify_all();
287 }
288
295 void deinit(Allocator &alloc) {
296 if (!broker.valid()) {
297 return;
298 }
299 close();
300 broker.deinit(alloc);
301 }
302
308 ensure(broker.valid(), "NOT INITIALIZED");
309 auto lock = std::unique_lock(broker->mux);
310 Queue<T> q;
311 if (auto res = q.init(*broker->alloc, broker->queueSize); res.is_error()) {
312 return res.error();
313 }
314
315 auto handleRes = broker->inboxes.add(q);
316 if (handleRes.is_error()) {
317 q.deinit(*broker->alloc);
319 }
320
321 auto subRef = broker.acquire(*broker->alloc);
322 if (subRef.is_error()) {
323 broker->inboxes.remove(handleRes.value());
324 q.deinit(*broker->alloc);
326 }
327 return success(Subscription<T>{subRef.value(), *handleRes});
328 }
329
338 ensure(broker.valid(), "NOT INITIALIZED");
339 auto lock = std::unique_lock(broker->mux);
340
341 if (broker->closed) {
343 }
344
345 bool ready;
346 do {
347 ready = true;
348 // pre-check that we won't error out when adding
349 auto iter = broker->inboxes.ptr_iter();
350 Queue<T> *cur;
351 while (iter.next().copy_if_present(cur)) {
352 if (cur->is_full()) {
353 ready = false;
354 }
355 }
356 if (!ready) {
357 broker->senderSignal.wait(lock);
358 }
359 } while (!ready);
360
361 {
362 // Now that we're good, send
363 auto iter = broker->inboxes.ptr_iter();
364 Queue<T> *cur;
365 while (iter.next().copy_if_present(cur)) {
366 ensure(cur->push(message).is_success(), "PRE CHECK FAILED");
367 }
368 broker->readSignal.notify_all();
369 }
370 return success();
371 }
372
383 ensure(broker.valid(), "NOT INITIALIZED");
384 auto lock = std::unique_lock(broker->mux);
385
386 if (broker->closed) {
388 }
389
390 auto iter = broker->inboxes.ptr_iter();
391 Queue<T> *cur;
392 bool allocated = false;
393 bool sentSomething = false;
395 while (iter.next().copy_if_present(cur)) {
396 allocated |= cur->is_full();
397 if (!cur->push(alloc, message)) {
399 }
400 else {
401 sentSomething = true;
402 }
403 }
404
405 if (res.is_error() && sentSomething) {
407 }
408
409 if (sentSomething) {
410 broker->readSignal.notify_all();
411 if (allocated) {
412 broker->senderSignal.notify_all();
413 }
414 }
415 return res;
416 }
417
429 template<typename Clock, typename Duration>
431 const std::chrono::time_point<Clock, Duration> &timePoint) {
432 ensure(broker.valid(), "NOT INITIALIZED");
433 auto lock = std::unique_lock(broker->mux, timePoint);
434 if (!lock.owns_lock()) {
436 }
437
438 if (broker->closed) {
440 }
441
442 bool ready;
443 do {
444 ready = true;
445 // pre-check that we won't error out when adding
446 auto iter = broker->inboxes.ptr_iter();
447 Queue<T> *cur;
448 while (iter.next().copy_if_present(cur)) {
449 if (cur->is_full()) {
450 ready = false;
451 }
452 }
453 if (!ready) {
454 if (broker->senderSignal.wait_until(lock, timePoint) == std::cv_status::timeout) {
456 }
457 }
458 } while (!ready);
459 {
460 // Now that we're good, send
461 auto iter = broker->inboxes.ptr_iter();
462 Queue<T> *cur;
463 while (iter.next().copy_if_present(cur)) {
464 ensure(cur->push(message).is_success(), "PRE CHECK FAILED");
465 }
466 broker->readSignal.notify_all();
467 }
468 return success();
469 }
470
483 template<typename Clock, typename Duration>
485 const std::chrono::time_point<Clock, Duration> &timePoint) {
486 ensure(broker.valid(), "NOT INITIALIZED");
487 auto lock = std::unique_lock(broker->mux, timePoint);
488 if (!lock.owns_lock()) {
490 }
491
492 if (broker->closed) {
494 }
495
496 auto iter = broker->inboxes.ptr_iter();
497 Queue<T> *cur;
498 bool allocated = false;
499 bool sentSomething = false;
501 while (iter.next().copy_if_present(cur)) {
502 allocated |= cur->is_full();
503 if (!cur->push(alloc, message)) {
505 }
506 else {
507 sentSomething = true;
508 }
509 }
510
511 if (res.is_error() && sentSomething) {
513 }
514
515 // We've most likely at least partially unblocked someone, let people know
516 if (sentSomething) {
517 broker->readSignal.notify_all();
518
519 if (allocated) {
520 broker->senderSignal.notify_all();
521 }
522 }
523 return res;
524 }
525
534 ensure(broker.valid(), "NOT INITIALIZED");
535 auto lock = std::unique_lock(broker->mux, std::try_to_lock_t{});
536 if (!lock.owns_lock()) {
538 }
539
540 if (broker->closed) {
542 }
543 {
544 // pre-check that we won't block
545 auto iter = broker->inboxes.ptr_iter();
546 Queue<T> *cur;
547 while (iter.next().copy_if_present(cur)) {
548 if (cur->is_full()) {
550 }
551 }
552 }
553 {
554 // Now that we're good, send
555 auto iter = broker->inboxes.ptr_iter();
556 Queue<T> *cur;
557 while (iter.next().copy_if_present(cur)) {
558 ensure(cur->push(message).is_success(), "PRE CHECK FAILED");
559 }
560 broker->readSignal.notify_all();
561 }
562 return success();
563 }
564
575 ensure(broker.valid(), "NOT INITIALIZED");
576 auto lock = std::unique_lock(broker->mux, std::try_to_lock_t{});
577 if (!lock.owns_lock()) {
579 }
580
581 if (broker->closed) {
583 }
584
585 auto iter = broker->inboxes.ptr_iter();
586 bool allocated = false;
587 bool sentSomething = false;
589 if (Queue<T> cur; iter.next().copy_if_present(cur)) {
590 allocated |= cur.is_full();
591 if (auto push = cur.push(alloc, message); push.is_error()) {
593 }
594 else {
595 sentSomething = true;
596 }
597 }
598
599 if (res.is_error() && sentSomething) {
601 }
602
603 if (sentSomething) {
604 broker->readSignal.notify_all();
605 if (allocated) {
606 broker->senderSignal.notify_all();
607 }
608 }
609 return res;
610 }
611 };
612
613 static_assert(is_sender<Publisher<int>>, "PUBLISHER ISN'T A SENDER");
614 static_assert(is_sender_grow<Publisher<int>>, "PUBLISHER ISN'T A GROWABLE SENDER");
615 static_assert(is_sender_block<Publisher<int>>, "PUBLISHER ISN'T A SENDER");
616 static_assert(is_sender_block_grow<Publisher<int>>, "PUBLISHER ISN'T A GROWABLE SENDER");
617 static_assert(is_sender_before<Publisher<int>>, "PUBLISHER ISN'T A SENDER");
618 static_assert(is_sender_before_grow<Publisher<int>>, "PUBLISHER ISN'T A GROWABLE SENDER");
619 static_assert(is_closeable<Publisher<int>>, "PUBLISHER ISN'T CLOSEABLE");
620} // namespace mtcore::thread
621
622#endif // MTCORE_THREAD_PUB_SUB_HPP
#define print_debug(fmt,...)
Prints a debug message when in debug mode.
constexpr auto nullopt
Placeholder value for an empty Optional.
Definition optional.hpp:409
ChannelReceiveBeforeErrors
Errors for when timed receives fail Note: If the channel is closed, a nullopt will be returned instea...
Definition channels.hpp:89
PubSubSendBeforeGrowsError
Errors for when timed blocking publishing fails.
Definition pub_sub.hpp:92
ChannelTryReceiveErrors
Errors for when non-blocking receives fail Note: If the channel is closed, a nullopt will be returned...
Definition channels.hpp:79
AllocationError
Error indicating failed allocation.
PubSubSendBlockGrowErrors
Errors for when blocking publishing fails.
Definition pub_sub.hpp:85
PubSubTrySendGrowsError
Errors for when non-blocking publishing fails.
Definition pub_sub.hpp:99
#define ensure(check,...)
Ensures that a check holds true, aborts the program if not true Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Definition result.hpp:398
Error< Underlying > error(Underlying err)
Creates an error.
Definition result.hpp:425
constexpr bool is_receiver_block
Checks if a type can be received from (blocking)
constexpr bool is_sender_before_grow
Checks if a type can be sent to (blocking, timeout) with growing.
constexpr bool is_receiver
Checks if a type can be received from.
constexpr bool is_sender
Checks if a type can be sent to.
constexpr bool has_closed_error
Checks if an error type has a recognized closed error.
constexpr bool is_sender_before
Checks if a type can be sent to (blocking, timeout)
constexpr bool is_sender_block
Checks if a type can be sent to (blocking)
constexpr bool is_sender_block_grow
Checks if a type can be sent to (blocking) with growing.
constexpr bool is_closeable
Checks if a type is closeable.
constexpr bool is_receiver_before
Checks if a type can be received from (blocking, timeout)
constexpr bool is_sender_grow
Checks if a type can be sent to with growing.
Generic iterator defaults built on common contracts Does not guarantee performance of iterators Actua...
Definition iter.hpp:91
Thread-related namespace The methods and classes provided by this class are thread-safe Classes and m...
Represents a memory allocator Exact behavior depends on the underlying VTable used Should use the a_*...
Handle to an item in the list.
Definition gen_list.hpp:36
Represents a value that may or may not exist (an "Optional" value) Similar concept to std::optional,...
Definition optional.hpp:235
FIFO Queue (First In, First Out) Dynamically allocated, can be resized.
Definition queue.hpp:37
bool is_full() const noexcept
Checks if ring buffer is full.
Definition queue.hpp:71
Result< void, AllocationError > init(Allocator &alloc, size_t capacity=10)
Initializes Queue with capacity.
Definition queue.hpp:48
Result< void, CollectionAddNoAllocationError > push(const T &elem) noexcept
Tries to add an element to the Queue. Fails if the Queue is full.
Definition queue.hpp:119
void deinit(Allocator &alloc)
Cleans up allocated memory.
Definition queue.hpp:65
Represents a Result that may have an error (error code) or a success value A type of "void" means the...
Definition result.hpp:170
bool is_error() const noexcept
Checks if is an error Result.
Definition result.hpp:266
Atomic Reference Count.
Definition arc.hpp:190
Represents a pub/sub publisher which can publish messages of type T Supports blocking and non-blockin...
Definition pub_sub.hpp:253
Result< void, ChannelSendBeforeErrors > send_before(const T &message, const std::chrono::time_point< Clock, Duration > &timePoint)
Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantee...
Definition pub_sub.hpp:430
Result< void, ChannelSendBlockErrors > send_block(const T &message)
Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantee...
Definition pub_sub.hpp:337
Result< void, PubSubSendBlockGrowErrors > send_block(Allocator &alloc, const T &message)
Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough...
Definition pub_sub.hpp:382
void deinit(Allocator &alloc)
Closes a pub/sub channel and dereferences broker Broker will be cleaned up once all subscribers are c...
Definition pub_sub.hpp:295
Result< void, PubSubTrySendGrowsError > try_send(Allocator &alloc, const T &message)
Sends a message to all subscribers Will not block.
Definition pub_sub.hpp:574
Result< Subscription< T >, AllocationError > subscribe()
Creates a new Subscription to pub/sub channel.
Definition pub_sub.hpp:307
void close()
Closes a pub/sub channel. All subscribers will be notified of closure.
Definition pub_sub.hpp:280
Result< void, AllocationError > init(Allocator &alloc, size_t queueSize=10)
Initializes a publisher.
Definition pub_sub.hpp:264
Result< void, PubSubSendBeforeGrowsError > send_before(Allocator &alloc, const T &message, const std::chrono::time_point< Clock, Duration > &timePoint)
Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough...
Definition pub_sub.hpp:484
Arc< impl::Broker< T > > broker
Definition pub_sub.hpp:256
Result< void, PubSubTrySendGrowsError > try_send(const T &message)
Sends a message to all subscribers Will not block.
Definition pub_sub.hpp:533
Represents a pub/sub subscription which can receive messages of type T Under the hood,...
Definition pub_sub.hpp:103
Optional< T > receive_block()
Blocks until a message is ready.
Definition pub_sub.hpp:142
Result< Optional< T >, ChannelReceiveBeforeErrors > receive_before(const std::chrono::time_point< Clock, Duration > &timePoint)
Blocks until a message is ready, or errors if a timeout occurs.
Definition pub_sub.hpp:173
void deinit(Allocator &alloc)
Deinitializes and cancels Subscription Note: This will cancel all copies of this particular Subscript...
Definition pub_sub.hpp:207
Result< Optional< T >, ChannelTryReceiveErrors > try_receive()
Tries to receive a message without blocking.
Definition pub_sub.hpp:111
Subscription(const Arc< impl::Broker< T > > &broker, const Handle &handle)
Definition pub_sub.hpp:224