WPILibC++ 2024.3.2
Stream.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_UV_STREAM_H_
6#define WPINET_UV_STREAM_H_
7
8#include <uv.h>
9
10#include <cstdlib>
11#include <functional>
12#include <initializer_list>
13#include <memory>
14#include <span>
15#include <utility>
16
17#include <wpi/Signal.h>
18
19#include "wpinet/uv/Buffer.h"
20#include "wpinet/uv/Handle.h"
21#include "wpinet/uv/Request.h"
22
23namespace wpi::uv {
24
25class Stream;
26
27/**
28 * Shutdown request.
 *
 * Request type (RequestImpl over uv_shutdown_t) accepted by
 * Stream::Shutdown().
29 */
30class ShutdownReq : public RequestImpl<ShutdownReq, uv_shutdown_t> {
31 public:
33
/**
 * Get the stream associated with this request.
 *
 * Reads the `data` pointer of the handle stored in the raw request and
 * casts it to Stream.
 * NOTE(review): presumably only meaningful once the request has been
 * submitted on a stream (so that `handle` is set) -- confirm.
 *
 * @return Reference to the associated Stream.
 */
34 Stream& GetStream() const {
35 return *static_cast<Stream*>(GetRaw()->handle->data);
36 }
37
38 /**
39 * Shutdown completed signal.
40 */
42};
43
44/**
45 * Write request.
 *
 * Request type (RequestImpl over uv_write_t) accepted by the Stream::Write()
 * overloads that take a request.
46 */
47class WriteReq : public RequestImpl<WriteReq, uv_write_t> {
48 public:
50
/**
 * Get the stream associated with this request.
 *
 * Reads the `data` pointer of the handle stored in the raw request and
 * casts it to Stream.
 * NOTE(review): presumably only meaningful once the request has been
 * submitted on a stream (so that `handle` is set) -- confirm.
 *
 * @return Reference to the associated Stream.
 */
51 Stream& GetStream() const {
52 return *static_cast<Stream*>(GetRaw()->handle->data);
53 }
54
55 /**
56 * Write completed signal. This is called even if an error occurred.
57 * @param err error value
58 */
60};
61
62/**
63 * Stream handle.
64 * Stream handles provide an abstraction of a duplex communication channel.
65 * This is an abstract type; there are three stream implementations (Tcp,
66 * Pipe, and Tty).
67 */
68class Stream : public Handle {
69 public:
/**
 * Get a shared pointer to this stream.
 *
 * Takes the pointer returned by Handle::shared_from_this() and converts it
 * to std::shared_ptr<Stream> with std::static_pointer_cast.
 *
 * @return Shared pointer to this object, typed as Stream.
 */
70 std::shared_ptr<Stream> shared_from_this() {
71 return std::static_pointer_cast<Stream>(Handle::shared_from_this());
72 }
73
/**
 * Get a shared pointer to this stream (const overload).
 *
 * Takes the pointer returned by Handle::shared_from_this() and converts it
 * to std::shared_ptr<const Stream> with std::static_pointer_cast.
 *
 * @return Shared pointer to this object, typed as const Stream.
 */
74 std::shared_ptr<const Stream> shared_from_this() const {
75 return std::static_pointer_cast<const Stream>(Handle::shared_from_this());
76 }
77
78 /**
79 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
80 * write requests to complete. HandleShutdownComplete() is called on the
81 * request after shutdown is complete.
 * NOTE(review): ShutdownReq documents a "Shutdown completed signal" rather
 * than a HandleShutdownComplete() method -- confirm which name is current.
82 *
83 * @param req shutdown request
84 */
85 void Shutdown(const std::shared_ptr<ShutdownReq>& req);
86
87 /**
88 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
89 * write requests to complete. The callback is called after shutdown is
90 * complete. Errors will be reported to the stream error handler.
91 *
92 * @param callback Callback function to call when shutdown completes
93 * (defaults to nullptr, i.e. an empty std::function)
94 */
95 void Shutdown(std::function<void()> callback = nullptr);
96
97 /**
98 * Start reading data from an incoming stream.
99 *
100 * This will only succeed after a connection has been established.
101 *
102 * The `data` signal will be emitted several times until there is no more
103 * data to read or `StopRead()` is called.
104 * The `end` signal will be emitted when there is no more data to read.
105 */
106 void StartRead();
107
108 /**
109 * Stop reading data from the stream.
110 *
111 * This function is idempotent and may be safely called on a stopped stream.
112 */
114
115 /**
116 * Write data to the stream.
117 *
118 * Data are written in order. The lifetime of the data pointers passed in
119 * the `bufs` parameter must exceed the lifetime of the write request.
120 * An easy way to ensure this is to have the write request keep track of
121 * the data and use either its Complete() function or destructor to free the
122 * data.
123 *
124 * The `finish` signal will be emitted on the request object when the data
125 * has been written (or if an error occurs).
126 * The error signal will be emitted on the request object in case of errors.
127 *
128 * @param bufs The buffers to be written to the stream.
129 * @param req write request on which completion and errors are signaled
130 */
131 void Write(std::span<const Buffer> bufs,
132 const std::shared_ptr<WriteReq>& req);
133
134 /**
135 * Write data to the stream.
136 *
 * Convenience overload: forwards the initializer list's element range to
 * the std::span overload of Write() together with `req`.
 *
137 * Data are written in order. The lifetime of the data pointers passed in
138 * the `bufs` parameter must exceed the lifetime of the write request.
139 * An easy way to ensure this is to have the write request keep track of
140 * the data and use either its Complete() function or destructor to free the
141 * data.
142 *
143 * The `finish` signal will be emitted on the request object when the data
144 * has been written (or if an error occurs).
145 * The error signal will be emitted on the request object in case of errors.
146 *
147 * @param bufs The buffers to be written to the stream.
148 * @param req write request on which completion and errors are signaled
149 */
150 void Write(std::initializer_list<Buffer> bufs,
151 const std::shared_ptr<WriteReq>& req) {
152 Write({bufs.begin(), bufs.end()}, req);
153 }
154
155 /**
156 * Write data to the stream.
157 *
158 * Data are written in order. The lifetime of the data pointers passed in
159 * the `bufs` parameter must exceed the lifetime of the write request.
160 * The callback can be used to free data after the request completes.
161 *
162 * The callback will be called when the data has been written (even if an
163 * error occurred). Errors will be reported to the stream error handler.
164 *
165 * @param bufs The buffers to be written to the stream.
166 * @param callback Callback function to call when the write completes.
 *                 It is invoked with a std::span<Buffer> and an Error.
 *                 NOTE(review): presumably the written buffers and the
 *                 completion status -- confirm against the implementation.
167 */
168 void Write(std::span<const Buffer> bufs,
169 std::function<void(std::span<Buffer>, Error)> callback);
170
171 /**
172 * Write data to the stream.
173 *
 * Convenience overload: forwards the initializer list's element range to
 * the std::span overload of Write() and moves `callback` into it.
 *
174 * Data are written in order. The lifetime of the data pointers passed in
175 * the `bufs` parameter must exceed the lifetime of the write request.
176 * The callback can be used to free data after the request completes.
177 *
178 * The callback will be called when the data has been written (even if an
179 * error occurred). Errors will be reported to the stream error handler.
180 *
181 * @param bufs The buffers to be written to the stream.
182 * @param callback Callback function to call when the write completes.
 *                 It is invoked with a std::span<Buffer> and an Error.
 *                 NOTE(review): presumably the written buffers and the
 *                 completion status -- confirm against the implementation.
183 */
184 void Write(std::initializer_list<Buffer> bufs,
185 std::function<void(std::span<Buffer>, Error)> callback) {
186 Write({bufs.begin(), bufs.end()}, std::move(callback));
187 }
188
189 /**
190 * Queue a write request if it can be completed immediately.
191 *
192 * Same as `Write()`, but won't queue a write request if it can't be
193 * completed immediately.
194 * An error signal will be emitted in case of errors.
195 *
196 * @param bufs The buffers to be written to the stream.
197 * @return Number of bytes written, or negative (error code) on error
198 */
199 [[nodiscard]]
200 int TryWrite(std::span<const Buffer> bufs);
201
202 /**
203 * Queue a write request if it can be completed immediately.
204 *
 * Convenience overload: forwards the initializer list's element range to
 * the std::span overload of TryWrite() and returns its result.
 *
205 * Same as `Write()`, but won't queue a write request if it can't be
206 * completed immediately.
207 * An error signal will be emitted in case of errors.
208 *
209 * @param bufs The buffers to be written to the stream.
210 * @return Number of bytes written, or negative (error code) on error
211 */
212 [[nodiscard]]
213 int TryWrite(std::initializer_list<Buffer> bufs) {
214 return TryWrite({bufs.begin(), bufs.end()});
215 }
216
217 /**
218 * Same as TryWrite(), extended to also send a handle (stream) over a
219 * pipe.
220 *
221 * Trying to send a handle is not supported on Windows, where it returns
222 * UV_EAGAIN.
223 *
224 * @param bufs The buffers to be written to the stream.
225 * @param send The stream (handle) to send over the pipe
226 * @return Number of bytes written, or negative (error code) on error
227 */
228 [[nodiscard]]
229 int TryWrite2(std::span<const Buffer> bufs, Stream& send);
230
231 /**
232 * Same as TryWrite(), extended to also send a handle (stream) over a
233 * pipe.
234 *
 * Convenience overload: forwards the initializer list's element range and
 * `send` to the std::span overload of TryWrite2() and returns its result.
 *
235 * Trying to send a handle is not supported on Windows, where it returns
236 * UV_EAGAIN.
237 *
238 * @param bufs The buffers to be written to the stream.
239 * @param send The stream (handle) to send over the pipe
240 * @return Number of bytes written, or negative (error code) on error
241 */
242 [[nodiscard]]
243 int TryWrite2(std::initializer_list<Buffer> bufs, Stream& send) {
244 return TryWrite2({bufs.begin(), bufs.end()}, send);
245 }
246
247 /**
248 * Check if the stream is readable.
249 * @return True if uv_is_readable() reports 1 for the raw stream, false otherwise.
250 */
251 bool IsReadable() const noexcept {
252 return uv_is_readable(GetRawStream()) == 1;
253 }
254
255 /**
256 * Check if the stream is writable.
257 * @return True if uv_is_writable() reports 1 for the raw stream, false otherwise.
258 */
259 bool IsWritable() const noexcept {
260 return uv_is_writable(GetRawStream()) == 1;
261 }
262
263 /**
264 * Enable or disable blocking mode for a stream.
265 *
266 * When blocking mode is enabled all writes complete synchronously. The
267 * interface remains unchanged otherwise, e.g. completion or failure of the
268 * operation will still be reported through events which are emitted
269 * asynchronously.
270 *
271 * @param enable True to enable blocking mode, false otherwise.
272 * @return True if uv_stream_set_blocking() returned 0 (success), false otherwise.
273 */
274 bool SetBlocking(bool enable) noexcept {
275 return uv_stream_set_blocking(GetRawStream(), enable) == 0;
276 }
277
278 /**
279 * Gets the amount of queued bytes waiting to be sent.
 *
 * Reads the `write_queue_size` field of the raw stream structure.
 *
280 * @return Amount of queued bytes waiting to be sent.
281 */
282 size_t GetWriteQueueSize() const noexcept {
283 return GetRawStream()->write_queue_size;
284 }
285
286 /**
287 * Get the underlying stream data structure.
288 *
289 * @return The pointer from GetRawHandle(), reinterpret_cast to uv_stream_t*.
290 */
291 uv_stream_t* GetRawStream() const noexcept {
292 return reinterpret_cast<uv_stream_t*>(GetRawHandle());
293 }
294
295 /**
296 * Signal generated when data was read on a stream.
297 */
299
300 /**
301 * Signal generated when no more read data is available.
302 */
304
305 protected:
/**
 * Construct a stream around a raw libuv stream structure.
 *
 * The pointer is reinterpret_cast to uv_handle_t* and passed to the Handle
 * base-class constructor. Declared protected, so it is only callable from
 * derived classes.
 *
 * @param uv_stream Raw stream structure to wrap.
 */
306 explicit Stream(uv_stream_t* uv_stream)
307 : Handle{reinterpret_cast<uv_handle_t*>(uv_stream)} {}
308};
309
/**
 * Stream implementation helper template.
 *
 * @tparam T Type that shared_from_this() casts to. NOTE(review): presumably
 *           the concrete stream class deriving from this template -- confirm.
 * @tparam U Raw structure type returned by GetRaw(); the constructor
 *           allocates sizeof(U) bytes for it.
 */
310template <typename T, typename U>
311class StreamImpl : public Stream {
312 public:
/**
 * Get a shared pointer to this object, cast to T.
 *
 * @return Handle::shared_from_this() converted with std::static_pointer_cast.
 */
313 std::shared_ptr<T> shared_from_this() {
314 return std::static_pointer_cast<T>(Handle::shared_from_this());
315 }
316
/**
 * Get a shared pointer to this object, cast to const T (const overload).
 *
 * @return Handle::shared_from_this() converted to std::shared_ptr<const T>.
 */
317 std::shared_ptr<const T> shared_from_this() const {
318 return std::static_pointer_cast<const T>(Handle::shared_from_this());
319 }
320
321 /**
322 * Get the underlying handle data structure.
323 *
324 * @return The pointer from GetRawHandle(), reinterpret_cast to U*.
325 */
326 U* GetRaw() const noexcept {
327 return reinterpret_cast<U*>(this->GetRawHandle());
328 }
329
330 protected:
/**
 * Allocate storage for the raw structure and hand it to Stream.
 *
 * The storage is obtained with std::malloc(sizeof(U)); the result is not
 * checked for null here.
 * NOTE(review): where this memory is released is not visible in this
 * file -- confirm ownership in Handle.
 */
331 StreamImpl() : Stream{static_cast<uv_stream_t*>(std::malloc(sizeof(U)))} {}
332};
333
334} // namespace wpi::uv
335
336#endif // WPINET_UV_STREAM_H_
SignalBase is an implementation of the observer pattern, through the use of an emitting object and slots that are connected to the signal and called with supplied arguments when a signal is emitted.
Definition: Signal.h:495
Error code.
Definition: Error.h:15
Handle.
Definition: Handle.h:34
uv_handle_t * GetRawHandle() const noexcept
Get the underlying handle data structure.
Definition: Handle.h:180
bool Invoke(F &&f, Args &&... args) const
Definition: Handle.h:267
Request.
Definition: Request.h:135
uv_shutdown_t * GetRaw() noexcept
Get the underlying request data structure.
Definition: Request.h:150
Shutdown request.
Definition: Stream.h:30
sig::Signal complete
Shutdown completed signal.
Definition: Stream.h:41
Stream & GetStream() const
Definition: Stream.h:34
Stream handle.
Definition: Stream.h:68
std::shared_ptr< Stream > shared_from_this()
Definition: Stream.h:70
void Write(std::span< const Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
sig::Signal end
Signal generated when no more read data is available.
Definition: Stream.h:303
bool IsReadable() const noexcept
Check if the stream is readable.
Definition: Stream.h:251
void StopRead()
Stop reading data from the stream.
Definition: Stream.h:113
void Write(std::span< const Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
void Shutdown(std::function< void()> callback=nullptr)
Shutdown the outgoing (write) side of a duplex stream.
bool SetBlocking(bool enable) noexcept
Enable or disable blocking mode for a stream.
Definition: Stream.h:274
Stream(uv_stream_t *uv_stream)
Definition: Stream.h:306
void Write(std::initializer_list< Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
Definition: Stream.h:150
int TryWrite(std::initializer_list< Buffer > bufs)
Queue a write request if it can be completed immediately.
Definition: Stream.h:213
size_t GetWriteQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition: Stream.h:282
int TryWrite2(std::initializer_list< Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
Definition: Stream.h:243
void StartRead()
Start reading data from an incoming stream.
int TryWrite2(std::span< const Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
int TryWrite(std::span< const Buffer > bufs)
Queue a write request if it can be completed immediately.
void Write(std::initializer_list< Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
Definition: Stream.h:184
bool IsWritable() const noexcept
Checks if the stream is writable.
Definition: Stream.h:259
uv_stream_t * GetRawStream() const noexcept
Get the underlying stream data structure.
Definition: Stream.h:291
std::shared_ptr< const Stream > shared_from_this() const
Definition: Stream.h:74
void Shutdown(const std::shared_ptr< ShutdownReq > &req)
Shutdown the outgoing (write) side of a duplex stream.
sig::Signal< Buffer &, size_t > data
Signal generated when data was read on a stream.
Definition: Stream.h:298
Definition: Stream.h:311
std::shared_ptr< const T > shared_from_this() const
Definition: Stream.h:317
std::shared_ptr< T > shared_from_this()
Definition: Stream.h:313
StreamImpl()
Definition: Stream.h:331
U * GetRaw() const noexcept
Get the underlying handle data structure.
Definition: Stream.h:326
Write request.
Definition: Stream.h:47
Stream & GetStream() const
Definition: Stream.h:51
sig::Signal< Error > finish
Write completed signal.
Definition: Stream.h:59
Definition: array.h:89
Definition: WebSocket.h:27
Definition: uv.h:474
UV_REQ_FIELDS uv_stream_t * handle
Definition: uv.h:452
Definition: uv.h:530
uv_stream_t * handle
Definition: uv.h:569
UV_EXTERN int uv_read_stop(uv_stream_t *)
UV_EXTERN int uv_stream_set_blocking(uv_stream_t *handle, int blocking)
UV_EXTERN int uv_is_writable(const uv_stream_t *handle)
UV_EXTERN int uv_is_readable(const uv_stream_t *handle)