MT Core (C++)
Core library for replacing C++ standard in project usage
mtcore::thread::broadcaster< T > Struct Template Reference

A broadcaster sends messages to multiple channels, each of which has its own lock. Sending a message requires locking the entire broadcaster.

#include <broadcaster.hpp>

Public Member Functions

Result< void, AllocationError > init (Allocator &alloc, size_t initCapacity=64)
 Initializes a broadcaster.
 
Result< void, ChannelSendBlockErrors > send_block (Allocator &alloc, const T &msg)
 Blocks while sending data.
 
Result< BroadcastSubscription< T >, BroadcastSubscribeError > subscribe (Allocator &alloc)
 Subscribes to a broadcaster. Creates a new inbox for the subscription.
 
void close ()
 Closes the broadcaster, which prevents new messages from being sent (sends already in progress are not stopped) and prevents new subscriptions from being created. It also closes all existing subscriptions, which can result in partial writes (e.g. a message is blocked after sending to half of the subscriptions).
 
void deinit (Allocator &alloc)
 Closes the broadcaster and cleans up the broadcaster's reference to each subscription (a single Arc.release()). Also closes any subscriptions that have not already been cleaned up.
 

Public Attributes

GenList< BroadcastSubscription< T > > channels
 
std::mutex mux
 
bool closed = false
 

Detailed Description

template<typename T>
struct mtcore::thread::broadcaster< T >

A broadcaster sends messages to multiple channels, each of which has its own lock. Sending a message requires locking the entire broadcaster.

However, reading messages allows for many separate locks. This makes it ideal for scenarios with one publisher and many subscribers.

Note that there is less insight into the state of each channel as they are locked separately. Because of this, only blocking sends and blocking receives are allowed, as acquiring the lock and waiting is the only way to really prevent partial sends.

So, while we should have better read throughput than the Publisher/Subscription flow, we have less flexibility in the API and only have blocking operations. This also means we cannot use Broadcaster with select().

Locks:

  • 1 lock for entire structure
  • 1 lock per channel

Atomic Reference Counting:

  • 1 per channel

Condition Variables:

  • 2 condition variables per channel

Non-Blocking Availability:

  • None

Known Race Conditions (aka. designed in):

  • If a send is started, the broadcaster may get closed or deinitialized
    • Planned Behavior: Channels will get closed and may prevent the send from completing successfully. The send may fail and enter a partial send state
Template Parameters
T  Message Type

Definition at line 103 of file broadcaster.hpp.
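
The locking layout described above can be modelled with standard-library primitives. The following is a minimal sketch under stated assumptions, not the library's implementation: MiniInbox, MiniBroadcaster and run_demo are hypothetical names, std::shared_ptr stands in for the per-channel atomic reference count, the inbox capacity of 4 is arbitrary, and allocators, Result types and the removal of closed channels are left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical per-channel inbox: one lock and two condition variables.
template <typename T>
struct MiniInbox {
    std::mutex mux;
    std::condition_variable not_empty;  // signalled when a message arrives
    std::condition_variable not_full;   // signalled when a slot frees up
    std::deque<T> queue;
    std::size_t capacity = 4;           // arbitrary bound for the sketch
    bool closed = false;

    // Blocks until there is room; returns false if the inbox is closed.
    bool send_block(const T &msg) {
        std::unique_lock lock{mux};
        not_full.wait(lock, [&] { return closed || queue.size() < capacity; });
        if (closed) return false;
        queue.push_back(msg);
        not_empty.notify_one();
        return true;
    }

    // Blocks until a message arrives; nullopt once closed and drained.
    std::optional<T> receive_block() {
        std::unique_lock lock{mux};
        not_empty.wait(lock, [&] { return closed || !queue.empty(); });
        if (queue.empty()) return std::nullopt;
        T msg = queue.front();
        queue.pop_front();
        not_full.notify_one();
        return msg;
    }

    void close() {
        std::lock_guard lock{mux};
        closed = true;
        not_empty.notify_all();
        not_full.notify_all();
    }
};

// Hypothetical broadcaster: one lock for the whole structure.
template <typename T>
struct MiniBroadcaster {
    std::mutex mux;
    std::vector<std::shared_ptr<MiniInbox<T>>> channels;

    std::shared_ptr<MiniInbox<T>> subscribe() {
        auto inbox = std::make_shared<MiniInbox<T>>();
        std::lock_guard lock{mux};
        channels.push_back(inbox);
        return inbox;
    }

    void send_block(const T &msg) {
        std::lock_guard lock{mux};  // senders serialize on the outer lock
        for (auto &ch : channels) ch->send_block(msg);
    }

    void close() {
        std::lock_guard lock{mux};
        for (auto &ch : channels) ch->close();
    }
};

// One publisher, two subscribers; returns the sum seen by both readers.
inline int run_demo() {
    MiniBroadcaster<int> b;
    auto first = b.subscribe();
    auto second = b.subscribe();
    int sum_first = 0, sum_second = 0;
    std::thread t1([&] { while (auto m = first->receive_block()) sum_first += *m; });
    std::thread t2([&] { while (auto m = second->receive_block()) sum_second += *m; });
    for (int i = 1; i <= 10; ++i) b.send_block(i);
    b.close();  // readers drain what is queued, then stop
    t1.join();
    t2.join();
    return sum_first + sum_second;
}
```

In this model the readers only ever take their own inbox lock, so they never contend with each other, while every send passes through the single outer lock. Because close() is called only after all sends have returned, each reader receives 1 through 10 and run_demo() returns 110.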

Member Function Documentation

◆ close()

template<typename T>
void mtcore::thread::broadcaster< T >::close ( )
inline

Closes the broadcaster, which prevents new messages from being sent (sends already in progress are not stopped) and prevents new subscriptions from being created. It also closes all existing subscriptions, which can result in partial writes (e.g. a message is blocked after sending to half of the subscriptions).

Definition at line 201 of file broadcaster.hpp.

{
    auto l = std::unique_lock{mux};
    closed = true;
    // [one line missing from the generated listing; presumably the declaration of `cur`]
    auto iter = channels.iter();
    while (iter.next().copy_if_present(cur)) {
        if (cur.inbox.valid()) {
            cur.inbox->close();
        }
    }
}

◆ deinit()

template<typename T>
void mtcore::thread::broadcaster< T >::deinit ( Allocator & alloc)
inline

Closes the broadcaster and cleans up the broadcaster's reference to each subscription (a single Arc.release()). Also closes any subscriptions that have not already been cleaned up.

Parameters
alloc  Allocator to free memory with

Definition at line 219 of file broadcaster.hpp.

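{
    auto l = std::unique_lock{mux};
    closed = true;
    // [one line missing from the generated listing; presumably the declaration of `cur`]
    auto iter = channels.iter();
    while (iter.next().copy_if_present(cur)) {
        // [one line missing from the generated listing: the loop body]
    }
    channels.deinit(alloc);
}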
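
The ownership rule in the description (the broadcaster gives up only its own reference to each subscription) can be illustrated with std::shared_ptr standing in for the library's Arc. This is a sketch under assumptions: ToyInbox, ToyBroadcaster and subscriber_outlives_broadcaster are hypothetical names, and locking and allocators are omitted to keep the reference counting visible.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical inbox that only tracks whether it has been closed.
struct ToyInbox {
    bool closed = false;
};

// Hypothetical broadcaster holding one reference per subscription.
struct ToyBroadcaster {
    std::vector<std::shared_ptr<ToyInbox>> channels;

    // The broadcaster keeps one reference, the caller gets a second one.
    std::shared_ptr<ToyInbox> subscribe() {
        auto inbox = std::make_shared<ToyInbox>();
        channels.push_back(inbox);
        return inbox;
    }

    // Closes every inbox, then drops only the broadcaster's references.
    void deinit() {
        for (auto &ch : channels) ch->closed = true;
        channels.clear();
    }
};

// Returns true if the subscriber's handle stays valid after deinit().
inline bool subscriber_outlives_broadcaster() {
    ToyBroadcaster b;
    auto sub = b.subscribe();
    const bool two_owners = sub.use_count() == 2;
    b.deinit();
    return two_owners && sub.use_count() == 1 && sub->closed;
}
```

In this model the subscriber can still inspect its inbox after deinit() and sees it as closed; the inbox itself is freed only when the subscriber drops its own handle.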

◆ init()

template<typename T>
Result< void, AllocationError > mtcore::thread::broadcaster< T >::init ( Allocator & alloc,
size_t initCapacity = 64 )
inline

Initializes a broadcaster.

Parameters
alloc  Allocator for allocating memory
initCapacity  Initial capacity
Returns
Success, or an AllocationError if the allocation fails

Definition at line 114 of file broadcaster.hpp.

{
    return channels.init(alloc, initCapacity);
}

◆ send_block()

template<typename T>
Result< void, ChannelSendBlockErrors > mtcore::thread::broadcaster< T >::send_block ( Allocator & alloc,
const T & msg )
inline

Blocks while sending data.

Grows inboxes as needed and frees closed subscriptions as they are encountered.

Parameters
alloc  Allocator to use for growing and closing
msg  Message to send
Returns
Errors if they happened

Definition at line 124 of file broadcaster.hpp.

{
    // We need the lock since we'll be iterating and removing stuff
    auto l = std::unique_lock{mux};

    if (closed) {
        // [one line missing from the generated listing: the error return]
    }

    // [one line missing from the generated listing; presumably the declaration of `handle`]
    auto iter = channels.handles();
    while (iter.next().copy_if_present(handle)) {
        auto &e = channels[handle];
        if (e.inbox.valid()) {
            if (auto res = e.inbox->send_block(alloc, msg); res.is_error()) {
                // If we failed allocation, try a non-allocating send
                // [one line missing from the generated listing; presumably the `if` that
                //  checks for an allocation failure and opens the block closed below]
                    // We can still fail if our channel is closed. In this case, remove the channel
                    if (auto res2 = e.inbox->send_block(msg); res2.is_error()) {
                        e.inbox.release(alloc);
                        channels.remove(handle);
                    }
                }
                // otherwise, our channel is closed, just remove the channel
                else {
                    e.inbox.release(alloc);
                    channels.remove(handle);
                }
            }
        }
        else {
            // clean up and remove the channel
            e.deinit(alloc);
            channels.remove(handle);
        }
    }
    return success();
}
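
The clean-up-while-sending pattern in the listing can be shown in isolation. The sketch below is a simplified, single-threaded model under assumptions, not the library's code: ToyInbox, Channels and send_and_prune are hypothetical names, a std::vector with iterator-based erase replaces the handle-based GenList, and the blocking behaviour and the allocation-failure retry are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical inbox: a send fails once the inbox has been closed.
struct ToyInbox {
    bool closed = false;
    std::vector<int> received;

    bool send(int msg) {
        if (closed) return false;
        received.push_back(msg);
        return true;
    }
};

using Channels = std::vector<std::shared_ptr<ToyInbox>>;

// Delivers msg to every live inbox and erases entries that are empty or
// closed. Returns how many channels were removed during this send.
inline std::size_t send_and_prune(Channels &channels, int msg) {
    std::size_t removed = 0;
    for (auto it = channels.begin(); it != channels.end();) {
        if (*it && (*it)->send(msg)) {
            ++it;                     // delivered, keep the channel
        } else {
            it = channels.erase(it);  // drops the broadcaster's reference
            ++removed;
        }
    }
    return removed;
}
```

Pruning during the send means the broadcaster needs no separate sweep for dead subscriptions, at the cost of holding the broadcaster lock for the whole pass, which is why the real function takes the lock before iterating.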

◆ subscribe()

template<typename T>
Result< BroadcastSubscription< T >, BroadcastSubscribeError > mtcore::thread::broadcaster< T >::subscribe ( Allocator & alloc)
inline

Subscribes to a broadcaster. Creates a new inbox for the subscription.

Parameters
alloc  Allocator to use for creating the subscription
Returns
the subscription on success or an error on failure

Definition at line 168 of file broadcaster.hpp.

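{
    // [one line missing from the generated listing; presumably the declaration of `sub`]
    if (auto subRes = sub.init(alloc); subRes.is_error()) {
        // [one line missing from the generated listing: the error return]
    }

    // We need the lock since we'll be removing stuff
    {
        auto l = std::unique_lock{mux};
        if (closed) {
            // [two lines missing from the generated listing]
        }

        if (auto addRes = channels.add(alloc, sub); addRes.is_error()) {
            // [two lines missing from the generated listing]
        }
    }
    auto acquireRes = sub.inbox.acquire(alloc);
    if (acquireRes.is_error()) {
        // [two lines missing from the generated listing]
    }
    return BroadcastSubscription<T>{acquireRes.value()};
}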

Member Data Documentation

◆ channels

template<typename T>
GenList<BroadcastSubscription<T> > mtcore::thread::broadcaster< T >::channels

Definition at line 104 of file broadcaster.hpp.

◆ closed

template<typename T>
bool mtcore::thread::broadcaster< T >::closed = false

Definition at line 106 of file broadcaster.hpp.

◆ mux

template<typename T>
std::mutex mtcore::thread::broadcaster< T >::mux

Definition at line 105 of file broadcaster.hpp.


The documentation for this struct was generated from the following file: