WPILibC++ 2027.0.0-alpha-4
Loading...
Searching...
No Matches
WorkerThread.hpp
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#pragma once
6
7#include <functional>
8#include <memory>
9#include <tuple>
10#include <utility>
11#include <vector>
12
13#include "wpi/net/uv/Async.hpp"
15#include "wpi/util/future.hpp"
16
17namespace wpi::net {
18
19namespace detail {
20
21template <typename R>
23 using AfterWorkFunction = std::function<void(R)>;
24
26
29 async->wakeup.connect(
30 [](AfterWorkFunction func, R result) { func(result); });
31 m_async = async;
32 }
33
34 void UnsetLoop() {
35 if (auto async = m_async.lock()) {
36 async->Close();
37 m_async.reset();
38 }
39 }
40
41 std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
42};
43
44template <>
45struct WorkerThreadAsync<void> {
46 using AfterWorkFunction = std::function<void()>;
47
49
52 async->wakeup.connect([](AfterWorkFunction func) { func(); });
53 m_async = async;
54 }
55
56 void RemoveLoop() {
57 if (auto async = m_async.lock()) {
58 async->Close();
59 m_async.reset();
60 }
61 }
62
63 std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
64};
65
66template <typename R, typename... T>
68 using WorkFunction = std::function<R(T...)>;
70
// Promise-style request: stores the id of the promise to complete, the work
// function, and the packed arguments for it. afterWork is not listed in the
// initializer list, so it is left default-constructed; RunWorkerThreadRequest
// tests req.afterWork to choose between the async-handle path and
// m_promises.SetValue().
72 WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_,
73 std::tuple<T...> params_)
74 : promiseId(promiseId_),
75 work(std::move(work_)),
76 params(std::move(params_)) {}
78 std::tuple<T...> params_)
79 : promiseId(0),
80 work(std::move(work_)),
81 afterWork(std::move(afterWork_)),
82 params(std::move(params_)) {}
83
84 uint64_t promiseId;
87 std::tuple<T...> params;
88};
89
90template <typename R, typename... T>
101
102template <typename R, typename... T>
105 R result = std::apply(req.work, std::move(req.params));
106 if (req.afterWork) {
107 if (auto async = thr.m_async.m_async.lock()) {
108 async->Send(std::move(req.afterWork), std::move(result));
109 }
110 } else {
111 thr.m_promises.SetValue(req.promiseId, std::move(result));
112 }
113}
114
115template <typename... T>
118 std::apply(req.work, req.params);
119 if (req.afterWork) {
120 if (auto async = thr.m_async.m_async.lock()) {
121 async->Send(std::move(req.afterWork));
122 }
123 } else {
125 }
126}
127
128template <typename R, typename... T>
130 std::vector<Request> requests;
131 while (m_active) {
132 std::unique_lock lock(m_mutex);
133 m_cond.wait(lock, [&] { return !m_active || !m_requests.empty(); });
134 if (!m_active) {
135 break;
136 }
137
138 // don't want to hold the lock while executing the callbacks
139 requests.swap(m_requests);
140 lock.unlock();
141
142 for (auto&& req : requests) {
143 if (!m_active) {
144 break; // requests may be long-running
145 }
146 RunWorkerThreadRequest(*this, req);
147 }
148 requests.clear();
149 m_promises.Notify();
150 }
151}
152
153} // namespace detail
154
155template <typename T>
157
158template <typename R, typename... T>
159class WorkerThread<R(T...)> final {
160 using Thread = detail::WorkerThreadThread<R, T...>;
161
162 public:
163 using WorkFunction = std::function<R(T...)>;
166
167 WorkerThread() { m_owner.Start(); }
168
169 /**
170 * Set the loop. This must be called from the loop thread.
171 * Subsequent calls to QueueWorkThen will run afterWork on the provided
172 * loop (via an async handle).
173 *
174 * @param loop the loop to use for running afterWork routines
175 */
177 if (auto thr = m_owner.GetThread()) {
178 thr->m_async.SetLoop(loop);
179 }
180 }
181
182 /**
183 * Set the loop. This must be called from the loop thread.
184 * Subsequent calls to QueueWorkThen will run afterWork on the provided
185 * loop (via an async handle).
186 *
187 * @param loop the loop to use for running afterWork routines
188 */
189 void SetLoop(std::shared_ptr<uv::Loop> loop) { SetLoop(*loop); }
190
191 /**
192 * Unset the loop. This must be called from the loop thread.
193 * Subsequent calls to QueueWorkThen will no longer run afterWork.
194 */
195 void UnsetLoop() {
196 if (auto thr = m_owner.GetThread()) {
197 thr->m_async.UnsetLoop();
198 }
199 }
200
201 /**
202 * Get the handle used by QueueWorkThen() to run afterWork.
203 * This handle is set by SetLoop().
204 * Calling Close() on this handle is the same as calling UnsetLoop().
205 *
206 * @return The handle (if nullptr, no handle is set)
207 */
208 std::shared_ptr<uv::Handle> GetHandle() const {
209 if (auto thr = m_owner.GetThread()) {
210 return thr->m_async.m_async.lock();
211 } else {
212 return nullptr;
213 }
214 }
215
216 /**
217 * Wakeup the worker thread, call the work function, and return a future for
218 * the result.
219 *
220 * It’s safe to call this function from any thread.
221 * The work function will be called on the worker thread.
222 *
223 * The future will return a default-constructed result if this class is
224 * destroyed while waiting for a result.
225 *
226 * @param work Work function (called on worker thread)
227 * @param u Arguments to work function
228 */
229 template <typename... U>
231 if (auto thr = m_owner.GetThread()) {
232 // create the future
233 uint64_t req = thr->m_promises.CreateRequest();
234
235 // add the parameters to the input queue
236 thr->m_requests.emplace_back(
237 req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
238
239 // signal the thread
240 thr->m_cond.notify_one();
241
242 // return future
243 return thr->m_promises.CreateFuture(req);
244 }
245
246 // XXX: is this the right thing to do?
247 return wpi::util::future<R>();
248 }
249
250 /**
251 * Wakeup the worker thread, call the work function, and call the afterWork
252 * function with the result on the loop set by SetLoop().
253 *
254 * It’s safe to call this function from any thread.
255 * The work function will be called on the worker thread, and the afterWork
256 * function will be called on the loop thread.
257 *
258 * SetLoop() must be called prior to calling this function for afterWork to
259 * be called.
260 *
261 * @param work Work function (called on worker thread)
262 * @param afterWork After work function (called on loop thread)
263 * @param u Arguments to work function
264 */
265 template <typename... U>
267 if (auto thr = m_owner.GetThread()) {
268 // add the parameters to the input queue
269 thr->m_requests.emplace_back(
270 std::move(work), std::move(afterWork),
271 std::forward_as_tuple(std::forward<U>(u)...));
272
273 // signal the thread
274 thr->m_cond.notify_one();
275 }
276 }
277
278 private:
280};
281
282} // namespace wpi::net
and restrictions which apply to each piece of software is included later in this file and or inside of the individual applicable source files The disclaimer of warranty in the WPILib license above applies to all code in and nothing in any of the other licenses gives permission to use the names of FIRST nor the names of the WPILib contributors to endorse or promote products derived from this software The following pieces of software have additional or alternate and or glfw and nanopb were modified for use in Google Inc All rights reserved Redistribution and use in source and binary with or without are permitted provided that the following conditions are this list of conditions and the following disclaimer *Redistributions in binary form must reproduce the above copyright this list of conditions and the following disclaimer in the documentation and or other materials provided with the distribution *Neither the name of Google Inc nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS AS IS AND ANY EXPRESS OR IMPLIED BUT NOT LIMITED THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY OR CONSEQUENTIAL WHETHER IN STRICT OR EVEN IF ADVISED OF THE POSSIBILITY OF SUCH January AND DISTRIBUTION Definitions License shall mean the terms and conditions for and distribution as defined by Sections through of this document Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License Legal Entity shall mean the union of the acting entity and all other entities that control are controlled by or are under common control with that entity For the purposes of this definition control direct or to cause the direction or management of such whether by contract or including 
but not limited to software source documentation and configuration files Object form shall mean any form resulting from mechanical transformation or translation of a Source including but not limited to compiled object generated and conversions to other media types Work shall mean the work of whether in Source or Object made available under the as indicated by a copyright notice that is included in or attached to the work(an example is provided in the Appendix below). "Derivative Works" shall mean any work
WorkerThread()
Definition WorkerThread.hpp:167
std::shared_ptr< uv::Handle > GetHandle() const
Get the handle used by QueueWorkThen() to run afterWork.
Definition WorkerThread.hpp:208
typename detail::WorkerThreadAsync< R >::AfterWorkFunction AfterWorkFunction
Definition WorkerThread.hpp:164
void UnsetLoop()
Unset the loop.
Definition WorkerThread.hpp:195
void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U &&... u)
Wakeup the worker thread, call the work function, and call the afterWork function with the result on ...
Definition WorkerThread.hpp:266
void SetLoop(std::shared_ptr< uv::Loop > loop)
Set the loop.
Definition WorkerThread.hpp:189
std::function< R(T...)> WorkFunction
Definition WorkerThread.hpp:163
wpi::util::future< R > QueueWork(WorkFunction work, U &&... u)
Wakeup the worker thread, call the work function, and return a future for the result.
Definition WorkerThread.hpp:230
void SetLoop(uv::Loop &loop)
Set the loop.
Definition WorkerThread.hpp:176
Definition WorkerThread.hpp:156
Definition WorkerThread.hpp:91
wpi::util::PromiseFactory< R > m_promises
Definition WorkerThread.hpp:98
detail::WorkerThreadAsync< R > m_async
Definition WorkerThread.hpp:99
WorkerThreadRequest< R, T... > Request
Definition WorkerThread.hpp:93
void Main() override
Definition WorkerThread.hpp:129
std::vector< Request > m_requests
Definition WorkerThread.hpp:97
static std::shared_ptr< Async > Create(Loop &loop)
Create an async handle.
Definition Async.hpp:53
Event loop.
Definition Loop.hpp:35
A promise factory for lightweight futures.
Definition future.hpp:122
void SetValue(uint64_t request, const T &value)
Sets a value directly for a future without creating a promise object.
Definition future.hpp:728
wpi::util::mutex m_mutex
Definition SafeThread.hpp:27
std::atomic_bool m_active
Definition SafeThread.hpp:28
Definition SafeThread.hpp:32
wpi::util::condition_variable m_cond
Definition SafeThread.hpp:36
Definition SafeThread.hpp:123
A lightweight version of std::future.
Definition future.hpp:270
Converts a string literal into a format string that will be parsed at compile time and converted into...
Definition printf.h:50
Definition StringMap.hpp:773
void RunWorkerThreadRequest(WorkerThreadThread< R, T... > &thr, WorkerThreadRequest< R, T... > &req)
Definition WorkerThread.hpp:103
Definition raw_socket_ostream.hpp:9
std::function< void()> AfterWorkFunction
Definition WorkerThread.hpp:46
void RemoveLoop()
Definition WorkerThread.hpp:56
~WorkerThreadAsync()
Definition WorkerThread.hpp:48
void SetLoop(uv::Loop &loop)
Definition WorkerThread.hpp:50
std::weak_ptr< uv::Async< AfterWorkFunction > > m_async
Definition WorkerThread.hpp:63
Definition WorkerThread.hpp:22
void UnsetLoop()
Definition WorkerThread.hpp:34
std::weak_ptr< uv::Async< AfterWorkFunction, R > > m_async
Definition WorkerThread.hpp:41
void SetLoop(uv::Loop &loop)
Definition WorkerThread.hpp:27
~WorkerThreadAsync()
Definition WorkerThread.hpp:25
std::function< void(R)> AfterWorkFunction
Definition WorkerThread.hpp:23
Definition WorkerThread.hpp:67
AfterWorkFunction afterWork
Definition WorkerThread.hpp:86
std::tuple< T... > params
Definition WorkerThread.hpp:87
std::function< R(T...)> WorkFunction
Definition WorkerThread.hpp:68
typename WorkerThreadAsync< R >::AfterWorkFunction AfterWorkFunction
Definition WorkerThread.hpp:69
uint64_t promiseId
Definition WorkerThread.hpp:84
WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_, std::tuple< T... > params_)
Definition WorkerThread.hpp:72
WorkerThreadRequest(WorkFunction work_, AfterWorkFunction afterWork_, std::tuple< T... > params_)
Definition WorkerThread.hpp:77
WorkFunction work
Definition WorkerThread.hpp:85