5#ifndef WPIUTIL_WPI_CALLBACKMANAGER_H_
6#define WPIUTIL_WPI_CALLBACKMANAGER_H_
24template <
typename Callback>
46template <
typename Derived,
typename TUserInfo,
47 typename TListenerData =
49 typename TNotifierData = TUserInfo>
56 CallbackThread(std::function<
void()> on_start, std::function<
void()> on_exit)
61 for (
size_t i = 0; i <
m_pollers.size(); ++i) {
72 std::queue<std::pair<unsigned int, NotifierData>>
m_queue;
95 template <
typename... Args>
96 void SendPoller(
unsigned int poller_uid, Args&&... args) {
105 std::scoped_lock lock(poller->poll_mutex);
106 poller->poll_queue.emplace(std::forward<Args>(args)...);
108 poller->poll_cond.notify_one();
112template <
typename Derived,
typename TUserInfo,
typename TListenerData,
113 typename TNotifierData>
119 std::unique_lock lock(m_mutex);
121 while (m_queue.empty()) {
128 while (!m_queue.empty()) {
132 auto item = std::move(m_queue.front());
134 if (item.first != UINT_MAX) {
135 if (item.first < m_listeners.size()) {
136 auto& listener = m_listeners[item.first];
138 static_cast<Derived*
>(
this)->Matches(listener, item.second)) {
139 static_cast<Derived*
>(
this)->SetListener(&item.second, item.first);
140 if (listener.callback) {
142 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
145 }
else if (listener.poller_uid != UINT_MAX) {
146 SendPoller(listener.poller_uid, std::move(item.second));
152 for (
size_t i = 0; i < m_listeners.size(); ++i) {
153 auto& listener = m_listeners[i];
158 if (!
static_cast<Derived*
>(
this)->Matches(listener, item.second)) {
161 static_cast<Derived*
>(
this)->SetListener(&item.second,
162 static_cast<unsigned>(i));
163 if (listener.callback) {
165 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
168 }
else if (listener.poller_uid != UINT_MAX) {
169 SendPoller(listener.poller_uid, item.second);
176 m_queue_empty.notify_all();
191template <
typename Derived,
typename Thread>
197 m_on_start = std::move(on_start);
201 m_on_exit = std::move(on_exit);
211 thr->m_listeners.erase(listener_uid);
215 static_cast<Derived*
>(
this)->Start();
217 return thr->m_pollers.emplace_back(
218 std::make_shared<typename Thread::Poller>());
228 for (
size_t i = 0; i < thr->m_listeners.size(); ++i) {
229 if (thr->m_listeners[i].poller_uid == poller_uid) {
230 thr->m_listeners.erase(i);
235 if (poller_uid >= thr->m_pollers.size()) {
238 auto poller = thr->m_pollers[poller_uid];
243 thr->m_pollers.erase(poller_uid);
252 auto& lock = thr.GetLock();
253 auto timeout_time = std::chrono::steady_clock::now() +
254 std::chrono::duration<double>(timeout);
255 while (!thr->m_queue.empty()) {
256 if (!thr->m_active) {
263 thr->m_queue_empty.wait(lock);
265 auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
266 if (cond_timed_out == std::cv_status::timeout) {
275 std::vector<typename Thread::UserInfo>
Poll(
unsigned int poller_uid) {
276 bool timed_out =
false;
277 return Poll(poller_uid, -1, &timed_out);
280 std::vector<typename Thread::UserInfo>
Poll(
unsigned int poller_uid,
281 double timeout,
bool* timed_out) {
282 std::vector<typename Thread::UserInfo> infos;
283 std::shared_ptr<typename Thread::Poller> poller;
289 if (poller_uid > thr->m_pollers.size()) {
292 poller = thr->m_pollers[poller_uid];
298 std::unique_lock lock(poller->poll_mutex);
299 auto timeout_time = std::chrono::steady_clock::now() +
300 std::chrono::duration<double>(timeout);
302 while (poller->poll_queue.empty()) {
303 if (poller->terminating) {
306 if (poller->canceling) {
309 poller->canceling =
false;
317 poller->poll_cond.wait(lock);
319 auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
320 if (cond_timed_out == std::cv_status::timeout) {
327 while (!poller->poll_queue.empty()) {
328 infos.emplace_back(std::move(poller->poll_queue.front()));
329 poller->poll_queue.pop();
335 std::shared_ptr<typename Thread::Poller> poller;
341 if (poller_uid > thr->m_pollers.size()) {
344 poller = thr->m_pollers[poller_uid];
351 std::scoped_lock lock(poller->poll_mutex);
352 poller->canceling =
true;
354 poller->poll_cond.notify_one();
358 template <
typename... Args>
360 m_owner.
Start(m_on_start, m_on_exit, std::forward<Args>(args)...);
363 template <
typename... Args>
364 unsigned int DoAdd(Args&&... args) {
365 static_cast<Derived*
>(
this)->Start();
367 return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
370 template <
typename... Args>
371 void Send(
unsigned int only_listener, Args&&... args) {
373 if (!thr || thr->m_listeners.empty()) {
376 thr->m_queue.emplace(std::piecewise_construct,
377 std::make_tuple(only_listener),
378 std::forward_as_tuple(std::forward<Args>(args)...));
379 thr->m_cond.notify_one();
389 std::function<void()> m_on_start;
390 std::function<void()> m_on_exit;
Definition: CallbackManager.h:25
CallbackListenerData()=default
CallbackListenerData(unsigned int poller_uid_)
Definition: CallbackManager.h:29
CallbackListenerData(Callback callback_)
Definition: CallbackManager.h:28
unsigned int poller_uid
Definition: CallbackManager.h:35
Callback callback
Definition: CallbackManager.h:34
Definition: CallbackManager.h:192
wpi::SafeThreadOwner< Thread >::Proxy GetThread() const
Definition: CallbackManager.h:382
void SetOnExit(std::function< void()> on_exit)
Definition: CallbackManager.h:200
void Send(unsigned int only_listener, Args &&... args)
Definition: CallbackManager.h:371
void SetOnStart(std::function< void()> on_start)
Definition: CallbackManager.h:196
bool WaitForQueue(double timeout)
Definition: CallbackManager.h:246
void RemovePoller(unsigned int poller_uid)
Definition: CallbackManager.h:221
void CancelPoll(unsigned int poller_uid)
Definition: CallbackManager.h:334
unsigned int DoAdd(Args &&... args)
Definition: CallbackManager.h:364
friend class RpcServerTest
Definition: CallbackManager.h:193
void Remove(unsigned int listener_uid)
Definition: CallbackManager.h:206
void Stop()
Definition: CallbackManager.h:204
std::vector< typename Thread::UserInfo > Poll(unsigned int poller_uid)
Definition: CallbackManager.h:275
std::vector< typename Thread::UserInfo > Poll(unsigned int poller_uid, double timeout, bool *timed_out)
Definition: CallbackManager.h:280
void DoStart(Args &&... args)
Definition: CallbackManager.h:359
unsigned int CreatePoller()
Definition: CallbackManager.h:214
Definition: CallbackManager.h:50
std::queue< std::pair< unsigned int, NotifierData > > m_queue
Definition: CallbackManager.h:72
TUserInfo UserInfo
Definition: CallbackManager.h:52
std::function< void()> m_on_start
Definition: CallbackManager.h:91
TListenerData ListenerData
Definition: CallbackManager.h:54
CallbackThread(std::function< void()> on_start, std::function< void()> on_exit)
Definition: CallbackManager.h:56
TNotifierData NotifierData
Definition: CallbackManager.h:53
void SendPoller(unsigned int poller_uid, Args &&... args)
Definition: CallbackManager.h:96
~CallbackThread() override
Definition: CallbackManager.h:59
std::function< void()> m_on_exit
Definition: CallbackManager.h:92
void Main() override
Definition: CallbackManager.h:114
wpi::condition_variable m_queue_empty
Definition: CallbackManager.h:73
wpi::UidVector< std::shared_ptr< Poller >, 64 > m_pollers
Definition: CallbackManager.h:89
wpi::UidVector< ListenerData, 64 > m_listeners
Definition: CallbackManager.h:70
Definition: SafeThread.h:33
void Start(Args &&... args)
Definition: SafeThread.h:127
Proxy GetThread() const
Definition: SafeThread.h:133
typename detail::SafeThreadProxy< T > Proxy
Definition: SafeThread.h:132
Definition: ntcore_cpp.h:26
Definition: CallbackManager.h:75
wpi::mutex poll_mutex
Definition: CallbackManager.h:84
wpi::condition_variable poll_cond
Definition: CallbackManager.h:85
void Terminate()
Definition: CallbackManager.h:76
std::queue< NotifierData > poll_queue
Definition: CallbackManager.h:83
bool canceling
Definition: CallbackManager.h:87
bool terminating
Definition: CallbackManager.h:86
::std::condition_variable condition_variable
Definition: condition_variable.h:16
::std::mutex mutex
Definition: mutex.h:17