WPILibC++ 2027.0.0-alpha-4
Loading...
Searching...
No Matches
Stream.hpp
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#pragma once
6
7#include <cstdlib>
8#include <functional>
9#include <initializer_list>
10#include <memory>
11#include <span>
12#include <utility>
13
14#include <uv.h>
15
16#include "wpi/net/uv/Buffer.hpp"
17#include "wpi/net/uv/Handle.hpp"
19#include "wpi/util/Signal.h"
20
21namespace wpi::net::uv {
22
23class Stream;
24
25/**
26 * Shutdown request.
27 */
28class ShutdownReq : public RequestImpl<ShutdownReq, uv_shutdown_t> {
29 public:
31
32 Stream& GetStream() const {
33 return *static_cast<Stream*>(GetRaw()->handle->data);
34 }
35
36 /**
37 * Shutdown completed signal.
38 */
40};
41
42/**
43 * Write request.
44 */
45class WriteReq : public RequestImpl<WriteReq, uv_write_t> {
46 public:
48
49 Stream& GetStream() const {
50 return *static_cast<Stream*>(GetRaw()->handle->data);
51 }
52
53 /**
54 * Write completed signal. This is called even if an error occurred.
55 * @param err error value
56 */
58};
59
60/**
61 * Stream handle.
62 * Stream handles provide an abstraction of a duplex communication channel.
63 * This is an abstract type; there are three stream implementations (Tcp,
64 * Pipe, and Tty).
65 */
66class Stream : public Handle {
67 public:
68 std::shared_ptr<Stream> shared_from_this() {
69 return std::static_pointer_cast<Stream>(Handle::shared_from_this());
70 }
71
72 std::shared_ptr<const Stream> shared_from_this() const {
73 return std::static_pointer_cast<const Stream>(Handle::shared_from_this());
74 }
75
76 /**
77 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
78 * write requests to complete. HandleShutdownComplete() is called on the
79 * request after shutdown is complete.
80 *
81 * @param req shutdown request
82 */
83 void Shutdown(const std::shared_ptr<ShutdownReq>& req);
84
85 /**
86 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
87 * write requests to complete. The callback is called after shutdown is
88 * complete. Errors will be reported to the stream error handler.
89 *
90 * @param callback Callback function to call when shutdown completes
91 */
92 void Shutdown(std::function<void()> callback = nullptr);
93
94 /**
95 * Start reading data from an incoming stream.
96 *
97 * This will only succeed after a connection has been established.
98 *
99 * A data signal will be emitted several times until there is no more
100 * data to read or `StopRead()` is called.
101 * An end signal will be emitted when there is no more data to read.
102 */
103 void StartRead();
104
105 /**
106 * Stop reading data from the stream.
107 *
108 * This function is idempotent and may be safely called on a stopped stream.
109 */
111
112 /**
113 * Write data to the stream.
114 *
115 * Data are written in order. The lifetime of the data pointers passed in
116 * the `bufs` parameter must exceed the lifetime of the write request.
117 * An easy way to ensure this is to have the write request keep track of
118 * the data and use either its Complete() function or destructor to free the
119 * data.
120 *
121 * The finish signal will be emitted on the request object when the data
122 * has been written (or if an error occurs).
123 * The error signal will be emitted on the request object in case of errors.
124 *
125 * @param bufs The buffers to be written to the stream.
126 * @param req write request
127 */
128 void Write(std::span<const Buffer> bufs,
129 const std::shared_ptr<WriteReq>& req);
130
131 /**
132 * Write data to the stream.
133 *
134 * Data are written in order. The lifetime of the data pointers passed in
135 * the `bufs` parameter must exceed the lifetime of the write request.
136 * An easy way to ensure this is to have the write request keep track of
137 * the data and use either its Complete() function or destructor to free the
138 * data.
139 *
140 * The finish signal will be emitted on the request object when the data
141 * has been written (or if an error occurs).
142 * The error signal will be emitted on the request object in case of errors.
143 *
144 * @param bufs The buffers to be written to the stream.
145 * @param req write request
146 */
147 void Write(std::initializer_list<Buffer> bufs,
148 const std::shared_ptr<WriteReq>& req) {
149 Write({bufs.begin(), bufs.end()}, req);
150 }
151
152 /**
153 * Write data to the stream.
154 *
155 * Data are written in order. The lifetime of the data pointers passed in
156 * the `bufs` parameter must exceed the lifetime of the write request.
157 * The callback can be used to free data after the request completes.
158 *
159 * The callback will be called when the data has been written (even if an
160 * error occurred). Errors will be reported to the stream error handler.
161 *
162 * @param bufs The buffers to be written to the stream.
163 * @param callback Callback function to call when the write completes
164 */
165 void Write(std::span<const Buffer> bufs,
166 std::function<void(std::span<Buffer>, Error)> callback);
167
168 /**
169 * Write data to the stream.
170 *
171 * Data are written in order. The lifetime of the data pointers passed in
172 * the `bufs` parameter must exceed the lifetime of the write request.
173 * The callback can be used to free data after the request completes.
174 *
175 * The callback will be called when the data has been written (even if an
176 * error occurred). Errors will be reported to the stream error handler.
177 *
178 * @param bufs The buffers to be written to the stream.
179 * @param callback Callback function to call when the write completes
180 */
181 void Write(std::initializer_list<Buffer> bufs,
182 std::function<void(std::span<Buffer>, Error)> callback) {
183 Write({bufs.begin(), bufs.end()}, std::move(callback));
184 }
185
186 /**
187 * Queue a write request if it can be completed immediately.
188 *
189 * Same as `Write()`, but won’t queue a write request if it can’t be
190 * completed immediately.
191 * An error signal will be emitted in case of errors.
192 *
193 * @param bufs The buffers to be written to the stream.
194 * @return Number of bytes written, or negative (error code) on error
195 */
196 [[nodiscard]]
197 int TryWrite(std::span<const Buffer> bufs);
198
199 /**
200 * Queue a write request if it can be completed immediately.
201 *
202 * Same as `Write()`, but won’t queue a write request if it can’t be
203 * completed immediately.
204 * An error signal will be emitted in case of errors.
205 *
206 * @param bufs The buffers to be written to the stream.
207 * @return Number of bytes written, or negative (error code) on error
208 */
209 [[nodiscard]]
210 int TryWrite(std::initializer_list<Buffer> bufs) {
211 return TryWrite({bufs.begin(), bufs.end()});
212 }
213
214 /**
215 * Same as TryWrite() and extended write function for sending handles over a
216 * pipe.
217 *
218 * Try to send a handle is not supported on Windows, where it returns
219 * UV_EAGAIN.
220 *
221 * @param bufs The buffers to be written to the stream.
222 * @param send send stream
223 * @return Number of bytes written, or negative (error code) on error
224 */
225 [[nodiscard]]
226 int TryWrite2(std::span<const Buffer> bufs, Stream& send);
227
228 /**
229 * Same as TryWrite() and extended write function for sending handles over a
230 * pipe.
231 *
232 * Try to send a handle is not supported on Windows, where it returns
233 * UV_EAGAIN.
234 *
235 * @param bufs The buffers to be written to the stream.
236 * @param send send stream
237 * @return Number of bytes written, or negative (error code) on error
238 */
239 [[nodiscard]]
240 int TryWrite2(std::initializer_list<Buffer> bufs, Stream& send) {
241 return TryWrite2({bufs.begin(), bufs.end()}, send);
242 }
243
244 /**
245 * Check if the stream is readable.
246 * @return True if the stream is readable, false otherwise.
247 */
248 bool IsReadable() const noexcept {
249 return uv_is_readable(GetRawStream()) == 1;
250 }
251
252 /**
253 * @brief Checks if the stream is writable.
254 * @return True if the stream is writable, false otherwise.
255 */
256 bool IsWritable() const noexcept {
257 return uv_is_writable(GetRawStream()) == 1;
258 }
259
260 /**
261 * Enable or disable blocking mode for a stream.
262 *
263 * When blocking mode is enabled all writes complete synchronously. The
264 * interface remains unchanged otherwise, e.g. completion or failure of the
265 * operation will still be reported through events which are emitted
266 * asynchronously.
267 *
268 * @param enable True to enable blocking mode, false otherwise.
269 * @return True in case of success, false otherwise.
270 */
271 bool SetBlocking(bool enable) noexcept {
272 return uv_stream_set_blocking(GetRawStream(), enable) == 0;
273 }
274
275 /**
276 * Gets the amount of queued bytes waiting to be sent.
277 * @return Amount of queued bytes waiting to be sent.
278 */
279 size_t GetWriteQueueSize() const noexcept {
280 return GetRawStream()->write_queue_size;
281 }
282
283 /**
284 * Get the underlying stream data structure.
285 *
286 * @return The underlying stream data structure.
287 */
288 uv_stream_t* GetRawStream() const noexcept {
289 return reinterpret_cast<uv_stream_t*>(GetRawHandle());
290 }
291
292 /**
293 * Signal generated when data was read on a stream.
294 */
296
297 /**
298 * Signal generated when no more read data is available.
299 */
301
302 protected:
303 explicit Stream(uv_stream_t* uv_stream)
304 : Handle{reinterpret_cast<uv_handle_t*>(uv_stream)} {}
305};
306
307template <typename T, typename U>
308class StreamImpl : public Stream {
309 public:
310 std::shared_ptr<T> shared_from_this() {
311 return std::static_pointer_cast<T>(Handle::shared_from_this());
312 }
313
314 std::shared_ptr<const T> shared_from_this() const {
315 return std::static_pointer_cast<const T>(Handle::shared_from_this());
316 }
317
318 /**
319 * Get the underlying handle data structure.
320 *
321 * @return The underlying handle data structure.
322 */
323 U* GetRaw() const noexcept {
324 return reinterpret_cast<U*>(this->GetRawHandle());
325 }
326
327 protected:
328 StreamImpl() : Stream{static_cast<uv_stream_t*>(std::malloc(sizeof(U)))} {}
329};
330
331} // namespace wpi::net::uv
Error code.
Definition Error.hpp:14
bool Invoke(F &&f, Args &&... args) const
Definition Handle.hpp:265
uv_handle_t * GetRawHandle() const noexcept
Get the underlying handle data structure.
Definition Handle.hpp:178
Handle(const Handle &)=delete
uv_shutdown_t * GetRaw() noexcept
Definition Request.hpp:149
Stream & GetStream() const
Definition Stream.hpp:32
wpi::util::sig::Signal complete
Shutdown completed signal.
Definition Stream.hpp:39
Stream handle.
Definition Stream.hpp:66
void Write(std::initializer_list< Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
Definition Stream.hpp:181
int TryWrite(std::initializer_list< Buffer > bufs)
Queue a write request if it can be completed immediately.
Definition Stream.hpp:210
wpi::util::sig::Signal end
Signal generated when no more read data is available.
Definition Stream.hpp:300
void Shutdown(const std::shared_ptr< ShutdownReq > &req)
Shutdown the outgoing (write) side of a duplex stream.
size_t GetWriteQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition Stream.hpp:279
Stream(uv_stream_t *uv_stream)
Definition Stream.hpp:303
int TryWrite2(std::initializer_list< Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
Definition Stream.hpp:240
void Write(std::initializer_list< Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
Definition Stream.hpp:147
std::shared_ptr< const Stream > shared_from_this() const
Definition Stream.hpp:72
void Shutdown(std::function< void()> callback=nullptr)
Shutdown the outgoing (write) side of a duplex stream.
bool SetBlocking(bool enable) noexcept
Enable or disable blocking mode for a stream.
Definition Stream.hpp:271
wpi::util::sig::Signal< Buffer &, size_t > data
Signal generated when data was read on a stream.
Definition Stream.hpp:295
uv_stream_t * GetRawStream() const noexcept
Get the underlying stream data structure.
Definition Stream.hpp:288
bool IsReadable() const noexcept
Check if the stream is readable.
Definition Stream.hpp:248
int TryWrite2(std::span< const Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
void StartRead()
Start reading data from an incoming stream.
int TryWrite(std::span< const Buffer > bufs)
Queue a write request if it can be completed immediately.
std::shared_ptr< Stream > shared_from_this()
Definition Stream.hpp:68
bool IsWritable() const noexcept
Checks if the stream is writable.
Definition Stream.hpp:256
void StopRead()
Stop reading data from the stream.
Definition Stream.hpp:110
void Write(std::span< const Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
void Write(std::span< const Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
U * GetRaw() const noexcept
Get the underlying handle data structure.
Definition Stream.hpp:323
std::shared_ptr< T > shared_from_this()
Definition Stream.hpp:310
StreamImpl()
Definition Stream.hpp:328
std::shared_ptr< const T > shared_from_this() const
Definition Stream.hpp:314
Stream & GetStream() const
Definition Stream.hpp:49
wpi::util::sig::Signal< Error > finish
Write completed signal.
Definition Stream.hpp:57
Definition StringMap.hpp:773
Definition Prepare.hpp:14
SignalBase< detail::NullMutex, T... > Signal
Specialization of SignalBase to be used in single threaded contexts.
Definition Signal.h:809
UV_REQ_FIELDS uv_stream_t * handle
Definition uv.h:454
uv_stream_t * handle
Definition uv.h:571
UV_EXTERN int uv_read_stop(uv_stream_t *)
UV_EXTERN int uv_stream_set_blocking(uv_stream_t *handle, int blocking)
UV_EXTERN int uv_is_writable(const uv_stream_t *handle)
struct uv_handle_s uv_handle_t
Definition uv.h:216
UV_EXTERN int uv_is_readable(const uv_stream_t *handle)
struct uv_stream_s uv_stream_t
Definition uv.h:218