MT Core (C++)
Core library for replacing C++ standard in project usage
Loading...
Searching...
No Matches
mtcore::thread::Publisher< T > Struct Template Reference

Represents a pub/sub publisher which can publish messages of type T Supports blocking and non-blocking publishing. More...

#include <pub_sub.hpp>

Collaboration diagram for mtcore::thread::Publisher< T >:

Public Types

using Message = T
 

Public Member Functions

Result< void, AllocationErrorinit (Allocator &alloc, size_t queueSize=10)
 Initializes a publisher.
 
void close ()
 Closes a pub/sub channel All subscribers will sbe notified of closure.
 
void deinit (Allocator &alloc)
 Closes a pub/sub channel and dereferences broker Broker will be cleaned up once all subscribers are cleaned up Note: all threads that are publishing should be joined prior to calling this method.
 
Result< Subscription< T >, AllocationErrorsubscribe ()
 Creates a new Subscription to pub/sub channel.
 
Result< void, ChannelSendBlockErrorssend_block (const T &message)
 Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantees a message is delivered to all or blocks indefinitely.
 
Result< void, PubSubSendBlockGrowErrorssend_block (Allocator &alloc, const T &message)
 Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough room If a Queue cannot be increased, then the message will be dropped for that subscriber.
 
template<typename Clock, typename Duration>
Result< void, ChannelSendBeforeErrorssend_before (const T &message, const std::chrono::time_point< Clock, Duration > &timePoint)
 Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantees a message is delivered to all or blocks It will timeout it the current time exceeds the provided time point.
 
template<typename Clock, typename Duration>
Result< void, PubSubSendBeforeGrowsErrorsend_before (Allocator &alloc, const T &message, const std::chrono::time_point< Clock, Duration > &timePoint)
 Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough room If a Queue cannot be increased, then the message will be dropped for that subscriber Will timeout if cannot send in time.
 
Result< void, PubSubTrySendGrowsErrortry_send (const T &message)
 Sends a message to all subscribers Will not block.
 
Result< void, PubSubTrySendGrowsErrortry_send (Allocator &alloc, const T &message)
 Sends a message to all subscribers Will not block.
 

Public Attributes

Arc< impl::Broker< T > > broker
 

Detailed Description

template<typename T>
struct mtcore::thread::Publisher< T >

Represents a pub/sub publisher which can publish messages of type T Supports blocking and non-blocking publishing.

Locks:

  • 1 for entire data structure and all subscriptions (single lock)

Condition Variables:

  • 2 condition variables (one to send, one to receive)

Non-Blocking Availability:

  • With Failure
Template Parameters
TMessage Type

Definition at line 253 of file pub_sub.hpp.

Member Typedef Documentation

◆ Message

template<typename T>
using mtcore::thread::Publisher< T >::Message = T

Definition at line 254 of file pub_sub.hpp.

Member Function Documentation

◆ close()

template<typename T>
void mtcore::thread::Publisher< T >::close ( )
inline

Closes a pub/sub channel All subscribers will sbe notified of closure.

Definition at line 280 of file pub_sub.hpp.

280 {
281 auto lock = std::unique_lock(broker->mux);
282
283 // close out all our inboxes
284 broker->closed = true;
285 broker->senderSignal.notify_all();
286 broker->readSignal.notify_all();
287 }
Represents a pub/sub publisher which can publish messages of type T Supports blocking and non-blockin...
Definition pub_sub.hpp:253
Arc< impl::Broker< T > > broker
Definition pub_sub.hpp:256
Here is the caller graph for this function:

◆ deinit()

template<typename T>
void mtcore::thread::Publisher< T >::deinit ( Allocator & alloc)
inline

Closes a pub/sub channel and dereferences broker Broker will be cleaned up once all subscribers are cleaned up Note: all threads that are publishing should be joined prior to calling this method.

Definition at line 295 of file pub_sub.hpp.

295 {
296 if (!broker.valid()) {
297 return;
298 }
299 close();
300 broker.deinit(alloc);
301 }
void close()
Closes a pub/sub channel All subscribers will sbe notified of closure.
Definition pub_sub.hpp:280
Here is the call graph for this function:

◆ init()

template<typename T>
Result< void, AllocationError > mtcore::thread::Publisher< T >::init ( Allocator & alloc,
size_t queueSize = 10 )
inline

Initializes a publisher.

Parameters
allocAllocates a publisher
queueSizeInitial Queue size
Returns
success or failure

Definition at line 264 of file pub_sub.hpp.

264 {
265 ensure(!broker.valid(), "ALREADY INITIALIZED");
266 if (auto brokerRes = broker.init(alloc, &alloc, queueSize); brokerRes.is_error()) {
267 return brokerRes.error();
268 }
269 if (auto res = broker->inboxes.init(alloc); res.is_error()) {
270 broker.deinit(alloc);
271 return res.error();
272 }
273 return success();
274 }
#define ensure(check,...)
Ensures that a check holds true, aborts the program if not true Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Definition result.hpp:398
Here is the call graph for this function:

◆ send_before() [1/2]

template<typename T>
template<typename Clock, typename Duration>
Result< void, PubSubSendBeforeGrowsError > mtcore::thread::Publisher< T >::send_before ( Allocator & alloc,
const T & message,
const std::chrono::time_point< Clock, Duration > & timePoint )
inline

Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough room If a Queue cannot be increased, then the message will be dropped for that subscriber Will timeout if cannot send in time.

Template Parameters
ClockClock to get time from
DurationDuration of time point
Parameters
allocAllocator for increasing Queue size
messageMessage to send
timePointWhen to time out
Returns
success (sent to all) or failure (sent to none or some)

Definition at line 484 of file pub_sub.hpp.

485 {
486 ensure(broker.valid(), "NOT INITIALIZED");
488 if (!lock.owns_lock()) {
490 }
491
492 if (broker->closed) {
494 }
495
496 auto iter = broker->inboxes.ptr_iter();
497 Queue<T> *cur;
498 bool allocated = false;
499 bool sentSomething = false;
501 while (iter.next().copy_if_present(cur)) {
502 allocated |= cur->is_full();
503 if (!cur->push(alloc, message)) {
505 }
506 else {
507 sentSomething = true;
508 }
509 }
510
511 if (res.is_error() && sentSomething) {
513 }
514
515 // We've most likely at least partially unblocked someone, let people know
516 if (sentSomething) {
517 broker->readSignal.notify_all();
518
519 if (allocated) {
520 broker->senderSignal.notify_all();
521 }
522 }
523 return res;
524 }
Error< Underlying > error(Underlying err)
Creates an error.
Definition result.hpp:425
Here is the call graph for this function:

◆ send_before() [2/2]

template<typename T>
template<typename Clock, typename Duration>
Result< void, ChannelSendBeforeErrors > mtcore::thread::Publisher< T >::send_before ( const T & message,
const std::chrono::time_point< Clock, Duration > & timePoint )
inline

Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantees a message is delivered to all or blocks It will timeout it the current time exceeds the provided time point.

Template Parameters
ClockClock to get time from
DurationDuration of time point
Parameters
messageMessage to send
timePointWhen to time out
Returns
success or faulre

Definition at line 430 of file pub_sub.hpp.

431 {
432 ensure(broker.valid(), "NOT INITIALIZED");
434 if (!lock.owns_lock()) {
436 }
437
438 if (broker->closed) {
440 }
441
442 bool ready;
443 do {
444 ready = true;
445 // pre-check that we won't error out when adding
446 auto iter = broker->inboxes.ptr_iter();
447 Queue<T> *cur;
448 while (iter.next().copy_if_present(cur)) {
449 if (cur->is_full()) {
450 ready = false;
451 }
452 }
453 if (!ready) {
454 if (broker->senderSignal.wait_until(lock, timePoint) == std::cv_status::timeout) {
456 }
457 }
458 } while (!ready);
459 {
460 // Now that we're good, send
461 auto iter = broker->inboxes.ptr_iter();
462 Queue<T> *cur;
463 while (iter.next().copy_if_present(cur)) {
464 ensure(cur->push(message).is_success(), "PRE CHECK FAILED");
465 }
466 broker->readSignal.notify_all();
467 }
468 return success();
469 }
Here is the call graph for this function:

◆ send_block() [1/2]

template<typename T>
Result< void, PubSubSendBlockGrowErrors > mtcore::thread::Publisher< T >::send_block ( Allocator & alloc,
const T & message )
inline

Sends a message to all subscribers Will try to increase subscriber Queue sizes if there is not enough room If a Queue cannot be increased, then the message will be dropped for that subscriber.

Parameters
allocAllocator for increasing Queue size
messageMessage to send
Returns
success (sent to all) or failure (sent to none or some)

Definition at line 382 of file pub_sub.hpp.

382 {
383 ensure(broker.valid(), "NOT INITIALIZED");
384 auto lock = std::unique_lock(broker->mux);
385
386 if (broker->closed) {
388 }
389
390 auto iter = broker->inboxes.ptr_iter();
391 Queue<T> *cur;
392 bool allocated = false;
393 bool sentSomething = false;
395 while (iter.next().copy_if_present(cur)) {
396 allocated |= cur->is_full();
397 if (!cur->push(alloc, message)) {
399 }
400 else {
401 sentSomething = true;
402 }
403 }
404
405 if (res.is_error() && sentSomething) {
407 }
408
409 if (sentSomething) {
410 broker->readSignal.notify_all();
411 if (allocated) {
412 broker->senderSignal.notify_all();
413 }
414 }
415 return res;
416 }
Here is the call graph for this function:

◆ send_block() [2/2]

template<typename T>
Result< void, ChannelSendBlockErrors > mtcore::thread::Publisher< T >::send_block ( const T & message)
inline

Sends a message to all subscribers Will block if needed to ensure all subscribers get Queue Guarantees a message is delivered to all or blocks indefinitely.

Parameters
messageMessage to send
Returns
success or failure

Definition at line 337 of file pub_sub.hpp.

337 {
338 ensure(broker.valid(), "NOT INITIALIZED");
339 auto lock = std::unique_lock(broker->mux);
340
341 if (broker->closed) {
343 }
344
345 bool ready;
346 do {
347 ready = true;
348 // pre-check that we won't error out when adding
349 auto iter = broker->inboxes.ptr_iter();
350 Queue<T> *cur;
351 while (iter.next().copy_if_present(cur)) {
352 if (cur->is_full()) {
353 ready = false;
354 }
355 }
356 if (!ready) {
357 broker->senderSignal.wait(lock);
358 }
359 } while (!ready);
360
361 {
362 // Now that we're good, send
363 auto iter = broker->inboxes.ptr_iter();
364 Queue<T> *cur;
365 while (iter.next().copy_if_present(cur)) {
366 ensure(cur->push(message).is_success(), "PRE CHECK FAILED");
367 }
368 broker->readSignal.notify_all();
369 }
370 return success();
371 }
Here is the call graph for this function:

◆ subscribe()

template<typename T>
Result< Subscription< T >, AllocationError > mtcore::thread::Publisher< T >::subscribe ( )
inline

Creates a new Subscription to pub/sub channel.

Returns
New subscription (on success) or error (on failure)

Definition at line 307 of file pub_sub.hpp.

307 {
308 ensure(broker.valid(), "NOT INITIALIZED");
309 auto lock = std::unique_lock(broker->mux);
310 Queue<T> q;
311 if (auto res = q.init(*broker->alloc, broker->queueSize); res.is_error()) {
312 return res.error();
313 }
314
315 auto handleRes = broker->inboxes.add(q);
316 if (handleRes.is_error()) {
317 q.deinit(*broker->alloc);
319 }
320
321 auto subRef = broker.acquire(*broker->alloc);
322 if (subRef.is_error()) {
323 broker->inboxes.remove(handleRes.value());
324 q.deinit(*broker->alloc);
326 }
327 return success(Subscription<T>{subRef.value(), *handleRes});
328 }
void deinit(Allocator &alloc)
Closes a pub/sub channel and dereferences broker Broker will be cleaned up once all subscribers are c...
Definition pub_sub.hpp:295
Result< void, AllocationError > init(Allocator &alloc, size_t queueSize=10)
Initializes a publisher.
Definition pub_sub.hpp:264
Here is the call graph for this function:

◆ try_send() [1/2]

template<typename T>
Result< void, PubSubTrySendGrowsError > mtcore::thread::Publisher< T >::try_send ( Allocator & alloc,
const T & message )
inline

Sends a message to all subscribers Will not block.

If blocking is required, will simply fail If there is not enough room, will try to grow a subscriber's Queue If growing fails, will not send to that subscriber

Parameters
allocAllocator to use to grow (if needed)
messageMessage to send
Returns
success or failure (no send or partial send)

Definition at line 574 of file pub_sub.hpp.

574 {
575 ensure(broker.valid(), "NOT INITIALIZED");
577 if (!lock.owns_lock()) {
579 }
580
581 if (broker->closed) {
583 }
584
585 auto iter = broker->inboxes.ptr_iter();
586 bool allocated = false;
587 bool sentSomething = false;
589 if (Queue<T> cur; iter.next().copy_if_present(cur)) {
590 allocated |= cur.is_full();
591 if (auto push = cur.push(alloc, message); push.is_error()) {
593 }
594 else {
595 sentSomething = true;
596 }
597 }
598
599 if (res.is_error() && sentSomething) {
601 }
602
603 if (sentSomething) {
604 broker->readSignal.notify_all();
605 if (allocated) {
606 broker->senderSignal.notify_all();
607 }
608 }
609 return res;
610 }
Here is the call graph for this function:

◆ try_send() [2/2]

template<typename T>
Result< void, PubSubTrySendGrowsError > mtcore::thread::Publisher< T >::try_send ( const T & message)
inline

Sends a message to all subscribers Will not block.

If blocking is required, will simply fail Guarantees a message is delivered to all or not at all

Parameters
messageMessage to send
Returns
success or failure

Definition at line 533 of file pub_sub.hpp.

533 {
534 ensure(broker.valid(), "NOT INITIALIZED");
536 if (!lock.owns_lock()) {
538 }
539
540 if (broker->closed) {
542 }
543 {
544 // pre-check that we won't block
545 auto iter = broker->inboxes.ptr_iter();
546 Queue<T> *cur;
547 while (iter.next().copy_if_present(cur)) {
548 if (cur->is_full()) {
550 }
551 }
552 }
553 {
554 // Now that we're good, send
555 auto iter = broker->inboxes.ptr_iter();
556 Queue<T> *cur;
557 while (iter.next().copy_if_present(cur)) {
558 ensure(cur->push(message).is_success(), "PRE CHECK FAILED");
559 }
560 broker->readSignal.notify_all();
561 }
562 return success();
563 }
Here is the call graph for this function:

Member Data Documentation

◆ broker

template<typename T>
Arc<impl::Broker<T> > mtcore::thread::Publisher< T >::broker

Definition at line 256 of file pub_sub.hpp.


The documentation for this struct was generated from the following file: