QUIC Protocol Implementation 1.0
A Python implementation of the QUIC (Quick UDP Internet Connections) protocol.
Loading...
Searching...
No Matches
stream.py
Go to the documentation of this file.
"""
@file stream.py
@brief Implementation of QUIC stream handling classes.
@details Contains Stream, StreamEndpointABC, StreamSender, and StreamReceiver classes
         for managing data flow in QUIC connections.
"""
7
8from abc import ABC, abstractmethod
9from frame import FrameStream
10from constants import Constants
11
12
class Stream:
    """
    @brief Represents a QUIC stream that handles data transfer.

    @details A Stream can be unidirectional or bidirectional. It owns one
             StreamSender and one StreamReceiver and delegates every send
             and receive operation to them.
    """

    def __init__(self, stream_id: int, is_uni: bool, is_s_initiated: bool):
        """
        @brief Initialize a Stream instance.

        @param stream_id Unique identifier for the stream, generated already.
        @param is_uni Specifies if the stream is unidirectional.
        @param is_s_initiated Specifies if the stream was initiated by the server (receiver).
        """
        self._stream_id = stream_id
        self._is_uni = is_uni
        self._is_s_initiated = is_s_initiated
        # On a bidirectional stream both endpoints are usable. On a
        # unidirectional stream exactly one is: the sender when the stream is
        # not server-initiated, the receiver when it is.
        self._sender = StreamSender(stream_id, not is_uni or not is_s_initiated)
        self._receiver = StreamReceiver(stream_id, not is_uni or is_s_initiated)

    def has_data(self) -> bool:
        """
        @brief Check if there is any data to send or receive.

        @return True if there is data in the sender or receiver buffer, False otherwise.
        """
        return self._sender.has_data() or self._receiver.has_data()

    def get_stream_id(self) -> int:
        """
        @brief Getter for stream ID.

        @return The stream ID.
        """
        return self._stream_id

    def add_data_to_stream(self, data: bytes) -> None:
        """
        @brief Add data to the stream by delegation to StreamSender.

        @param data Data to be added to the send buffer.
        """
        self._sender.add_data_to_buffer(data)

    def generate_stream_frames(self, max_size: int) -> None:
        """
        @brief Stream frames generation by delegation to StreamSender.

        @details Generate stream frames for sending based on the maximum frame size.

        @param max_size The size of the payload_size which is determined by size of
                        payload-packet/num of streams on that packet.
        """
        self._sender.generate_stream_frames(max_size)

    def send_next_frame(self) -> FrameStream:
        """
        @brief Retrieve the next frame to be sent from the sender's list.

        @return Whatever StreamSender.send_next_frame() returns for the next queued frame.
        """
        return self._sender.send_next_frame()

    def receive_frame(self, frame: FrameStream) -> None:
        """
        @brief Process a received frame by delegating it to the receiver.

        @param frame The received frame.
        """
        self._receiver.stream_frame_recvd(frame)

    def get_data_received(self) -> bytes:
        """
        @brief Retrieve the data received on the stream.

        @return The data received on the stream.
        """
        return self._receiver.get_data_from_buffer()

    def is_finished(self) -> bool:
        """
        @brief Check if the stream has reached a terminal state.

        @details A unidirectional stream is judged by its single usable
                 endpoint: the receiver when server-initiated, the sender
                 otherwise. A bidirectional stream is reported finished as
                 soon as EITHER endpoint is terminal.

        @return True if the stream is in a terminal state, False otherwise.
        """
        if not self._is_uni:
            # NOTE(review): RFC 9000 treats a bidirectional stream as closed
            # only when both parts are terminal; confirm that `or` (rather
            # than `and`) is the intended policy here.
            return self._receiver.is_terminal_state() or self._sender.is_terminal_state()
        if self._is_s_initiated:
            return self._receiver.is_terminal_state()
        return self._sender.is_terminal_state()

    @staticmethod
    def is_uni_by_sid(stream_id: int) -> bool:
        """
        @brief Determine if a stream is unidirectional based on stream ID.

        @param stream_id The stream ID.
        @return True if any bit selected by Constants.DIRECTION_MASK is set in the ID.
        """
        return bool(stream_id & Constants.DIRECTION_MASK)

    @staticmethod
    def is_s_init_by_sid(stream_id: int) -> bool:
        """
        @brief Determine if a stream was initiated by a server based on its stream ID.

        @param stream_id The stream ID.
        @return True if any bit selected by Constants.INIT_BY_MASK is set in the ID.
        """
        return bool(stream_id & Constants.INIT_BY_MASK)
128
129
class StreamEndpointABC(ABC):
    """
    @brief Abstract base class for stream endpoints.

    @details Provides common functionality for sending and receiving endpoints
             of a QUIC stream: the stream ID, the running offset, the byte
             buffer, the state value and the usability flag.
    """

    def __init__(self, stream_id: int, is_usable: bool):
        """
        @brief Abstract Constructor for StreamEndpointABC abstract class.

        @param stream_id The stream ID of this endpoint.
        @param is_usable Specifies if the stream endpoint is 'usable'.
        """
        self._stream_id: int = stream_id
        self._curr_offset: int = Constants.ZERO
        self._buffer: bytes = b""
        # NOTE(review): the original remark here reads "READY = RECV so it's
        # applicable for both endpoints"; presumably Constants.START shares
        # that value - confirm against the constants module.
        self._state: int = Constants.START
        self._is_usable: bool = is_usable

    def _set_state(self, state: int) -> bool:
        """
        @brief Set the state of the endpoint.

        @param state New state.
        @return Always True; the boolean result is kept for callers that check it.
        """
        self._state = state
        return True

    @abstractmethod
    def _add_data_to_buffer(self, data: bytes):
        """
        @brief Add data to the buffer.

        @param data The data to add to the buffer.
        """

    def has_data(self) -> bool:
        """
        @brief Check if the buffer contains data.

        @return True if the buffer has data, False otherwise.
        """
        return bool(self._buffer)

    def is_terminal_state(self) -> bool:
        """
        @brief Check if the endpoint has reached a terminal state.

        @return True if the state is DATA_RECVD or the endpoint is not usable,
                False otherwise.
        """
        return self._state == Constants.DATA_RECVD or not self._is_usable
189
190
class StreamSender(StreamEndpointABC):
    """
    @brief Represents the sending endpoint of a QUIC stream.

    @details Handles buffering of data to send, generation of stream frames,
             and handing the generated frames out one at a time.
    """

    def __init__(self, stream_id: int, is_usable: bool):
        """
        @brief Initialize a StreamSender instance.

        @param stream_id The stream ID associated with this sender.
        @param is_usable Whether this sender can be used for sending data.
        """
        super().__init__(stream_id, is_usable)
        # Frames produced by generate_stream_frames(), handed out in FIFO
        # order by send_next_frame().
        self._stream_frames: list[FrameStream] = []

    def add_data_to_buffer(self, data: bytes):
        """
        @brief Add data to the sender's buffer.

        @param data The data to add.
        @throws ValueError If the stream is not in READY state.
        """
        self._add_data_to_buffer(data)

    def _add_data_to_buffer(self, data: bytes):
        """
        @brief Internal method to add data to the buffer.

        @details Only adds data if the stream is in READY state.

        @param data The data to add.
        @throws ValueError If the stream is not in READY state.
        """
        if self._state != Constants.READY:
            raise ValueError("ERROR: cannot write. stream is not READY.")
        self._buffer += data

    def generate_stream_frames(self, max_size: int):
        """
        @brief Generate frames for the data in the buffer.

        @details Splits the unsent data into non-FIN frames of exactly
                 max_size bytes followed by one FIN frame carrying the
                 remainder, so no frame carries more than max_size bytes.

        @param max_size The maximum size of each frame.
        @throws ValueError If max_size is not positive.
        """
        total_stream_frames = self._get_total_stream_frames_amount(max_size)
        # The last frame of the total is always the FIN frame.
        for _ in range(total_stream_frames - 1):
            start = self._curr_offset
            self._stream_frames.append(
                FrameStream(stream_id=self._stream_id, offset=start, length=max_size, fin=False,
                            data=self._buffer[start:start + max_size]))
            self._curr_offset = start + max_size
        self._stream_frames.append(self.generate_fin_frame())

    def _get_total_stream_frames_amount(self, max_size: int) -> int:
        """
        @brief Calculate the number of frames required for the unsent data.

        @details Counts the data from the current offset onward and rounds
                 up, so a partial trailing chunk gets its own frame. The
                 result is never below one because a FIN frame is always
                 sent, even when it is empty.

        @param max_size The maximum size of each frame.
        @return The total number of frames, FIN frame included.
        @throws ValueError If max_size is not positive.
        """
        if max_size <= 0:
            raise ValueError("ERROR: max_size must be a positive number of bytes.")
        pending = len(self._buffer) - self._curr_offset
        # -(-a // b) is ceiling division on integers.
        return max(1, -(-pending // max_size))

    def generate_fin_frame(self) -> FrameStream:
        """
        @brief Generate a frame with the FIN bit set.

        @details Carries everything from the current offset to the end of the
                 buffer and moves the sender to DATA_SENT.

        @return The final frame for the stream.
        """
        self._set_state(Constants.DATA_SENT)
        tail = self._buffer[self._curr_offset:]
        return FrameStream(stream_id=self._stream_id, offset=self._curr_offset,
                           length=len(tail), fin=True, data=tail)

    def send_next_frame(self) -> FrameStream:
        """
        @brief Send the next frame in the queue.

        @details Moves the sender to SEND for an ordinary frame. Handing out
                 the FIN frame moves it to DATA_RECVD, the state that
                 is_terminal_state() checks for.

        @return The next frame to be sent, or None when no frame is queued.
        """
        if not self._stream_frames:
            return None
        frame = self._stream_frames.pop(Constants.ZERO)
        self._set_state(Constants.DATA_RECVD if frame.fin else Constants.SEND)
        return frame
287
288
class StreamReceiver(StreamEndpointABC):
    """
    @brief Represents the receiving endpoint of a QUIC stream.

    @details Handles reception of stream frames, ordering them by offset,
             and assembling the complete stream data.
    """

    def __init__(self, stream_id: int, is_usable: bool):
        """
        @brief Initialize a StreamReceiver instance.

        @param stream_id The stream ID associated with this receiver.
        @param is_usable Whether this receiver can be used for receiving data.
        """
        super().__init__(stream_id, is_usable)
        self._recv_frame_dict: dict[int, bytes] = {}  # such that K = offset, V = data

    def stream_frame_recvd(self, frame: FrameStream):
        """
        @brief Process a received frame and add it to the receiver's buffer.

        @param frame The received frame.
        """
        if frame.fin:
            self._fin_recvd()
        self._add_frame_to_recv_dict(frame)

    def _add_frame_to_recv_dict(self, frame: FrameStream):
        """
        @brief Add a received frame to the receiver's dictionary.

        @details Updates the current offset and, once the state is
                 SIZE_KNOWN, assembles the stored frames into the buffer.

        @param frame The received frame.
        """
        self._recv_frame_dict[frame.offset] = frame.data
        self._curr_offset += len(frame.data)
        # NOTE(review): assembly runs as soon as the FIN frame is stored, so
        # frames that arrive after the FIN frame never reach the buffer -
        # confirm the transport delivers the FIN frame last.
        if self._state == Constants.SIZE_KNOWN:
            self._convert_dict_to_buffer()

    def _fin_recvd(self):
        """
        @brief Handle the reception of a FIN frame.

        @details Moves the receiver to SIZE_KNOWN.
        """
        self._set_state(Constants.SIZE_KNOWN)

    def _convert_dict_to_buffer(self):
        """
        @brief Convert the received frames in the dictionary to a single buffer.

        @details Sorts frames by offset, appends their data to the buffer in
                 that order and moves the receiver to DATA_RECVD.

        @throws ValueError If the stream size is not known.
        """
        self._recv_frame_dict = dict(sorted(self._recv_frame_dict.items()))  # Sort frames by their offset.
        self._add_data_to_buffer(b"".join(self._recv_frame_dict.values()))
        self._set_state(Constants.DATA_RECVD)

    def _add_data_to_buffer(self, data: bytes):
        """
        @brief Add data to the buffer if the size is known.

        @param data The data to add.
        @throws ValueError If the stream size is not known.
        """
        if self._state != Constants.SIZE_KNOWN:
            raise ValueError("ERROR: cannot write. stream is not READY.")
        self._buffer += data

    def get_data_from_buffer(self) -> bytes:
        """
        @brief Retrieve the data from the buffer.

        @details Reading is allowed only in DATA_RECVD and moves the receiver
                 to DATA_READ, so a second read raises.

        @return The data in the buffer.
        @throws ValueError If the stream is not in DATA_RECVD state.
        """
        if self._state != Constants.DATA_RECVD:
            raise ValueError("ERROR: cannot read. stream is closed.")
        data = self._buffer
        self._set_state(Constants.DATA_READ)
        return data
bool is_terminal_state(self)
Definition stream.py:182
bool _set_state(self, int state)
Definition stream.py:151
__init__(self, int stream_id, bool is_usable)
Definition stream.py:138
_add_data_to_buffer(self, bytes data)
Definition stream.py:166
bytes get_data_from_buffer(self)
Definition stream.py:361
stream_frame_recvd(self, FrameStream frame)
Definition stream.py:307
_add_frame_to_recv_dict(self, FrameStream frame)
Definition stream.py:317
_add_data_to_buffer(self, bytes data)
Definition stream.py:349
__init__(self, int stream_id, bool is_usable)
Definition stream.py:297
_convert_dict_to_buffer(self)
Definition stream.py:338
FrameStream generate_fin_frame(self)
Definition stream.py:260
_add_data_to_buffer(self, bytes data)
Definition stream.py:217
add_data_to_buffer(self, bytes data)
Definition stream.py:209
__init__(self, int stream_id, bool is_usable)
Definition stream.py:199
generate_stream_frames(self, int max_size)
Definition stream.py:231
FrameStream send_next_frame(self)
Definition stream.py:275
int _get_total_stream_frames_amount(self, int max_size)
Definition stream.py:248
generate_stream_frames(self, int max_size)
Definition stream.py:61
__init__(self, int stream_id, bool is_uni, bool is_s_initiated)
Definition stream.py:21
bytes get_data_received(self)
Definition stream.py:88
bool is_s_init_by_sid(int stream_id)
Definition stream.py:120
add_data_to_stream(self, bytes data)
Definition stream.py:53
bool is_uni_by_sid(int stream_id)
Definition stream.py:110
FrameStream send_next_frame(self)
Definition stream.py:72
int get_stream_id(self)
Definition stream.py:45
receive_frame(self, FrameStream frame)
Definition stream.py:80
bool has_data(self)
Definition stream.py:37
bool is_finished(self)
Definition stream.py:96