WPILibC++ 2024.1.1-beta-4
WorkerThread.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_WORKERTHREAD_H_
6#define WPINET_WORKERTHREAD_H_
7
8#include <functional>
9#include <memory>
10#include <tuple>
11#include <utility>
12#include <vector>
13
14#include <wpi/SafeThread.h>
15#include <wpi/future.h>
16
17#include "wpinet/uv/Async.h"
18
19namespace wpi {
20
21namespace detail {
22
23template <typename R>
25 using AfterWorkFunction = std::function<void(R)>;
26
28
29 void SetLoop(uv::Loop& loop) {
31 async->wakeup.connect(
32 [](AfterWorkFunction func, R result) { func(result); });
33 m_async = async;
34 }
35
36 void UnsetLoop() {
37 if (auto async = m_async.lock()) {
38 async->Close();
39 m_async.reset();
40 }
41 }
42
43 std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
44};
45
46template <>
47struct WorkerThreadAsync<void> {
48 using AfterWorkFunction = std::function<void()>;
49
50 ~WorkerThreadAsync() { RemoveLoop(); }
51
52 void SetLoop(uv::Loop& loop) {
53 auto async = uv::Async<AfterWorkFunction>::Create(loop);
54 async->wakeup.connect([](AfterWorkFunction func) { func(); });
55 m_async = async;
56 }
57
58 void RemoveLoop() {
59 if (auto async = m_async.lock()) {
60 async->Close();
61 m_async.reset();
62 }
63 }
64
65 std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
66};
67
68template <typename R, typename... T>
70 using WorkFunction = std::function<R(T...)>;
72
74 WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_,
75 std::tuple<T...> params_)
76 : promiseId(promiseId_),
77 work(std::move(work_)),
78 params(std::move(params_)) {}
80 std::tuple<T...> params_)
81 : promiseId(0),
82 work(std::move(work_)),
83 afterWork(std::move(afterWork_)),
84 params(std::move(params_)) {}
85
86 uint64_t promiseId;
89 std::tuple<T...> params;
90};
91
92template <typename R, typename... T>
94 public:
96
97 void Main() override;
98
99 std::vector<Request> m_requests;
102};
103
104template <typename R, typename... T>
107 R result = std::apply(req.work, std::move(req.params));
108 if (req.afterWork) {
109 if (auto async = thr.m_async.m_async.lock()) {
110 async->Send(std::move(req.afterWork), std::move(result));
111 }
112 } else {
113 thr.m_promises.SetValue(req.promiseId, std::move(result));
114 }
115}
116
117template <typename... T>
120 std::apply(req.work, req.params);
121 if (req.afterWork) {
122 if (auto async = thr.m_async.m_async.lock()) {
123 async->Send(std::move(req.afterWork));
124 }
125 } else {
127 }
128}
129
130template <typename R, typename... T>
132 std::vector<Request> requests;
133 while (m_active) {
134 std::unique_lock lock(m_mutex);
135 m_cond.wait(lock, [&] { return !m_active || !m_requests.empty(); });
136 if (!m_active) {
137 break;
138 }
139
140 // don't want to hold the lock while executing the callbacks
141 requests.swap(m_requests);
142 lock.unlock();
143
144 for (auto&& req : requests) {
145 if (!m_active) {
146 break; // requests may be long-running
147 }
148 RunWorkerThreadRequest(*this, req);
149 }
150 requests.clear();
151 m_promises.Notify();
152 }
153}
154
155} // namespace detail
156
157template <typename T>
159
160template <typename R, typename... T>
161class WorkerThread<R(T...)> final {
162 using Thread = detail::WorkerThreadThread<R, T...>;
163
164 public:
165 using WorkFunction = std::function<R(T...)>;
168
169 WorkerThread() { m_owner.Start(); }
170
171 /**
172 * Set the loop. This must be called from the loop thread.
173 * Subsequent calls to QueueWorkThen will run afterWork on the provided
174 * loop (via an async handle).
175 *
176 * @param loop the loop to use for running afterWork routines
177 */
178 void SetLoop(uv::Loop& loop) {
179 if (auto thr = m_owner.GetThread()) {
180 thr->m_async.SetLoop(loop);
181 }
182 }
183
184 /**
185 * Set the loop. This must be called from the loop thread.
186 * Subsequent calls to QueueWorkThen will run afterWork on the provided
187 * loop (via an async handle).
188 *
189 * @param loop the loop to use for running afterWork routines
190 */
191 void SetLoop(std::shared_ptr<uv::Loop> loop) { SetLoop(*loop); }
192
193 /**
194 * Unset the loop. This must be called from the loop thread.
195 * Subsequent calls to QueueWorkThen will no longer run afterWork.
196 */
197 void UnsetLoop() {
198 if (auto thr = m_owner.GetThread()) {
199 thr->m_async.UnsetLoop();
200 }
201 }
202
203 /**
204 * Get the handle used by QueueWorkThen() to run afterWork.
205 * This handle is set by SetLoop().
206 * Calling Close() on this handle is the same as calling UnsetLoop().
207 *
208 * @return The handle (if nullptr, no handle is set)
209 */
210 std::shared_ptr<uv::Handle> GetHandle() const {
211 if (auto thr = m_owner.GetThread()) {
212 return thr->m_async.m_async.lock();
213 } else {
214 return nullptr;
215 }
216 }
217
218 /**
219 * Wakeup the worker thread, call the work function, and return a future for
220 * the result.
221 *
222 * It’s safe to call this function from any thread.
223 * The work function will be called on the worker thread.
224 *
225 * The future will return a default-constructed result if this class is
226 * destroyed while waiting for a result.
227 *
228 * @param work Work function (called on worker thread)
229 * @param u Arguments to work function
230 */
231 template <typename... U>
233 if (auto thr = m_owner.GetThread()) {
234 // create the future
235 uint64_t req = thr->m_promises.CreateRequest();
236
237 // add the parameters to the input queue
238 thr->m_requests.emplace_back(
239 req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
240
241 // signal the thread
242 thr->m_cond.notify_one();
243
244 // return future
245 return thr->m_promises.CreateFuture(req);
246 }
247
248 // XXX: is this the right thing to do?
249 return future<R>();
250 }
251
252 /**
253 * Wakeup the worker thread, call the work function, and call the afterWork
254 * function with the result on the loop set by SetLoop().
255 *
256 * It’s safe to call this function from any thread.
257 * The work function will be called on the worker thread, and the afterWork
258 * function will be called on the loop thread.
259 *
260 * SetLoop() must be called prior to calling this function for afterWork to
261 * be called.
262 *
263 * @param work Work function (called on worker thread)
264 * @param afterWork After work function (called on loop thread)
265 * @param u Arguments to work function
266 */
267 template <typename... U>
269 if (auto thr = m_owner.GetThread()) {
270 // add the parameters to the input queue
271 thr->m_requests.emplace_back(
272 std::move(work), std::move(afterWork),
273 std::forward_as_tuple(std::forward<U>(u)...));
274
275 // signal the thread
276 thr->m_cond.notify_one();
277 }
278 }
279
280 private:
282};
283
284} // namespace wpi
285
286#endif // WPINET_WORKERTHREAD_H_
and restrictions which apply to each piece of software is included later in this file and or inside of the individual applicable source files The disclaimer of warranty in the WPILib license above applies to all code in and nothing in any of the other licenses gives permission to use the names of FIRST nor the names of the WPILib contributors to endorse or promote products derived from this software The following pieces of software have additional or alternate and or Google Inc All rights reserved Redistribution and use in source and binary with or without are permitted provided that the following conditions are this list of conditions and the following disclaimer *Redistributions in binary form must reproduce the above copyright this list of conditions and the following disclaimer in the documentation and or other materials provided with the distribution *Neither the name of Google Inc nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS AS IS AND ANY EXPRESS OR IMPLIED BUT NOT LIMITED THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY OR CONSEQUENTIAL WHETHER IN STRICT OR EVEN IF ADVISED OF THE POSSIBILITY OF SUCH January AND DISTRIBUTION Definitions License shall mean the terms and conditions for and distribution as defined by Sections through of this document Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License Legal Entity shall mean the union of the acting entity and all other entities that control are controlled by or are under common control with that entity For the purposes of this definition control direct or to cause the direction or management of such whether by contract or including but not limited to software source documentation and configuration files Object form shall mean any form resulting from mechanical transformation or translation of a Source including but not limited to compiled object generated and conversions to other media types Work shall mean the work of whether in Source or Object made available under the as indicated by a copyright notice that is included in or attached to the work(an example is provided in the Appendix below). "Derivative Works" shall mean any work
void SetValue(uint64_t request, const T &value)
Sets a value directly for a future without creating a promise object.
Definition: future.h:729
Definition: SafeThread.h:33
std::function< R(T...)> WorkFunction
Definition: WorkerThread.h:165
future< R > QueueWork(WorkFunction work, U &&... u)
Wakeup the worker thread, call the work function, and return a future for the result.
Definition: WorkerThread.h:232
void SetLoop(uv::Loop &loop)
Set the loop.
Definition: WorkerThread.h:178
void SetLoop(std::shared_ptr< uv::Loop > loop)
Set the loop.
Definition: WorkerThread.h:191
void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U &&... u)
Wakeup the worker thread, call the work function, and call the afterWork function with the result on ...
Definition: WorkerThread.h:268
typename detail::WorkerThreadAsync< R >::AfterWorkFunction AfterWorkFunction
Definition: WorkerThread.h:167
std::shared_ptr< uv::Handle > GetHandle() const
Get the handle used by QueueWorkThen() to run afterWork.
Definition: WorkerThread.h:210
WorkerThread()
Definition: WorkerThread.h:169
void UnsetLoop()
Unset the loop.
Definition: WorkerThread.h:197
Definition: WorkerThread.h:158
Definition: WorkerThread.h:93
void Main() override
Definition: WorkerThread.h:131
PromiseFactory< R > m_promises
Definition: WorkerThread.h:100
detail::WorkerThreadAsync< R > m_async
Definition: WorkerThread.h:101
std::vector< Request > m_requests
Definition: WorkerThread.h:99
A lightweight version of std::future.
Definition: future.h:271
static std::shared_ptr< Async > Create(Loop &loop)
Create an async handle.
Definition: Async.h:55
Event loop.
Definition: Loop.h:37
detail namespace with internal helper functions
Definition: ranges.h:23
Definition: array.h:89
static constexpr const unit_t< compound_unit< energy::joules, inverse< temperature::kelvin >, inverse< substance::moles > > > R(8.3144598)
Gas constant.
void RunWorkerThreadRequest(WorkerThreadThread< R, T... > &thr, WorkerThreadRequest< R, T... > &req)
Definition: WorkerThread.h:105
Definition: ntcore_cpp.h:26
std::weak_ptr< uv::Async< AfterWorkFunction > > m_async
Definition: WorkerThread.h:65
void RemoveLoop()
Definition: WorkerThread.h:58
void SetLoop(uv::Loop &loop)
Definition: WorkerThread.h:52
std::function< void()> AfterWorkFunction
Definition: WorkerThread.h:48
~WorkerThreadAsync()
Definition: WorkerThread.h:50
Definition: WorkerThread.h:24
void SetLoop(uv::Loop &loop)
Definition: WorkerThread.h:29
void UnsetLoop()
Definition: WorkerThread.h:36
std::weak_ptr< uv::Async< AfterWorkFunction, R > > m_async
Definition: WorkerThread.h:43
std::function< void(R)> AfterWorkFunction
Definition: WorkerThread.h:25
~WorkerThreadAsync()
Definition: WorkerThread.h:27
Definition: WorkerThread.h:69
std::function< R(T...)> WorkFunction
Definition: WorkerThread.h:70
WorkFunction work
Definition: WorkerThread.h:87
std::tuple< T... > params
Definition: WorkerThread.h:89
AfterWorkFunction afterWork
Definition: WorkerThread.h:88
typename WorkerThreadAsync< R >::AfterWorkFunction AfterWorkFunction
Definition: WorkerThread.h:71
uint64_t promiseId
Definition: WorkerThread.h:86
WorkerThreadRequest(WorkFunction work_, AfterWorkFunction afterWork_, std::tuple< T... > params_)
Definition: WorkerThread.h:79
WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_, std::tuple< T... > params_)
Definition: WorkerThread.h:74