MT Core (C++)
Core library for replacing C++ standard in project usage
mtcore::thread::Subscription< T > Struct Template Reference

Represents a pub/sub subscription which can receive messages of type T. Each publisher maintains a shared message broker, and subscribers hold an Arc to that broker to retrieve their messages.

#include <pub_sub.hpp>

Public Types

using Message = T
 

Public Member Functions

Result< Optional< T >, ChannelTryReceiveErrors > try_receive ()
 Tries to receive a message without blocking.
 
Optional< T > receive_block ()
 Blocks until a message is ready.
 
template<typename Clock, typename Duration>
Result< Optional< T >, ChannelReceiveBeforeErrors > receive_before (const std::chrono::time_point< Clock, Duration > &timePoint)
 Blocks until a message is ready, or errors if a timeout occurs.
 
void deinit (Allocator &alloc)
 Deinitializes and cancels the Subscription. Note: this cancels all copies of this particular Subscription.
 
void close ()
 
 Subscription ()=default
 
 Subscription (const Arc< impl::Broker< T > > &broker, const Handle &handle)
 

Detailed Description

template<typename T>
struct mtcore::thread::Subscription< T >

Represents a pub/sub subscription which can receive messages of type T.

Under the hood, each publisher maintains a message broker which stores all messages. Subscribers hold an Arc to the broker and retrieve messages from it. Because the broker is shared, it requires more locking for both reads and writes, so throughput is low when there is a lot of contention. The primary advantage is a much more convenient API for non-blocking operations and timed blocks. The reason for this design is that publishers are given greater insight into each subscription and can better avoid partial writes.

Partial sends may still happen, but only if an allocating send function is used and an allocation fails to grow one of the channels.

Meant for use with select()
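
The broker model described above can be illustrated with a minimal sketch built only on the C++ standard library. This is not the mtcore implementation: `SketchBroker`, `SketchSubscription`, `subscribe`, `publish`, and `poll` are hypothetical names, and `std::shared_ptr` stands in for Arc. It shows one mutex guarding one inbox per subscriber, which is why every read and write contends on the same lock.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the broker: a single mutex guards every inbox.
template <typename T>
struct SketchBroker {
    std::mutex mux;
    std::unordered_map<std::size_t, std::deque<T>> inboxes; // one queue per subscriber
    std::size_t nextHandle = 0;
};

// Hypothetical stand-in for a subscription: shared broker plus an inbox handle.
template <typename T>
struct SketchSubscription {
    std::shared_ptr<SketchBroker<T>> broker; // shared ownership, in the role of Arc
    std::size_t handle = 0;

    // Pops the oldest message for this subscriber, if there is one.
    std::optional<T> poll() {
        std::lock_guard<std::mutex> lock(broker->mux);
        auto it = broker->inboxes.find(handle);
        if (it == broker->inboxes.end() || it->second.empty()) {
            return std::nullopt;
        }
        T msg = std::move(it->second.front());
        it->second.pop_front();
        return msg;
    }
};

template <typename T>
SketchSubscription<T> subscribe(const std::shared_ptr<SketchBroker<T>>& broker) {
    std::lock_guard<std::mutex> lock(broker->mux);
    std::size_t handle = broker->nextHandle++;
    broker->inboxes[handle]; // creates an empty inbox for the new subscriber
    return {broker, handle};
}

template <typename T>
void publish(const std::shared_ptr<SketchBroker<T>>& broker, const T& msg) {
    std::lock_guard<std::mutex> lock(broker->mux);
    for (auto& entry : broker->inboxes) {
        entry.second.push_back(msg); // every subscriber gets its own copy
    }
}

// Publishes one value to two subscribers and checks that each sees it once.
bool demo_fan_out() {
    auto broker = std::make_shared<SketchBroker<int>>();
    auto a = subscribe(broker);
    auto b = subscribe(broker);
    publish(broker, 7);
    std::optional<int> fromA = a.poll();
    std::optional<int> fromB = b.poll();
    assert(fromA.has_value() && fromB.has_value());
    return fromA == 7 && fromB == 7 && !a.poll().has_value();
}
```

Because the publisher writes into every inbox while holding the one lock, it can see the state of each subscription before committing a message, at the cost of serializing all readers and writers.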

Template Parameters
T    Message type

Definition at line 103 of file pub_sub.hpp.

Member Typedef Documentation

◆ Message

template<typename T>
using mtcore::thread::Subscription< T >::Message = T

Definition at line 104 of file pub_sub.hpp.

Constructor & Destructor Documentation

◆ Subscription() [1/2]

template<typename T>
mtcore::thread::Subscription< T >::Subscription ( )
default

◆ Subscription() [2/2]

template<typename T>
mtcore::thread::Subscription< T >::Subscription ( const Arc< impl::Broker< T > > & broker,
const Handle & handle )
inline

Definition at line 224 of file pub_sub.hpp.

: broker(broker), inboxHandle(handle) {}

Member Function Documentation

◆ close()

template<typename T>
void mtcore::thread::Subscription< T >::close ( )
inline

Definition at line 220 of file pub_sub.hpp.

{ deinit(); }

◆ deinit()

template<typename T>
void mtcore::thread::Subscription< T >::deinit ( Allocator & alloc)
inline

Deinitializes and cancels the Subscription. Note: this cancels all copies of this particular Subscription.

Definition at line 207 of file pub_sub.hpp.

{
    if (!broker.valid()) {
        return; // already released: nothing to cancel
    }
    {
        // Hold the broker mutex only while the inbox is removed.
        auto lock = std::unique_lock(broker->mux);
        broker->inboxes[inboxHandle].deinit(*broker->alloc);
        broker->inboxes.remove(inboxHandle);
        broker->senderSignal.notify_all(); // wake any waiting senders
    }
    broker.release(alloc);
}
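
The note that cancelling affects all copies follows from every copy carrying the same inbox handle. A minimal standard-library sketch of that behavior is given below; `CancelBroker`, `CancelSubscription`, `cancel`, and `is_active` are hypothetical names, and `std::shared_ptr` again stands in for Arc.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>

// Hypothetical broker holding one inbox per handle.
struct CancelBroker {
    std::mutex mux;
    std::unordered_map<std::size_t, std::deque<int>> inboxes;
};

// Hypothetical subscription: copies share both the broker and the handle.
struct CancelSubscription {
    std::shared_ptr<CancelBroker> broker;
    std::size_t handle = 0;

    // True while this copy still references a broker that holds its inbox.
    bool is_active() {
        if (!broker) {
            return false;
        }
        std::lock_guard<std::mutex> lock(broker->mux);
        return broker->inboxes.count(handle) != 0;
    }

    void cancel() {
        if (!broker) {
            return; // already released: nothing to do
        }
        {
            std::lock_guard<std::mutex> lock(broker->mux);
            broker->inboxes.erase(handle); // the inbox disappears for every copy
        }
        broker.reset(); // only this copy gives up its reference
    }
};

// Cancels through one copy and reports whether both copies became inactive.
bool demo_cancel_affects_copies() {
    auto broker = std::make_shared<CancelBroker>();
    broker->inboxes[0]; // register inbox 0
    CancelSubscription first{broker, 0};
    CancelSubscription second = first; // same broker, same handle
    assert(first.is_active() && second.is_active());
    first.cancel();
    return !first.is_active() && !second.is_active();
}
```

In this sketch the second copy still holds a reference to the broker after the first is cancelled, but its handle no longer resolves to an inbox, which mirrors the handle validity check made by the receive functions.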

◆ receive_before()

template<typename T>
template<typename Clock, typename Duration>
Result< Optional< T >, ChannelReceiveBeforeErrors > mtcore::thread::Subscription< T >::receive_before ( const std::chrono::time_point< Clock, Duration > & timePoint)
inline

Blocks until a message is ready, or errors if a timeout occurs.

Returns
Message (if present), nullopt if the pub/sub is closed or the subscription is cancelled, or an error on timeout

Definition at line 173 of file pub_sub.hpp.

{
    if (!broker.valid()) {
        return success(Optional<T>{nullopt});
    }

    // Acquiring the lock is itself bounded by the deadline.
    auto lock = std::unique_lock(broker->mux, timePoint);
    if (!lock.owns_lock()) {
        // ... (source line 180 is absent from this listing; an error is returned on timeout)
    }

    if (!broker->inboxes.is_handle_valid(inboxHandle)) {
        return success(Optional<T>{nullopt}); // subscription was cancelled
    }

    do {
        if (auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
            broker->senderSignal.notify_all();
            return res;
        }

        if (broker->closed) {
            return success(Optional<T>{nullopt});
        }

        if (broker->readSignal.wait_until(lock, timePoint) == std::cv_status::timeout) {
            // ... (source line 198 is absent from this listing; an error is returned on timeout)
        }
    } while (true);
}
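
The deadline handling above can be sketched with standard-library types alone. The sketch below is an analogue under stated assumptions, not the mtcore code: `TimedInbox`, `TimedStatus`, and `TimedResult` are hypothetical, and a plain status enum replaces `Result` and `ChannelReceiveBeforeErrors`. With the standard library, constructing a `std::unique_lock` from a time point requires a timed mutex, and waiting on that lock type requires `std::condition_variable_any`.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

enum class TimedStatus { Message, Closed, Timeout };

struct TimedResult {
    TimedStatus status;
    std::optional<int> value;
};

// Hypothetical single-subscriber inbox with a deadline-aware receive.
struct TimedInbox {
    std::timed_mutex mux;                   // supports try_lock_until
    std::condition_variable_any readSignal; // usable with any lock type
    std::deque<int> messages;
    bool closed = false;

    template <typename Clock, typename Duration>
    TimedResult receive_before(const std::chrono::time_point<Clock, Duration>& deadline) {
        // The deadline bounds the lock acquisition as well as the wait.
        std::unique_lock<std::timed_mutex> lock(mux, deadline);
        if (!lock.owns_lock()) {
            return {TimedStatus::Timeout, std::nullopt};
        }
        while (true) {
            if (!messages.empty()) {
                int v = messages.front();
                messages.pop_front();
                return {TimedStatus::Message, v};
            }
            if (closed) {
                return {TimedStatus::Closed, std::nullopt};
            }
            if (readSignal.wait_until(lock, deadline) == std::cv_status::timeout) {
                return {TimedStatus::Timeout, std::nullopt};
            }
        }
    }
};

// Nothing is queued and nobody sends, so the deadline passes.
TimedStatus demo_timeout() {
    TimedInbox inbox;
    auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(20);
    TimedStatus status = inbox.receive_before(deadline).status;
    assert(status != TimedStatus::Message);
    return status;
}
```

A spurious wakeup simply re-enters the loop and waits again against the same absolute deadline, so the total wait never extends past it.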

◆ receive_block()

template<typename T>
Optional< T > mtcore::thread::Subscription< T >::receive_block ( )
inline

Blocks until a message is ready.

Returns
Message (if present), nullopt if the pub/sub is closed or the subscription is cancelled

Definition at line 142 of file pub_sub.hpp.

{
    if (!broker.valid()) {
        return Optional<T>{nullopt};
    }

    auto lock = std::unique_lock(broker->mux);

    if (!broker->inboxes.is_handle_valid(inboxHandle)) {
        return Optional<T>{nullopt}; // subscription was cancelled
    }

    do {
        // Pending messages are checked before the closed flag.
        if (auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
            broker->senderSignal.notify_all();
            return res;
        }

        if (broker->closed) {
            return Optional<T>{nullopt};
        }
        broker->readSignal.wait(lock);
    } while (true);
}
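
The loop order above (pop, then check for close, then wait) means messages queued before a close are still delivered. A minimal sketch of the same pattern with standard-library types follows; `SketchInbox` and its members are hypothetical and model a single subscriber rather than the shared broker.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical single-subscriber inbox with a blocking receive.
struct SketchInbox {
    std::mutex mux;
    std::condition_variable readSignal;
    std::deque<int> messages;
    bool closed = false;

    void send(int v) {
        {
            std::lock_guard<std::mutex> lock(mux);
            messages.push_back(v);
        }
        readSignal.notify_all();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mux);
            closed = true;
        }
        readSignal.notify_all();
    }

    // Drains pending messages first, then reports close, then sleeps.
    std::optional<int> receive_block() {
        std::unique_lock<std::mutex> lock(mux);
        while (true) {
            if (!messages.empty()) {
                int v = messages.front();
                messages.pop_front();
                return v;
            }
            if (closed) {
                return std::nullopt;
            }
            readSignal.wait(lock); // releases the mutex while sleeping
        }
    }
};

// A producer thread sends two values and closes; the consumer sums them.
int demo_receive_block() {
    SketchInbox inbox;
    std::thread producer([&inbox] {
        inbox.send(1);
        inbox.send(2);
        inbox.close();
    });
    int sum = 0;
    while (auto msg = inbox.receive_block()) {
        sum += *msg;
    }
    producer.join();
    assert(sum == 3);
    return sum;
}
```

The consumer loop ends only when `receive_block` yields an empty optional, which in this sketch happens once the inbox is both closed and empty.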

◆ try_receive()

template<typename T>
Result< Optional< T >, ChannelTryReceiveErrors > mtcore::thread::Subscription< T >::try_receive ( )
inline

Tries to receive a message without blocking.

Returns
Message (if present), nullopt if the pub/sub is closed or the subscription is cancelled, or an error if the call would have had to block

Definition at line 111 of file pub_sub.hpp.

{
    if (!broker.valid()) {
        return success(Optional<T>{nullopt});
    }

    // Never waits for the mutex: a busy broker counts as "would block".
    auto lock = std::unique_lock(broker->mux, std::try_to_lock_t{});
    if (!lock.owns_lock()) {
        // ... (source line 118 is absent from this listing; an error is returned here)
    }

    if (!broker->inboxes.is_handle_valid(inboxHandle)) {
        return success(Optional<T>{nullopt}); // subscription was cancelled
    }

    if (auto res = broker->inboxes[inboxHandle].pop(); res.has_value()) {
        broker->senderSignal.notify_all();
        return res;
    }

    if (broker->closed) {
        return success(Optional<T>{nullopt});
    }

    // ... (source line 134 is absent from this listing)
}
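
The non-blocking path can be sketched with `std::try_to_lock` from the standard library. The names `TryInbox`, `TryStatus`, and `TryResult` are hypothetical, and a status enum replaces `Result` and `ChannelTryReceiveErrors`. The final `WouldBlock` return for an open but empty inbox is an assumption based on the Returns note, since that line is missing from the listing.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>

enum class TryStatus { Message, Closed, WouldBlock };

struct TryResult {
    TryStatus status;
    std::optional<int> value;
};

// Hypothetical single-subscriber inbox with a non-blocking receive.
struct TryInbox {
    std::mutex mux;
    std::deque<int> messages;
    bool closed = false;

    TryResult try_receive() {
        // try_to_lock returns immediately instead of waiting for the mutex.
        std::unique_lock<std::mutex> lock(mux, std::try_to_lock);
        if (!lock.owns_lock()) {
            return {TryStatus::WouldBlock, std::nullopt}; // mutex is busy
        }
        if (!messages.empty()) {
            int v = messages.front();
            messages.pop_front();
            return {TryStatus::Message, v};
        }
        if (closed) {
            return {TryStatus::Closed, std::nullopt};
        }
        // Assumption: an open, empty inbox is reported as "would block".
        return {TryStatus::WouldBlock, std::nullopt};
    }
};

// Walks one inbox through empty, filled, and closed states.
bool demo_try_receive() {
    TryInbox inbox;
    bool emptyWouldBlock = inbox.try_receive().status == TryStatus::WouldBlock;
    inbox.messages.push_back(5);
    TryResult got = inbox.try_receive();
    inbox.closed = true;
    bool closedSeen = inbox.try_receive().status == TryStatus::Closed;
    assert(got.status == TryStatus::Message);
    return emptyWouldBlock && got.value == 5 && closedSeen;
}
```

A caller that polls this way would typically treat `WouldBlock` as "try again later" and `Closed` as the end of the stream.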

The documentation for this struct was generated from the following file: