MT Core (C++)
Core library for replacing C++ standard in project usage
Loading...
Searching...
No Matches
broadcaster.hpp
Go to the documentation of this file.
1/*
2
3Copyright 2025 Matthew Tolman
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17*/
18
19#ifndef MTCORE_THREAD_BROADCAST_HPP
20#define MTCORE_THREAD_BROADCAST_HPP
21
23#include "mtcore_thread/arc.hpp"
26
31
33namespace mtcore::thread {
39
45 template<typename T>
48
50 if (auto inboxRes = inbox.init(alloc); inboxRes.is_error()) {
51 return inboxRes.error();
52 }
53 return inbox->init(alloc);
54 }
55
57 ensure(inbox.valid(), "SUBSCRIPTION IS NOT INITIALIZED!");
58 return inbox->receive_block();
59 }
60
61 void deinit(Allocator &alloc) {
62 if (inbox.valid()) {
63 inbox->close();
64 }
65 inbox.release(alloc);
66 }
67 };
68
102 template<typename T>
103 struct broadcaster {
105 std::mutex mux;
106 bool closed = false;
107
114 Result<void, AllocationError> init(Allocator &alloc, size_t initCapacity = 64) {
115 return channels.init(alloc, initCapacity);
116 }
117
125 // We need the lock since we'll be iterating and removing stuff
126 auto l = std::unique_lock{mux};
127
128 if (closed) {
130 }
131
132 Handle handle;
133 auto iter = channels.handles();
134 while (iter.next().copy_if_present(handle)) {
135 auto &e = channels[handle];
136 if (e.inbox.valid()) {
137 if (auto res = e.inbox->send_block(alloc, msg); res.is_error()) {
138 // If we failed allocation, try a non-allocating send
139 if (res.error().code == ChannelGrowSendBlockErrors::ALLOCATION_FAILED) {
140 // We can still fail if our channel is closed. In this case, remove the channel
141 if (auto res2 = e.inbox->send_block(msg); res2.is_error()) {
142 e.inbox.release(alloc);
143 channels.remove(handle);
144 }
145 }
146 // otherwise, our channel is closed, just remove the channel
147 else {
148 e.inbox.release(alloc);
149 channels.remove(handle);
150 }
151 }
152 }
153 else {
154 // clean up and remove the channel
155 e.deinit(alloc);
156 channels.remove(handle);
157 }
158 }
159 return success();
160 }
161
170 if (auto subRes = sub.init(alloc); subRes.is_error()) {
172 }
173
174 // We need the lock since we'll be removing stuff
175 {
176 auto l = std::unique_lock{mux};
177 if (closed) {
178 sub.deinit(alloc);
180 }
181
182 if (auto addRes = channels.add(alloc, sub); addRes.is_error()) {
183 sub.deinit(alloc);
185 }
186 }
187 auto acquireRes = sub.inbox.acquire(alloc);
188 if (acquireRes.is_error()) {
189 sub.deinit(alloc);
191 }
192 return BroadcastSubscription<T>{acquireRes.value()};
193 }
194
201 void close() {
202 auto l = std::unique_lock{mux};
203 closed = true;
205 auto iter = channels.iter();
206 while (iter.next().copy_if_present(cur)) {
207 if (cur.inbox.valid()) {
208 cur.inbox->close();
209 }
210 }
211 }
212
219 void deinit(Allocator &alloc) {
220 auto l = std::unique_lock{mux};
221 closed = true;
223 auto iter = channels.iter();
224 while (iter.next().copy_if_present(cur)) {
225 cur.deinit(alloc);
226 }
227 channels.deinit(alloc);
228 }
229 };
230
231} // namespace mtcore::thread
232
233#endif // MTCORE_THREAD_BROADCAST_HPP
BroadcastSubscribeError
Errors when getting a broadcast subscription.
#define ensure(check,...)
Ensures that a check holds true, aborts the program if not true Will print error if the condition is ...
Success< void > success()
Creates a successful void Result object.
Definition result.hpp:398
Error< Underlying > error(Underlying err)
Creates an error.
Definition result.hpp:425
Generic iterator defaults built on common contracts Does not guarantee performance of iterators Actua...
Definition iter.hpp:91
Thread-related namespace The methods and classes provided by this class are thread-safe Classes and m...
Represents a memory allocator Exact behavior depends on the underlying VTable used Should use the a_*...
Represents a generational list where removed items are marked and recycled.
Definition gen_list.hpp:59
Handle to an item in the list.
Definition gen_list.hpp:36
Represents a value that may or may not exist (an "Optional" value) Similar concept to std::optional,...
Definition optional.hpp:235
Represents a Result that may have an error (error code) or a success value A type of "void" means the...
Definition result.hpp:170
Automic Reference Count.
Definition arc.hpp:190
A subscription to a broadcaster.
Result< void, AllocationError > init(Allocator &alloc)
A broadcaster is able to send messages to multiple channels, with each channel having its own lock Se...
Result< BroadcastSubscription< T >, BroadcastSubscribeError > subscribe(Allocator &alloc)
Subscribes to a broadcaster Will create a new inbox for the subscription.
Result< void, ChannelSendBlockErrors > send_block(Allocator &alloc, const T &msg)
Blocks while sending data.
void deinit(Allocator &alloc)
Will close the broadcaster and will also clean up the broadcaster's references to each subscription (...
GenList< BroadcastSubscription< T > > channels
Result< void, AllocationError > init(Allocator &alloc, size_t initCapacity=64)
Initializes a broadcaster.
void close()
Closes the broadcaster which prevents messages from being sent (which aren't already being sent) and ...