19#ifndef MTCORE_MTTHREAD_INBOX_HPP
20#define MTCORE_MTTHREAD_INBOX_HPP
78 std::timed_mutex channelMux = {};
79 std::condition_variable_any read_cond = {};
80 std::condition_variable_any send_cond = {};
93 auto lock = std::unique_lock(channelMux);
94 ensure(!initialized(),
"Already initialized! memory leak detected!");
95 return q.init(alloc, initCapacity);
105 auto lock = std::unique_lock(channelMux);
107 ensure(!initialized(),
"CLEAN UP FAILED!");
115 auto lock = std::unique_lock(channelMux);
116 ensure(initialized(),
"NOT INITIALIZED!");
118 send_cond.notify_all();
119 read_cond.notify_all();
129 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
130 ensure(initialized(),
"NOT INITIALIZED!");
132 if (!lock.owns_lock()) {
145 read_cond.notify_one();
156 auto lock = std::unique_lock(channelMux);
157 ensure(lock.owns_lock(),
"DID NOT LOCK");
158 ensure(initialized(),
"NOT INITIALIZED!");
160 while (!closed && q.is_full()) {
161 send_cond.wait(lock);
168 read_cond.notify_one();
180 template<
typename Clock = std::chrono::steady_clock,
typename Duration>
182 send_before(
const T &msg,
const std::chrono::time_point<Clock, Duration> &when) {
183 auto lock = std::unique_lock(channelMux, when);
184 if (!lock.owns_lock()) {
187 ensure(initialized(),
"NOT INITIALIZED!");
189 while (!closed && q.is_full()) {
190 if (send_cond.wait_until(lock, when) == std::cv_status::timeout) {
199 read_cond.notify_one();
212 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
214 if (!lock.owns_lock()) {
217 ensure(initialized(),
"NOT INITIALIZED!");
223 bool emit_notice = q.is_full();
224 auto res = q.push(alloc, msg);
225 if (res.is_error()) {
228 read_cond.notify_one();
232 send_cond.notify_all();
246 auto lock = std::unique_lock(channelMux);
247 ensure(lock.owns_lock(),
"DID NOT LOCK");
248 ensure(initialized(),
"NOT INITIALIZED!");
250 while (!closed && q.is_full()) {
251 send_cond.wait(lock);
257 bool emit_notice = q.is_full();
258 auto res = q.push(alloc, msg);
259 if (res.is_error()) {
262 while (!is_closed() && q.is_full()) {
263 send_cond.wait(lock);
272 if (
auto err = q.push(msg); err.is_error()) {
276 read_cond.notify_one();
280 send_cond.notify_all();
295 template<
typename Clock = std::chrono::steady_clock,
typename Duration>
298 auto lock = std::unique_lock(channelMux, when);
299 if (!lock.owns_lock()) {
302 ensure(initialized(),
"NOT INITIALIZED!");
308 bool emit_notice = q.is_full();
310 auto res = q.push(alloc, msg);
311 if (res.is_error()) {
315 while (!is_closed() && q.is_full()) {
316 if (send_cond.wait_until(lock, when) == std::cv_status::timeout) {
327 if (
auto err = q.push(msg); err.is_error()) {
331 read_cond.notify_one();
335 send_cond.notify_all();
349 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
351 if (!lock.owns_lock()) {
354 ensure(initialized(),
"NOT INITIALIZED!");
363 ensure(lock.owns_lock(),
"DID NOT LOCK!");
373 auto lock = std::unique_lock(channelMux);
374 ensure(initialized(),
"NOT INITIALIZED!");
376 while (!closed && q.is_empty()) {
377 read_cond.wait(lock);
380 ensure(lock.owns_lock(),
"DID NOT LOCK");
392 template<
typename Clock,
typename Duration>
395 auto lock = std::unique_lock(channelMux, when);
397 if (!lock.owns_lock()) {
400 ensure(initialized(),
"NOT INITIALIZED!");
402 while (!closed && q.is_empty()) {
403 if (read_cond.wait_until(lock, when) == std::cv_status::timeout) {
408 ensure(lock.owns_lock(),
"DID NOT LOCK");
418 auto lock = std::unique_lock(channelMux);
419 return initialized();
428 auto lock = std::unique_lock(channelMux);
429 ensure(lock.owns_lock(),
"DID NOT LOCK");
430 ensure(initialized(),
"NOT INITIALIZED!");
432 q.shrink_to_fit(alloc);
441 send_cond.notify_one();
/// Reports whether the underlying queue `q` has been initialized.
/// Forwards directly to `q.is_initialized()`; this accessor acquires no lock itself.
/// @return the value returned by `q.is_initialized()`
445 [[nodiscard]]
bool initialized() const noexcept {
return q.
is_initialized(); }
/// Reports whether the `closed` flag is currently set.
/// Reads the member directly; this accessor acquires no lock itself.
/// @return the current value of `closed`
447 [[nodiscard]]
bool is_closed() const noexcept {
return closed; }
constexpr auto nullopt
Placeholder value for an empty Optional.
ChannelGrowTrySendErrors
Errors for when trying to do non-blocking send (with possible allocation) fails.
ChannelSendBeforeErrors
Errors for when timed blocking sends fail.
ChannelGrowSendBlockErrors
Errors for when trying to do blocking send (with possible allocation) fails.
ChannelTryReceiveErrors
Errors for when non-blocking receives fail. Note: if the channel is closed, a nullopt will be returned...
ChannelGrowSendBeforeErrors
Errors for when trying to do timed blocking send (with possible allocation) fails.
#define ensure(check,...)
Ensures that a check holds true; aborts the program if it does not. Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Error< Underlying > error(Underlying err)
Creates an error.
constexpr bool is_channel_like_grow
Checks if a type is growing channel-like.
Thread-related namespace. The methods and classes provided by this namespace are thread-safe. Classes and m...
Represents a memory allocator. Exact behavior depends on the underlying VTable used. Should use the a_*...
Represents a value that may or may not exist (an "Optional" value). Similar concept to std::optional,...
FIFO Queue (First In, First Out) Dynamically allocated, can be resized.
Optional< T > pop() noexcept
Removes and returns the next element in the Queue.
bool is_initialized() const noexcept
Checks if a Queue is initialized.
bool is_empty() const noexcept
Checks if ring buffer is empty.
Represents a Result that may have an error (error code) or a success value. A type of "void" means the...
Inter-thread communication primitive to send messages between threads. Has a growable Queue.
Result< void, ChannelGrowSendBlockErrors > send_block(Allocator &alloc, const T &msg)
Send message and block indefinitely. Will grow inbox Queue if needed. If unable to grow,...
bool is_initialized() const noexcept
Checks if the inbox is initialized. Inboxes which aren't initialized will abort if used.
Result< Optional< T >, ChannelSendBeforeErrors > receive_before(const std::chrono::time_point< Clock, Duration > &when)
Receives a message before a time point. Blocks until a time point.
Result< void, ChannelSendBeforeErrors > send_before(const T &msg, const std::chrono::time_point< Clock, Duration > &when)
Send message before a timeout. Will NOT grow inbox Queue and will instead block.
Result< Optional< T >, ChannelTryReceiveErrors > try_receive()
Tries to receive a message. Will fail if it would have to block. If the channel is empty and closed,...
Result< void, ChannelSendBlockErrors > send_block(const T &msg)
Send message; will block indefinitely. Will NOT grow inbox Queue and will instead block.
Optional< T > receive_block()
Receives a message from the channel. Blocks indefinitely.
void shrinkToFit(Allocator &alloc)
Shrinks Queue to fit current Queue size. Useful to lower memory usage.
void close()
Closes an inbox.
Result< void, AllocationError > init(Allocator &alloc, size_t initCapacity=10)
Initializes a new message Queue.
Result< void, ChannelGrowTrySendErrors > try_send(Allocator &alloc, const T &msg)
Send message without blocking. Will grow inbox Queue if needed. If unable to grow, will return an error...
void deinit(Allocator &alloc)
Cleans up memory. Note: All references to the inbox MUST be cleaned up BEFORE calling this method.
Result< void, ChannelTrySendErrors > try_send(const T &msg)
Send message without blocking. Will NOT grow inbox Queue and will instead fail.
Result< void, ChannelGrowSendBeforeErrors > send_before(Allocator &alloc, const T &msg, const std::chrono::time_point< Clock, Duration > &when)
Send message before a timeout. Will grow inbox Queue if needed. If unable to grow, will instead block.