WPILibC++ 2025.2.1
Loading...
Searching...
No Matches
WorkerThread.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_WORKERTHREAD_H_
6#define WPINET_WORKERTHREAD_H_
7
8#include <functional>
9#include <memory>
10#include <tuple>
11#include <utility>
12#include <vector>
13
14#include <wpi/SafeThread.h>
15#include <wpi/future.h>
16
17#include "wpinet/uv/Async.h"
18
19namespace wpi {
20
21namespace detail {
22
23template <typename R>
25 using AfterWorkFunction = std::function<void(R)>;
26
28
29 void SetLoop(uv::Loop& loop) {
31 async->wakeup.connect(
32 [](AfterWorkFunction func, R result) { func(result); });
33 m_async = async;
34 }
35
36 void UnsetLoop() {
37 if (auto async = m_async.lock()) {
38 async->Close();
39 m_async.reset();
40 }
41 }
42
43 std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
44};
45
46template <>
47struct WorkerThreadAsync<void> {
48 using AfterWorkFunction = std::function<void()>;
49
50 ~WorkerThreadAsync() { RemoveLoop(); }
51
52 void SetLoop(uv::Loop& loop) {
53 auto async = uv::Async<AfterWorkFunction>::Create(loop);
54 async->wakeup.connect([](AfterWorkFunction func) { func(); });
55 m_async = async;
56 }
57
58 void RemoveLoop() {
59 if (auto async = m_async.lock()) {
60 async->Close();
61 m_async.reset();
62 }
63 }
64
65 std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
66};
67
68template <typename R, typename... T>
70 using WorkFunction = std::function<R(T...)>;
72
74 WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_,
75 std::tuple<T...> params_)
76 : promiseId(promiseId_),
77 work(std::move(work_)),
78 params(std::move(params_)) {}
80 std::tuple<T...> params_)
81 : promiseId(0),
82 work(std::move(work_)),
83 afterWork(std::move(afterWork_)),
84 params(std::move(params_)) {}
85
86 uint64_t promiseId;
89 std::tuple<T...> params;
90};
91
92template <typename R, typename... T>
94 public:
95 using Request = WorkerThreadRequest<R, T...>;
96
97 void Main() override;
98
99 std::vector<Request> m_requests;
102};
103
104template <typename R, typename... T>
107 R result = std::apply(req.work, std::move(req.params));
108 if (req.afterWork) {
109 if (auto async = thr.m_async.m_async.lock()) {
110 async->Send(std::move(req.afterWork), std::move(result));
111 }
112 } else {
113 thr.m_promises.SetValue(req.promiseId, std::move(result));
114 }
115}
116
117template <typename... T>
120 std::apply(req.work, req.params);
121 if (req.afterWork) {
122 if (auto async = thr.m_async.m_async.lock()) {
123 async->Send(std::move(req.afterWork));
124 }
125 } else {
127 }
128}
129
130template <typename R, typename... T>
132 std::vector<Request> requests;
133 while (m_active) {
134 std::unique_lock lock(m_mutex);
135 m_cond.wait(lock, [&] { return !m_active || !m_requests.empty(); });
136 if (!m_active) {
137 break;
138 }
139
140 // don't want to hold the lock while executing the callbacks
141 requests.swap(m_requests);
142 lock.unlock();
143
144 for (auto&& req : requests) {
145 if (!m_active) {
146 break; // requests may be long-running
147 }
148 RunWorkerThreadRequest(*this, req);
149 }
150 requests.clear();
151 m_promises.Notify();
152 }
153}
154
155} // namespace detail
156
157template <typename T>
159
160template <typename R, typename... T>
161class WorkerThread<R(T...)> final {
162 using Thread = detail::WorkerThreadThread<R, T...>;
163
164 public:
165 using WorkFunction = std::function<R(T...)>;
168
169 WorkerThread() { m_owner.Start(); }
170
171 /**
172 * Set the loop. This must be called from the loop thread.
173 * Subsequent calls to QueueWorkThen will run afterWork on the provided
174 * loop (via an async handle).
175 *
176 * @param loop the loop to use for running afterWork routines
177 */
178 void SetLoop(uv::Loop& loop) {
179 if (auto thr = m_owner.GetThread()) {
180 thr->m_async.SetLoop(loop);
181 }
182 }
183
184 /**
185 * Set the loop. This must be called from the loop thread.
186 * Subsequent calls to QueueWorkThen will run afterWork on the provided
187 * loop (via an async handle).
188 *
189 * @param loop the loop to use for running afterWork routines
190 */
191 void SetLoop(std::shared_ptr<uv::Loop> loop) { SetLoop(*loop); }
192
193 /**
194 * Unset the loop. This must be called from the loop thread.
195 * Subsequent calls to QueueWorkThen will no longer run afterWork.
196 */
197 void UnsetLoop() {
198 if (auto thr = m_owner.GetThread()) {
199 thr->m_async.UnsetLoop();
200 }
201 }
202
203 /**
204 * Get the handle used by QueueWorkThen() to run afterWork.
205 * This handle is set by SetLoop().
206 * Calling Close() on this handle is the same as calling UnsetLoop().
207 *
208 * @return The handle (if nullptr, no handle is set)
209 */
210 std::shared_ptr<uv::Handle> GetHandle() const {
211 if (auto thr = m_owner.GetThread()) {
212 return thr->m_async.m_async.lock();
213 } else {
214 return nullptr;
215 }
216 }
217
218 /**
219 * Wakeup the worker thread, call the work function, and return a future for
220 * the result.
221 *
222 * It’s safe to call this function from any thread.
223 * The work function will be called on the worker thread.
224 *
225 * The future will return a default-constructed result if this class is
226 * destroyed while waiting for a result.
227 *
228 * @param work Work function (called on worker thread)
229 * @param u Arguments to work function
230 */
231 template <typename... U>
233 if (auto thr = m_owner.GetThread()) {
234 // create the future
235 uint64_t req = thr->m_promises.CreateRequest();
236
237 // add the parameters to the input queue
238 thr->m_requests.emplace_back(
239 req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
240
241 // signal the thread
242 thr->m_cond.notify_one();
243
244 // return future
245 return thr->m_promises.CreateFuture(req);
246 }
247
248 // XXX: is this the right thing to do?
249 return future<R>();
250 }
251
252 /**
253 * Wakeup the worker thread, call the work function, and call the afterWork
254 * function with the result on the loop set by SetLoop().
255 *
256 * It’s safe to call this function from any thread.
257 * The work function will be called on the worker thread, and the afterWork
258 * function will be called on the loop thread.
259 *
260 * SetLoop() must be called prior to calling this function for afterWork to
261 * be called.
262 *
263 * @param work Work function (called on worker thread)
264 * @param afterWork After work function (called on loop thread)
265 * @param u Arguments to work function
266 */
267 template <typename... U>
268 void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U&&... u) {
269 if (auto thr = m_owner.GetThread()) {
270 // add the parameters to the input queue
271 thr->m_requests.emplace_back(
272 std::move(work), std::move(afterWork),
273 std::forward_as_tuple(std::forward<U>(u)...));
274
275 // signal the thread
276 thr->m_cond.notify_one();
277 }
278 }
279
280 private:
282};
283
284} // namespace wpi
285
286#endif // WPINET_WORKERTHREAD_H_
A promise factory for lightweight futures.
Definition future.h:123
void SetValue(uint64_t request, const T &value)
Sets a value directly for a future without creating a promise object.
Definition future.h:729
Definition SafeThread.h:33
Definition SafeThread.h:124
std::function< R(T...)> WorkFunction
Definition WorkerThread.h:165
future< R > QueueWork(WorkFunction work, U &&... u)
Wakeup the worker thread, call the work function, and return a future for the result.
Definition WorkerThread.h:232
typename detail::WorkerThreadAsync< R >::AfterWorkFunction AfterWorkFunction
Definition WorkerThread.h:166
void SetLoop(uv::Loop &loop)
Set the loop.
Definition WorkerThread.h:178
void SetLoop(std::shared_ptr< uv::Loop > loop)
Set the loop.
Definition WorkerThread.h:191
void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U &&... u)
Wakeup the worker thread, call the work function, and call the afterWork function with the result on ...
Definition WorkerThread.h:268
std::shared_ptr< uv::Handle > GetHandle() const
Get the handle used by QueueWorkThen() to run afterWork.
Definition WorkerThread.h:210
WorkerThread()
Definition WorkerThread.h:169
void UnsetLoop()
Unset the loop.
Definition WorkerThread.h:197
Definition WorkerThread.h:158
Definition WorkerThread.h:93
void Main() override
Definition WorkerThread.h:131
PromiseFactory< R > m_promises
Definition WorkerThread.h:100
detail::WorkerThreadAsync< R > m_async
Definition WorkerThread.h:101
std::vector< Request > m_requests
Definition WorkerThread.h:99
A lightweight version of std::future.
Definition future.h:271
static std::shared_ptr< Async > Create(Loop &loop)
Create an async handle.
Definition Async.h:55
Event loop.
Definition Loop.h:37
detail namespace with internal helper functions
Definition input_adapters.h:32
Implement std::hash so that hash_code can be used in STL containers.
Definition PointerIntPair.h:280
void RunWorkerThreadRequest(WorkerThreadThread< R, T... > &thr, WorkerThreadRequest< R, T... > &req)
Definition WorkerThread.h:105
Foonathan namespace.
Definition ntcore_cpp.h:26
std::weak_ptr< uv::Async< AfterWorkFunction > > m_async
Definition WorkerThread.h:65
void RemoveLoop()
Definition WorkerThread.h:58
void SetLoop(uv::Loop &loop)
Definition WorkerThread.h:52
std::function< void()> AfterWorkFunction
Definition WorkerThread.h:48
~WorkerThreadAsync()
Definition WorkerThread.h:50
Definition WorkerThread.h:24
void SetLoop(uv::Loop &loop)
Definition WorkerThread.h:29
void UnsetLoop()
Definition WorkerThread.h:36
std::weak_ptr< uv::Async< AfterWorkFunction, R > > m_async
Definition WorkerThread.h:43
std::function< void(R)> AfterWorkFunction
Definition WorkerThread.h:25
~WorkerThreadAsync()
Definition WorkerThread.h:27
Definition WorkerThread.h:69
std::function< R(T...)> WorkFunction
Definition WorkerThread.h:70
WorkFunction work
Definition WorkerThread.h:87
std::tuple< T... > params
Definition WorkerThread.h:89
AfterWorkFunction afterWork
Definition WorkerThread.h:88
typename WorkerThreadAsync< R >::AfterWorkFunction AfterWorkFunction
Definition WorkerThread.h:71
uint64_t promiseId
Definition WorkerThread.h:86
WorkerThreadRequest(WorkFunction work_, AfterWorkFunction afterWork_, std::tuple< T... > params_)
Definition WorkerThread.h:79
WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_, std::tuple< T... > params_)
Definition WorkerThread.h:74