MT Core (C++)
Core library for replacing C++ standard in project usage
Loading...
Searching...
No Matches
inbox.hpp
Go to the documentation of this file.
1/*
2
3Copyright 2025 Matthew Tolman
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17*/
18
19#ifndef MTCORE_MTTHREAD_INBOX_HPP
20#define MTCORE_MTTHREAD_INBOX_HPP
21
25#include <mutex>
26
27namespace mtcore::thread {
38
47
57
74 template<typename T>
75 struct Inbox {
76 private:
77 Queue<T> q;
78 std::timed_mutex channelMux = {};
79 std::condition_variable_any read_cond = {};
80 std::condition_variable_any send_cond = {};
81 bool closed = false;
82
83 public:
84 using Message = T;
85
92 Result<void, AllocationError> init(Allocator &alloc, size_t initCapacity = 10) {
93 auto lock = std::unique_lock(channelMux);
94 ensure(!initialized(), "Already initialized! memory leak detected!");
95 return q.init(alloc, initCapacity);
96 }
97
104 void deinit(Allocator &alloc) {
105 auto lock = std::unique_lock(channelMux);
106 q.deinit(alloc);
107 ensure(!initialized(), "CLEAN UP FAILED!");
108 }
109
114 void close() {
115 auto lock = std::unique_lock(channelMux);
116 ensure(initialized(), "NOT INITIALIZED!");
117 closed = true;
118 send_cond.notify_all();
119 read_cond.notify_all();
120 }
121
128 [[nodiscard]] Result<void, ChannelTrySendErrors> try_send(const T &msg) {
129 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
130 ensure(initialized(), "NOT INITIALIZED!");
131
132 if (!lock.owns_lock()) {
134 }
135
136 if (closed) {
138 }
139
140 if (q.is_full()) {
142 }
143
144 (void) q.push(msg);
145 read_cond.notify_one();
146 return success();
147 }
148
156 auto lock = std::unique_lock(channelMux);
157 ensure(lock.owns_lock(), "DID NOT LOCK");
158 ensure(initialized(), "NOT INITIALIZED!");
159
160 while (!closed && q.is_full()) {
161 send_cond.wait(lock);
162 }
163 if (closed) {
165 }
166
167 (void) q.push(msg);
168 read_cond.notify_one();
169 return success();
170 }
171
180 template<typename Clock = std::chrono::steady_clock, typename Duration>
182 send_before(const T &msg, const std::chrono::time_point<Clock, Duration> &when) {
183 auto lock = std::unique_lock(channelMux, when);
184 if (!lock.owns_lock()) {
186 }
187 ensure(initialized(), "NOT INITIALIZED!");
188
189 while (!closed && q.is_full()) {
190 if (send_cond.wait_until(lock, when) == std::cv_status::timeout) {
192 }
193 }
194 if (closed) {
196 }
197
198 (void) q.push(msg);
199 read_cond.notify_one();
200 return success();
201 }
202
211 [[nodiscard]] Result<void, ChannelGrowTrySendErrors> try_send(Allocator &alloc, const T &msg) {
212 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
213
214 if (!lock.owns_lock()) {
216 }
217 ensure(initialized(), "NOT INITIALIZED!");
218
219 if (closed) {
221 }
222
223 bool emit_notice = q.is_full();
224 auto res = q.push(alloc, msg);
225 if (res.is_error()) {
227 }
228 read_cond.notify_one();
229
230 // tell everyone else our capacity has grown
231 if (emit_notice) {
232 send_cond.notify_all();
233 }
234 return success();
235 }
236
246 auto lock = std::unique_lock(channelMux);
247 ensure(lock.owns_lock(), "DID NOT LOCK");
248 ensure(initialized(), "NOT INITIALIZED!");
249
250 while (!closed && q.is_full()) {
251 send_cond.wait(lock);
252 }
253 if (closed) {
255 }
256
257 bool emit_notice = q.is_full();
258 auto res = q.push(alloc, msg);
259 if (res.is_error()) {
260 // If we couldn't resize, just wait for room
261 emit_notice = false;
262 while (!is_closed() && q.is_full()) {
263 send_cond.wait(lock);
264 }
265
266 // check if we closed
267 if (is_closed()) {
269 }
270
271 // try sending again
272 if (auto err = q.push(msg); err.is_error()) {
274 }
275 }
276 read_cond.notify_one();
277
278 // tell everyone else our capacity has grown
279 if (emit_notice) {
280 send_cond.notify_all();
281 }
282 return success();
283 }
284
295 template<typename Clock = std::chrono::steady_clock, typename Duration>
297 send_before(Allocator &alloc, const T &msg, const std::chrono::time_point<Clock, Duration> &when) {
298 auto lock = std::unique_lock(channelMux, when);
299 if (!lock.owns_lock()) {
301 }
302 ensure(initialized(), "NOT INITIALIZED!");
303
304 if (is_closed()) {
306 }
307
308 bool emit_notice = q.is_full();
309
310 auto res = q.push(alloc, msg);
311 if (res.is_error()) {
312 ensure(res.error().code == AllocationError::ALLOCATION_FAILED);
313 // If we couldn't resize, just wait for room
314 emit_notice = false;
315 while (!is_closed() && q.is_full()) {
316 if (send_cond.wait_until(lock, when) == std::cv_status::timeout) {
318 }
319 }
320
321 // check if we closed
322 if (is_closed()) {
324 }
325
326 // try sending again
327 if (auto err = q.push(msg); err.is_error()) {
329 }
330 }
331 read_cond.notify_one();
332
333 // tell everyone else our capacity has grown
334 if (emit_notice) {
335 send_cond.notify_all();
336 }
337 return success();
338 }
339
349 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
350
351 if (!lock.owns_lock()) {
353 }
354 ensure(initialized(), "NOT INITIALIZED!");
355
356 if (q.is_empty()) {
357 if (closed) {
358 return readImpl();
359 }
361 }
362
363 ensure(lock.owns_lock(), "DID NOT LOCK!");
364 return readImpl();
365 }
366
372 [[nodiscard]] Optional<T> receive_block() {
373 auto lock = std::unique_lock(channelMux);
374 ensure(initialized(), "NOT INITIALIZED!");
375
376 while (!closed && q.is_empty()) {
377 read_cond.wait(lock);
378 }
379
380 ensure(lock.owns_lock(), "DID NOT LOCK");
381 return readImpl();
382 }
383
392 template<typename Clock, typename Duration>
394 receive_before(const std::chrono::time_point<Clock, Duration> &when) {
395 auto lock = std::unique_lock(channelMux, when);
396
397 if (!lock.owns_lock()) {
399 }
400 ensure(initialized(), "NOT INITIALIZED!");
401
402 while (!closed && q.is_empty()) {
403 if (read_cond.wait_until(lock, when) == std::cv_status::timeout) {
405 }
406 }
407
408 ensure(lock.owns_lock(), "DID NOT LOCK");
409 return readImpl();
410 }
411
417 [[nodiscard]] bool is_initialized() const noexcept {
418 auto lock = std::unique_lock(channelMux);
419 return initialized();
420 }
421
427 void shrinkToFit(Allocator &alloc) {
428 auto lock = std::unique_lock(channelMux);
429 ensure(lock.owns_lock(), "DID NOT LOCK");
430 ensure(initialized(), "NOT INITIALIZED!");
431
432 q.shrink_to_fit(alloc);
433 }
434
435 private:
436 Optional<T> readImpl() {
437 if (closed && q.is_empty()) {
438 return nullopt;
439 }
440 ensure(!q.is_empty(), "CANNOT READ FROM EMPTY BUFFER");
441 send_cond.notify_one();
442 return q.pop();
443 }
444
445 [[nodiscard]] bool initialized() const noexcept { return q.is_initialized(); }
446
447 [[nodiscard]] bool is_closed() const noexcept { return closed; }
448 };
449 static_assert(is_channel_like_grow<Inbox<int>>, "Inbox<int> is not growing channel-like");
450} // namespace mtcore::thread
451
452#endif // MTCORE_MTTHREAD_INBOX_HPP
constexpr auto nullopt
Placeholder value for an empty Optional.
Definition optional.hpp:409
ChannelGrowTrySendErrors
Errors for when trying to do non-blocking send (with possible allocation) fails.
Definition inbox.hpp:32
ChannelSendBeforeErrors
Errors for when timed blocking sends fail.
Definition channels.hpp:61
ChannelGrowSendBlockErrors
Errors for when trying to do blocking send (with possible allocation) fails.
Definition inbox.hpp:43
ChannelTryReceiveErrors
Errors for when non-blocking receives fail Note: If the channel is closed, a nullopt will be returned...
Definition channels.hpp:79
ChannelGrowSendBeforeErrors
Errors for when trying to do timed blocking send (with possible allocation) fails.
Definition inbox.hpp:52
#define ensure(check,...)
Ensures that a check holds true, aborts the program if not true Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Definition result.hpp:398
Error< Underlying > error(Underlying err)
Creates an error.
Definition result.hpp:425
constexpr bool is_channel_like_grow
Checks if a type is growing channel-like.
Thread-related namespace The methods and classes provided by this class are thread-safe Classes and m...
Represents a memory allocator Exact behavior depends on the underlying VTable used Should use the a_*...
Represents a value that may or may not exist (an "Optional" value) Similar concept to std::optional,...
Definition optional.hpp:235
FIFO Queue (First In, First Out) Dynamically allocated, can be resized.
Definition queue.hpp:37
Optional< T > pop() noexcept
Removes and returns the next element in the Queue.
Definition queue.hpp:107
bool is_initialized() const noexcept
Checks if an Queue is initialized.
Definition queue.hpp:142
bool is_empty() const noexcept
Checks if ring buffer is empty.
Definition queue.hpp:77
Represents a Result that may have an error (error code) or a success value A type of "void" means the...
Definition result.hpp:170
Inter-thread communication primitive to send messages between threads Has a growable Queue.
Definition inbox.hpp:75
Result< void, ChannelGrowSendBlockErrors > send_block(Allocator &alloc, const T &msg)
Send message and block indefinitely Will grow inbox Queue if needed If unable to grow,...
Definition inbox.hpp:245
bool is_initialized() const noexcept
Checks if the inbox is initialized inboxes which aren't initialized will abort if used.
Definition inbox.hpp:417
Result< Optional< T >, ChannelSendBeforeErrors > receive_before(const std::chrono::time_point< Clock, Duration > &when)
Receives a message before a time point Blocks until a time point.
Definition inbox.hpp:394
Result< void, ChannelSendBeforeErrors > send_before(const T &msg, const std::chrono::time_point< Clock, Duration > &when)
Send message before a timeout Will NOT grow inbox Queue and will instead block.
Definition inbox.hpp:182
Result< Optional< T >, ChannelTryReceiveErrors > try_receive()
Tries to receive a message Will fail if it would have to block If the channel is empty and closed,...
Definition inbox.hpp:348
Result< void, ChannelSendBlockErrors > send_block(const T &msg)
Send message, will block indefinitely Will NOT grow inbox Queue and will instead block.
Definition inbox.hpp:155
Optional< T > receive_block()
Receives a message from the channel Blocks indefinitely.
Definition inbox.hpp:372
void shrinkToFit(Allocator &alloc)
Shrinks Queue to fit current Queue size Useful to lower memory usage.
Definition inbox.hpp:427
void close()
Closes an inbox.
Definition inbox.hpp:114
Result< void, AllocationError > init(Allocator &alloc, size_t initCapacity=10)
Initializes a new message Queue.
Definition inbox.hpp:92
Result< void, ChannelGrowTrySendErrors > try_send(Allocator &alloc, const T &msg)
Send message without blocking Will grow inbox Queue if needed If unable to grow, will return an error...
Definition inbox.hpp:211
void deinit(Allocator &alloc)
Cleans up memory Note: All references to the inbox MUST be cleaned up BEFORE calling this method.
Definition inbox.hpp:104
Result< void, ChannelTrySendErrors > try_send(const T &msg)
Send message without blocking Will NOT grow inbox Queue and will instead fail.
Definition inbox.hpp:128
Result< void, ChannelGrowSendBeforeErrors > send_before(Allocator &alloc, const T &msg, const std::chrono::time_point< Clock, Duration > &when)
Send message before a timeout Will grow inbox Queue if needed If unable to grow, will instead block.
Definition inbox.hpp:297