WPILibC++ 2027.0.0-alpha-3
Loading...
Searching...
No Matches
Stream.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_UV_STREAM_H_
6#define WPINET_UV_STREAM_H_
7
8#include <uv.h>
9
10#include <cstdlib>
11#include <functional>
12#include <initializer_list>
13#include <memory>
14#include <span>
15#include <utility>
16
17#include <wpi/Signal.h>
18
19#include "wpinet/uv/Buffer.h"
20#include "wpinet/uv/Handle.h"
21#include "wpinet/uv/Request.h"
22
23namespace wpi::uv {
24
25class Stream;
26
27/**
28 * Shutdown request.
29 */
30class ShutdownReq : public RequestImpl<ShutdownReq, uv_shutdown_t> {
31 public:
33
34 Stream& GetStream() const {
35 return *static_cast<Stream*>(GetRaw()->handle->data);
36 }
37
38 /**
39 * Shutdown completed signal.
40 */
42};
43
44/**
45 * Write request.
46 */
47class WriteReq : public RequestImpl<WriteReq, uv_write_t> {
48 public:
50
51 Stream& GetStream() const {
52 return *static_cast<Stream*>(GetRaw()->handle->data);
53 }
54
55 /**
56 * Write completed signal. This is called even if an error occurred.
57 * @param err error value
58 */
60};
61
62/**
63 * Stream handle.
64 * Stream handles provide an abstraction of a duplex communication channel.
65 * This is an abstract type; there are three stream implementations (Tcp,
66 * Pipe, and Tty).
67 */
68class Stream : public Handle {
69 public:
70 std::shared_ptr<Stream> shared_from_this() {
71 return std::static_pointer_cast<Stream>(Handle::shared_from_this());
72 }
73
74 std::shared_ptr<const Stream> shared_from_this() const {
75 return std::static_pointer_cast<const Stream>(Handle::shared_from_this());
76 }
77
78 /**
79 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
80 * write requests to complete. HandleShutdownComplete() is called on the
81 * request after shutdown is complete.
82 *
83 * @param req shutdown request
84 */
85 void Shutdown(const std::shared_ptr<ShutdownReq>& req);
86
87 /**
88 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
89 * write requests to complete. The callback is called after shutdown is
90 * complete. Errors will be reported to the stream error handler.
91 *
92 * @param callback Callback function to call when shutdown completes
93 */
94 void Shutdown(std::function<void()> callback = nullptr);
95
96 /**
97 * Start reading data from an incoming stream.
98 *
99 * This will only succeed after a connection has been established.
100 *
101 * A data signal will be emitted several times until there is no more
102 * data to read or `StopRead()` is called.
103 * An end signal will be emitted when there is no more data to read.
104 */
105 void StartRead();
106
107 /**
108 * Stop reading data from the stream.
109 *
110 * This function is idempotent and may be safely called on a stopped stream.
111 */
113
114 /**
115 * Write data to the stream.
116 *
117 * Data are written in order. The lifetime of the data pointers passed in
118 * the `bufs` parameter must exceed the lifetime of the write request.
119 * An easy way to ensure this is to have the write request keep track of
120 * the data and use either its Complete() function or destructor to free the
121 * data.
122 *
123 * The finish signal will be emitted on the request object when the data
124 * has been written (or if an error occurs).
125 * The error signal will be emitted on the request object in case of errors.
126 *
127 * @param bufs The buffers to be written to the stream.
128 * @param req write request
129 */
130 void Write(std::span<const Buffer> bufs,
131 const std::shared_ptr<WriteReq>& req);
132
133 /**
134 * Write data to the stream.
135 *
136 * Data are written in order. The lifetime of the data pointers passed in
137 * the `bufs` parameter must exceed the lifetime of the write request.
138 * An easy way to ensure this is to have the write request keep track of
139 * the data and use either its Complete() function or destructor to free the
140 * data.
141 *
142 * The finish signal will be emitted on the request object when the data
143 * has been written (or if an error occurs).
144 * The error signal will be emitted on the request object in case of errors.
145 *
146 * @param bufs The buffers to be written to the stream.
147 * @param req write request
148 */
149 void Write(std::initializer_list<Buffer> bufs,
150 const std::shared_ptr<WriteReq>& req) {
151 Write({bufs.begin(), bufs.end()}, req);
152 }
153
154 /**
155 * Write data to the stream.
156 *
157 * Data are written in order. The lifetime of the data pointers passed in
158 * the `bufs` parameter must exceed the lifetime of the write request.
159 * The callback can be used to free data after the request completes.
160 *
161 * The callback will be called when the data has been written (even if an
162 * error occurred). Errors will be reported to the stream error handler.
163 *
164 * @param bufs The buffers to be written to the stream.
165 * @param callback Callback function to call when the write completes
166 */
167 void Write(std::span<const Buffer> bufs,
168 std::function<void(std::span<Buffer>, Error)> callback);
169
170 /**
171 * Write data to the stream.
172 *
173 * Data are written in order. The lifetime of the data pointers passed in
174 * the `bufs` parameter must exceed the lifetime of the write request.
175 * The callback can be used to free data after the request completes.
176 *
177 * The callback will be called when the data has been written (even if an
178 * error occurred). Errors will be reported to the stream error handler.
179 *
180 * @param bufs The buffers to be written to the stream.
181 * @param callback Callback function to call when the write completes
182 */
183 void Write(std::initializer_list<Buffer> bufs,
184 std::function<void(std::span<Buffer>, Error)> callback) {
185 Write({bufs.begin(), bufs.end()}, std::move(callback));
186 }
187
188 /**
189 * Queue a write request if it can be completed immediately.
190 *
191 * Same as `Write()`, but won’t queue a write request if it can’t be
192 * completed immediately.
193 * An error signal will be emitted in case of errors.
194 *
195 * @param bufs The buffers to be written to the stream.
196 * @return Number of bytes written, or negative (error code) on error
197 */
198 [[nodiscard]]
199 int TryWrite(std::span<const Buffer> bufs);
200
201 /**
202 * Queue a write request if it can be completed immediately.
203 *
204 * Same as `Write()`, but won’t queue a write request if it can’t be
205 * completed immediately.
206 * An error signal will be emitted in case of errors.
207 *
208 * @param bufs The buffers to be written to the stream.
209 * @return Number of bytes written, or negative (error code) on error
210 */
211 [[nodiscard]]
212 int TryWrite(std::initializer_list<Buffer> bufs) {
213 return TryWrite({bufs.begin(), bufs.end()});
214 }
215
216 /**
217 * Same as TryWrite() and extended write function for sending handles over a
218 * pipe.
219 *
220 * Try to send a handle is not supported on Windows, where it returns
221 * UV_EAGAIN.
222 *
223 * @param bufs The buffers to be written to the stream.
224 * @param send send stream
225 * @return Number of bytes written, or negative (error code) on error
226 */
227 [[nodiscard]]
228 int TryWrite2(std::span<const Buffer> bufs, Stream& send);
229
230 /**
231 * Same as TryWrite() and extended write function for sending handles over a
232 * pipe.
233 *
234 * Try to send a handle is not supported on Windows, where it returns
235 * UV_EAGAIN.
236 *
237 * @param bufs The buffers to be written to the stream.
238 * @param send send stream
239 * @return Number of bytes written, or negative (error code) on error
240 */
241 [[nodiscard]]
242 int TryWrite2(std::initializer_list<Buffer> bufs, Stream& send) {
243 return TryWrite2({bufs.begin(), bufs.end()}, send);
244 }
245
246 /**
247 * Check if the stream is readable.
248 * @return True if the stream is readable, false otherwise.
249 */
250 bool IsReadable() const noexcept {
251 return uv_is_readable(GetRawStream()) == 1;
252 }
253
254 /**
255 * @brief Checks if the stream is writable.
256 * @return True if the stream is writable, false otherwise.
257 */
258 bool IsWritable() const noexcept {
259 return uv_is_writable(GetRawStream()) == 1;
260 }
261
262 /**
263 * Enable or disable blocking mode for a stream.
264 *
265 * When blocking mode is enabled all writes complete synchronously. The
266 * interface remains unchanged otherwise, e.g. completion or failure of the
267 * operation will still be reported through events which are emitted
268 * asynchronously.
269 *
270 * @param enable True to enable blocking mode, false otherwise.
271 * @return True in case of success, false otherwise.
272 */
273 bool SetBlocking(bool enable) noexcept {
274 return uv_stream_set_blocking(GetRawStream(), enable) == 0;
275 }
276
277 /**
278 * Gets the amount of queued bytes waiting to be sent.
279 * @return Amount of queued bytes waiting to be sent.
280 */
281 size_t GetWriteQueueSize() const noexcept {
282 return GetRawStream()->write_queue_size;
283 }
284
285 /**
286 * Get the underlying stream data structure.
287 *
288 * @return The underlying stream data structure.
289 */
290 uv_stream_t* GetRawStream() const noexcept {
291 return reinterpret_cast<uv_stream_t*>(GetRawHandle());
292 }
293
294 /**
295 * Signal generated when data was read on a stream.
296 */
298
299 /**
300 * Signal generated when no more read data is available.
301 */
303
304 protected:
305 explicit Stream(uv_stream_t* uv_stream)
306 : Handle{reinterpret_cast<uv_handle_t*>(uv_stream)} {}
307};
308
309template <typename T, typename U>
310class StreamImpl : public Stream {
311 public:
312 std::shared_ptr<T> shared_from_this() {
313 return std::static_pointer_cast<T>(Handle::shared_from_this());
314 }
315
316 std::shared_ptr<const T> shared_from_this() const {
317 return std::static_pointer_cast<const T>(Handle::shared_from_this());
318 }
319
320 /**
321 * Get the underlying handle data structure.
322 *
323 * @return The underlying handle data structure.
324 */
325 U* GetRaw() const noexcept {
326 return reinterpret_cast<U*>(this->GetRawHandle());
327 }
328
329 protected:
330 StreamImpl() : Stream{static_cast<uv_stream_t*>(std::malloc(sizeof(U)))} {}
331};
332
333} // namespace wpi::uv
334
335#endif // WPINET_UV_STREAM_H_
SignalBase is an implementation of the observer pattern, through the use of an emitting object and sl...
Definition Signal.h:495
Error code.
Definition Error.h:15
Handle.
Definition Handle.h:34
uv_handle_t * GetRawHandle() const noexcept
Get the underlying handle data structure.
Definition Handle.h:180
bool Invoke(F &&f, Args &&... args) const
Definition Handle.h:267
Request.
Definition Request.h:135
uv_shutdown_t * GetRaw() noexcept
Definition Request.h:150
Shutdown request.
Definition Stream.h:30
sig::Signal complete
Shutdown completed signal.
Definition Stream.h:41
Stream & GetStream() const
Definition Stream.h:34
Stream handle.
Definition Stream.h:68
std::shared_ptr< Stream > shared_from_this()
Definition Stream.h:70
void Write(std::span< const Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
sig::Signal end
Signal generated when no more read data is available.
Definition Stream.h:302
bool IsReadable() const noexcept
Check if the stream is readable.
Definition Stream.h:250
void StopRead()
Stop reading data from the stream.
Definition Stream.h:112
void Write(std::span< const Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
void Shutdown(std::function< void()> callback=nullptr)
Shutdown the outgoing (write) side of a duplex stream.
bool SetBlocking(bool enable) noexcept
Enable or disable blocking mode for a stream.
Definition Stream.h:273
Stream(uv_stream_t *uv_stream)
Definition Stream.h:305
void Write(std::initializer_list< Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
Definition Stream.h:149
int TryWrite(std::initializer_list< Buffer > bufs)
Queue a write request if it can be completed immediately.
Definition Stream.h:212
size_t GetWriteQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition Stream.h:281
int TryWrite2(std::initializer_list< Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
Definition Stream.h:242
void StartRead()
Start reading data from an incoming stream.
int TryWrite2(std::span< const Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
int TryWrite(std::span< const Buffer > bufs)
Queue a write request if it can be completed immediately.
void Write(std::initializer_list< Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
Definition Stream.h:183
bool IsWritable() const noexcept
Checks if the stream is writable.
Definition Stream.h:258
uv_stream_t * GetRawStream() const noexcept
Get the underlying stream data structure.
Definition Stream.h:290
std::shared_ptr< const Stream > shared_from_this() const
Definition Stream.h:74
void Shutdown(const std::shared_ptr< ShutdownReq > &req)
Shutdown the outgoing (write) side of a duplex stream.
sig::Signal< Buffer &, size_t > data
Signal generated when data was read on a stream.
Definition Stream.h:297
Definition Stream.h:310
std::shared_ptr< const T > shared_from_this() const
Definition Stream.h:316
std::shared_ptr< T > shared_from_this()
Definition Stream.h:312
StreamImpl()
Definition Stream.h:330
U * GetRaw() const noexcept
Get the underlying handle data structure.
Definition Stream.h:325
Write request.
Definition Stream.h:47
Stream & GetStream() const
Definition Stream.h:51
sig::Signal< Error > finish
Write completed signal.
Definition Stream.h:59
Definition PointerIntPair.h:280
Definition Loop.h:22
Definition uv.h:476
UV_REQ_FIELDS uv_stream_t * handle
Definition uv.h:454
Definition uv.h:532
uv_stream_t * handle
Definition uv.h:571
UV_EXTERN int uv_read_stop(uv_stream_t *)
UV_EXTERN int uv_stream_set_blocking(uv_stream_t *handle, int blocking)
UV_EXTERN int uv_is_writable(const uv_stream_t *handle)
UV_EXTERN int uv_is_readable(const uv_stream_t *handle)