MT Core (C++)
Core library for replacing C++ standard in project usage
Loading...
Searching...
No Matches
channels.hpp
Go to the documentation of this file.
1/*
2
3Copyright 2025 Matthew Tolman
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17*/
18
19#pragma once
20
21#ifndef MTCORE_THREAD_CHANNELS_HPP
22#define MTCORE_THREAD_CHANNELS_HPP
23
27
28#include <array>
29#include <condition_variable>
30#include <mutex>
31#include <thread>
32
34namespace mtcore::thread {
42
44
54
56
66
67 static_assert(static_cast<int>(ChannelTrySendErrors::CHANNEL_CLOSED) ==
69 "MISMATCHED CLOSED ERROR");
70 static_assert(static_cast<int>(ChannelSendBeforeErrors::CHANNEL_CLOSED) ==
72 "MISMATCHED CLOSED ERROR");
73
83
92
116 template<typename T, size_t Size = 1>
117 struct Channel {
118 static_assert(Size > 0, "CANNOT HAVE UNBUFFERED THREAD-BASED CHANNELS");
119
120 private:
121 std::timed_mutex channelMux = {};
122 std::condition_variable_any read_cond = {};
123 std::condition_variable_any send_cond = {};
124 FixedQueue<T, Size> q = {};
125 bool closed = false;
126
127 public:
128 using Message = T;
129
133 void close() {
134 auto lock = std::unique_lock(channelMux);
135 closed = true;
136 read_cond.notify_all();
137 send_cond.notify_all();
138 }
139
148 [[nodiscard]] Result<void, ChannelTrySendErrors> try_send(const T &msg) {
149 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
150
151 if (!lock.owns_lock()) {
153 }
154
155 if (q.is_full()) {
157 }
158
159 ensure(lock.owns_lock(), "DID NOT LOCK!");
160 if (sendImpl(msg).is_error()) {
162 }
163 return success();
164 }
165
173 auto lock = std::unique_lock(channelMux);
174 while (!closed && q.is_full()) {
175 send_cond.wait(lock);
176 }
177
178 ensure(lock.owns_lock(), "DID NOT LOCK!");
179 return sendImpl(msg);
180 }
181
191 template<typename Clock = std::chrono::steady_clock>
193 const std::chrono::time_point<Clock> &when) {
194 auto lock = std::unique_lock(channelMux, when);
195
196 if (!lock.owns_lock()) {
198 }
199
200 while (!closed && q.is_full()) {
201 if (send_cond.wait_until(lock, when) == std::cv_status::timeout) {
203 }
204 }
205
206 ensure(lock.owns_lock(), "DID NOT LOCK!");
207 if (sendImpl(msg).is_error()) {
209 }
210 return success();
211 }
212
222 auto lock = std::unique_lock(channelMux, std::try_to_lock_t{});
223
224 if (!lock.owns_lock()) {
226 }
227
228 if (q.is_empty()) {
229 if (closed) {
230 return readImpl();
231 }
233 }
234
235 ensure(lock.owns_lock(), "DID NOT LOCK!");
236 return readImpl();
237 }
238
245 auto lock = std::unique_lock(channelMux);
246
247 while (!closed && q.is_empty()) {
248 read_cond.wait(lock);
249 }
250
251 ensure(lock.owns_lock(), "DID NOT LOCK");
252 return readImpl();
253 }
254
263 template<typename Clock>
265 receive_before(const std::chrono::time_point<Clock> &when) {
266 auto lock = std::unique_lock(channelMux, when);
267
268 if (!lock.owns_lock()) {
270 }
271
272 while (!closed && q.is_empty()) {
273 if (read_cond.wait_until(lock, when) == std::cv_status::timeout) {
275 }
276 }
277
278 ensure(lock.owns_lock(), "DID NOT LOCK");
279 return readImpl();
280 }
281
282 private:
283 Result<void, ChannelSendBlockErrors> sendImpl(const T &msg) {
284 if (closed) {
286 }
287
288 ensure(!q.is_full(), "CANNOT SEND TO FULL BUFFER");
289 ensure(q.push(msg).is_success());
290 read_cond.notify_one();
291 return success();
292 }
293
294 Optional<T> readImpl() {
295 if (closed && q.is_empty()) {
296 return nullopt;
297 }
298 ensure(!q.is_empty(), "CANNOT READ FROM EMPTY BUFFER");
299 send_cond.notify_one();
300 return q.pop();
301 }
302 };
303 static_assert(is_channel_like<Channel<int, 1>>, "Channel<int, 1> not send/receive");
304 static_assert(is_channel_like<Channel<int>>, "Channel<int> not channel-like");
305} // namespace mtcore::thread
306
307#endif // MTCORE_THREAD_CHANNELS_HPP
constexpr auto nullopt
Placeholder value for an empty Optional.
Definition optional.hpp:409
ChannelReceiveBeforeErrors
Errors for when timed receives fail Note: If the channel is closed, a nullopt will be returned instea...
Definition channels.hpp:89
ChannelTrySendErrors
Errors for when non-blocking sends fail.
Definition channels.hpp:49
ChannelSendBlockErrors
Errors for when blocking sends fail.
Definition channels.hpp:39
ChannelSendBeforeErrors
Errors for when timed blocking sends fail.
Definition channels.hpp:61
ChannelTryReceiveErrors
Errors for when non-blocking receives fail Note: If the channel is closed, a nullopt will be returned...
Definition channels.hpp:79
#define ensure(check,...)
Ensures that a check holds true, aborts the program if not true Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Definition result.hpp:398
Error< Underlying > error(Underlying err)
Creates an error.
Definition result.hpp:425
constexpr bool is_channel_like
Checks if a type is channel-like.
constexpr bool has_closed_error
Checks if an error type has a recognized closed error.
Thread-related namespace The methods and classes provided by this class are thread-safe Classes and m...
Statically allocated FIFO queue with fixed maximum capacity.
Definition queue.hpp:156
Result< void, CollectionAddNoAllocationError > push(const T &elem) noexcept
Tries to add an element to the Queue Fails if Queue is empty.
Definition queue.hpp:210
bool is_full() const noexcept
Checks if ring buffer is full.
Definition queue.hpp:219
bool is_empty() const noexcept
Checks if ring buffer is empty.
Definition queue.hpp:225
Represents a value that may or may not exist (an "Optional" value) Similar concept to std::optional,...
Definition optional.hpp:235
Represents a Result that may have an error (error code) or a success value A type of "void" means the...
Definition result.hpp:170
Channel is a message-passing primitive between two threads A channel has blocking and non-blocking (o...
Definition channels.hpp:117
Result< void, ChannelSendBlockErrors > send_block(const T &msg)
Will send a message (unless the channel is closed, then will fail) Will block as long as is needed to...
Definition channels.hpp:172
Optional< Message > receive_block()
Receives a message from the channel Blocks indefinitely.
Definition channels.hpp:244
Result< void, ChannelTrySendErrors > try_send(const T &msg)
Tries to send a message Does not block If there is no room or would have to block for locking,...
Definition channels.hpp:148
Result< Optional< T >, ChannelReceiveBeforeErrors > receive_before(const std::chrono::time_point< Clock > &when)
Receives a message before a time point Blocks until a time point.
Definition channels.hpp:265
Result< Optional< T >, ChannelTryReceiveErrors > try_receive()
Tries to receive a message Will fail if it would have to block If the channel is empty and closed,...
Definition channels.hpp:221
Result< void, ChannelSendBeforeErrors > send_before(const T &msg, const std::chrono::time_point< Clock > &when)
Will send a message (unless the channel is closed, will fail) Will block up to a certain time point,...
Definition channels.hpp:192
void close()
Closes the channel.
Definition channels.hpp:133