QUIC Protocol Implementation 1.0
A Python implementation of the QUIC (Quick UDP Internet Connections) protocol.
Loading...
Searching...
No Matches
quic.py
Go to the documentation of this file.
1"""
2@file quic.py
3@brief Implementation of QUIC connection handling.
4@details Contains the QuicConnection class which manages QUIC connections,
5 including stream creation, packet sending/receiving, and statistics tracking.
6"""
7
8import socket
9import time
10import random
11from sys import getsizeof
12
13from constants import Constants
14from frame import FrameStream
15from packet import Packet
16from stream import Stream
17
# Pick the packet size once, when this module is imported, uniformly at random
# from the inclusive range [Constants.MIN_PACKET_SIZE, Constants.MAX_PACKET_SIZE].
PACKET_SIZE = random.randint(Constants.MIN_PACKET_SIZE, Constants.MAX_PACKET_SIZE)
20
21
23 """
24 @brief Manages a QUIC connection.
25
26 @details Handles stream creation, packet assembly, sending/receiving data,
27 and tracking connection statistics.
28 """
29
def __init__(self, connection_id: int, local_addr: tuple, remote_addr: tuple):
    """
    @brief Initialize a QuicConnection instance.

    @details Opens a UDP (AF_INET / SOCK_DGRAM) socket bound to local_addr and
             resets the stream table, statistics, counters and timing state.

    @param connection_id The ID of the connection (0 for client, 1 for server).
    @param local_addr The local address for the connection (IP, port).
    @param remote_addr The remote address for the connection (IP, port).
    @exception OSError Re-raised from bind(); the freshly created socket is
               closed first so a failed bind does not leak the descriptor.
    """
    self._connection_id = connection_id
    self._local_addr = local_addr
    self._remote_addr = remote_addr

    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        udp_socket.bind(local_addr)
    except OSError:
        # Without this the descriptor would stay open until garbage collection.
        udp_socket.close()
        raise
    self._socket = udp_socket

    self._streams: dict[int, Stream] = {}
    self._active_streams_ids: list[int] = []
    # stream_id -> {'total_bytes': int, 'total_time': float, 'total_packets': set}
    self._stats_dict = {}
    self._streams_counter = Constants.ZERO
    self._sent_packets_counter = Constants.ZERO
    self._received_packets_counter = Constants.ZERO
    # Frames that did not fit into the packet being assembled.
    self._pending_frames: list['FrameStream'] = []
    self._total_time: float = Constants.ZERO
    # Stays ZERO until a packet size has been sent or received.
    self._packet_size: int = Constants.ZERO  # 2 bytes
    # The receive loop keeps running while this flag is True.
    self._idle = True
53
def get_stream(self, initiated_by: int, direction: int) -> 'Stream':
    """
    @brief Return the stream for a freshly generated stream ID, creating it if needed.

    @details The ID is produced by _stream_id_generator; when it is not yet
             registered, the stream is added with both flags converted to bool.

    @param initiated_by Stream initiator: 'client'(0) or 'server'(1).
    @param direction Stream direction: bidirectional(0) or unidirectional(1).
    @return The created or retrieved stream.
    """
    new_id = self._stream_id_generator(initiated_by, direction)
    already_registered = self._is_stream_id_in_dict(new_id)
    if not already_registered:
        server_initiated = bool(initiated_by)
        unidirectional = bool(direction)
        self._add_stream(new_id, server_initiated, unidirectional)
    return self._get_stream_by_id(new_id)
66
67 def _stream_id_generator(self, initiated_by: int, direction: int) -> int:
68 """
69 @brief Generate a unique stream ID.
70
71 @details Generates ID based on stream counter, initiator, and direction.
72
73 @param initiated_by Indicates whether the stream was initiated by 'client'(0) or 'server'(1).
74 @param direction Indicates whether the stream is bidirectional(0) or unidirectional(1).
75 @return The generated stream ID.
76 """
77 str_binary = bin(self._streams_counter)[Constants.TWO:] # convert to binary string without prefix (index=2)
78 str_binary += str(direction) + str(initiated_by)
79 padded_int = int(str_binary, Constants.BASE_TWO)
80 return padded_int
81
def _add_stream(self, stream_id: int, initiated_by: bool, direction: bool):
    """
    @brief Register a new stream on the connection.

    @details Stores the stream under its ID, creates its statistics entry and
             advances the stream counter used for ID generation.

    @param stream_id The ID of the stream to add.
    @param initiated_by Indicates if the stream was initiated by the server.
    @param direction Indicates if the stream is unidirectional.
    """
    new_stream = Stream(stream_id, initiated_by, direction)
    self._streams[stream_id] = new_stream
    self._add_stream_to_stats_dict(stream_id)
    self._streams_counter = self._streams_counter + Constants.ONE
95
def _add_stream_to_stats_dict(self, stream_id: int):
    """
    @brief Create the statistics entry for a new stream.

    @details The entry starts with a zero byte count, the current wall-clock
             time under 'total_time' and an empty set under 'total_packets'.

    @param stream_id The ID of the stream.
    """
    fresh_stats = {
        'total_bytes': Constants.ZERO,
        'total_time': time.time(),
        'total_packets': set(),
    }
    self._stats_dict[stream_id] = fresh_stats
103
104 def _get_stream_by_id(self, stream_id: int) -> 'Stream':
105 """
106 @brief Retrieve a stream by its ID.
107
108 @details Creates the stream if necessary.
109
110 @param stream_id The ID of the stream to retrieve.
111 @return The retrieved or newly created stream.
112 """
113 if not self._is_stream_id_in_dict(stream_id):
114 self._add_stream(stream_id, Stream.is_s_init_by_sid(stream_id), Stream.is_uni_by_sid(stream_id))
115 return self._streams[stream_id]
116
117 def _remove_stream(self, stream_id: int) -> 'Stream':
118 """
119 @brief Remove a stream from the active streams list and the streams dictionary.
120
121 @param stream_id The ID of the stream to remove.
122 @return The removed stream.
123 """
124 self._active_streams_ids.remove(stream_id)
125 return self._streams.pop(stream_id)
126
def add_file_to_stream(self, stream_id: int, path: str):
    """
    @brief Queue the full contents of a file on a stream.

    @details The file is opened in binary mode and read in one go before its
             bytes are handed to _add_data_to_stream.

    @param stream_id The ID of the stream to add the file to.
    @param path The file path.
    """
    with open(path, 'rb') as source:
        file_bytes = source.read()
    self._add_data_to_stream(stream_id, file_bytes)
137
138 def _add_data_to_stream(self, stream_id: int, data: bytes):
139 """
140 @brief Add data to a specific stream's buffer.
141
142 @param stream_id The ID of the stream.
143 @param data The data to add.
144 """
145 stream = self._get_stream_by_id(stream_id)
146 stream.add_data_to_stream(data=data)
147 self._add_active_stream_id(stream_id)
148
149 def _add_active_stream_id(self, stream_id: int):
150 """
151 @brief Mark a stream as active.
152
153 @details Adds stream ID to the active streams list.
154
155 @param stream_id The ID of the stream to mark as active.
156 """
157 if stream_id not in self._active_streams_ids:
158 self._active_streams_ids.append(stream_id)
159
160 def _is_stream_id_in_dict(self, stream_id: int) -> bool:
161 """
162 @brief Check if a stream ID exists in the streams dict.
163
164 @param stream_id The ID of the stream to check.
165 @return True if the stream ID exists, False otherwise.
166 """
167 return stream_id in self._streams.keys()
168
170 """
171 @brief Set the start time for all streams in the connection.
172 """
173 start_time = time.time()
174 for stream in self._stats_dict.values():
175 stream['total_time'] = start_time
176
def send_packets(self):
    """
    @brief Create and send packets until every stream is finished and every
           queued frame has been sent, then close the connection.

    @details The packet size is announced first and the start time of every
             stream's statistics is reset to "now". The loop keeps running
             while streams are active OR frames are still waiting in
             self._pending_frames: a stream is removed from the active list
             as soon as it hands out its last frame, so when that frame does
             not fit into the current packet it is only parked in the pending
             queue. Stopping on the active list alone would drop it.

             The connection is closed in a finally clause so the socket is
             released even if sending raises.
    """
    try:
        self._send_packet_size()
        start_time = time.time()
        for stream_stats in self._stats_dict.values():
            stream_stats['total_time'] = start_time
        # NOTE(review): assumes every frame fits into an otherwise empty
        # packet; a frame that never fits would keep this loop spinning
        # (this was already true while streams were active) - confirm.
        while self._active_streams_ids or self._pending_frames:
            packet = self._create_packet()
            self._send_packet(packet.pack())
    finally:
        self._close_connection()
189
191 """
192 @brief Send the packet size to the remote peer.
193
194 @return True if the packet size was sent successfully, False otherwise.
195 """
196 self._packet_size = PACKET_SIZE
197 packet_size_bytes = self._packet_size.to_bytes(Constants.PACKET_SIZE_BYTES, 'big')
198 return self._send_packet(packet_size_bytes)
199
def _create_packet(self) -> Packet:
    """
    @brief Create a packet containing frames from the streams.

    @details Fills a new packet until the size budget (self._packet_size) is
             used up or no frame is available:
             1. Frames left over from an earlier call (self._pending_frames)
                are taken first, oldest first.
             2. Otherwise the next frame is pulled from a randomly chosen
                active stream; a stream that reports it is finished is
                removed from the connection.
             3. A frame that does not fit in the remaining space is appended
                to self._pending_frames and assembly stops.
             The sent-packets counter is incremented once per created packet.

    @return The created packet with frames from different streams.
    """
    # NOTE(review): the source listing is missing its line 210 at this point -
    # presumably the frame-generation step; confirm against the original file.
    remaining_space = self._packet_size
    # First argument is the connection ID flipped between 0 and 1 -
    # presumably the peer's ID; verify against Packet.
    packet = Packet(int(not self._connection_id), self._sent_packets_counter)
    # sys.getsizeof is shallow and reports the in-memory size of the Python
    # object, not the encoded wire length.
    remaining_space -= getsizeof(packet)
    while remaining_space > Constants.ZERO:
        if self._pending_frames:
            frame = self._pending_frames.pop(Constants.START)
        else:
            stream = self._get_stream_from_active_streams()
            if stream:
                frame = stream.send_next_frame()
                if stream.is_finished():
                    self._remove_stream(stream.get_stream_id())
            else:
                # No active stream left: send what has been collected so far.
                break
        # getsizeof on the encoded bytes includes the bytes-object overhead,
        # so the budget check is stricter than the raw payload length.
        size_of_frame = getsizeof(frame.encode())
        if size_of_frame <= remaining_space:
            packet.add_frame(frame)
            remaining_space -= size_of_frame
        else:
            # Keep the frame for a later packet instead of dropping it.
            self._pending_frames.append(frame)
            break
    self._sent_packets_counter += Constants.ONE
    return packet
234
236 """
237 @brief Generate frames for each active stream.
238 """
239 for stream_id in self._active_streams_ids:
240 self._get_stream_by_id(stream_id).generate_stream_frames(PACKET_SIZE // Constants.FRAMES_IN_PACKET)
241
def _get_stream_from_active_streams(self) -> Stream | None:
    """
    @brief Pick a random stream out of the active streams.

    @details When the active list is empty, the idle flag is cleared and
             None is returned.

    @return The retrieved stream, or None if no active streams.
    """
    candidate_ids = self._active_streams_ids
    if candidate_ids:
        try:
            chosen_id = random.choice(candidate_ids)  # uniform random pick
            return self._streams[chosen_id]
        except IndexError:
            print("No more streams!")
            return None
    self._idle = False
    return None
256
257 def _send_packet(self, packet: bytes):
258 """
259 @brief Send a packet to the remote address.
260
261 @param packet The packet to send.
262 @return True if the packet was sent successfully, False otherwise.
263 """
264 return self._socket.sendto(packet, self._remote_addr) > 0
265
267 """
268 @brief Continuously receive packets until the connection is closed or a timeout occurs.
269 """
270 self._socket.settimeout(Constants.TIMEOUT)
271 while self._idle:
272 try:
273 self._receive_packet()
274 except OSError as e:
275 print(f"An error occurred in receive_packets: {e}")
276 break
277
279 """
280 @brief Receive a packet and process it.
281
282 @details If socket times out, the connection will be closed.
283 """
284 try:
285 if self._packet_size == Constants.ZERO:
286 packet, addr = self._socket.recvfrom(Constants.PACKET_SIZE_BYTES)
287 self._total_time = time.time()
289 else:
290 packet, addr = self._socket.recvfrom(self._packet_size)
291 self._handle_received_packet(packet)
292 if not self._active_streams_ids:
294 self._close_connection()
296 except socket.timeout:
297 self._close_connection()
298
300 """
301 @brief Increment the counter for received packets.
302 """
303 self._received_packets_counter += Constants.ONE
304
305 def _handle_received_packet_size(self, packet_size: bytes):
306 """
307 @brief Handle the reception of the packet size from the peer.
308
309 @param packet_size The received packet size in bytes.
310 """
311 print(f'Packet size received: {int.from_bytes(packet_size, "big")}')
312 self._packet_size = int.from_bytes(packet_size, 'big')
313
def _handle_received_packet(self, packet: bytes):
    """
    @brief Unpack a received packet and dispatch its frames to their streams.

    @details Every frame marks its stream as active, is delivered to the
             stream and is counted in that stream's statistics (encoded
             length and packet number). A stream that reports it is finished
             is written out. Errors raised while handling a frame are printed
             and the remaining frames are still processed.

    @param packet The received packet in bytes.
    """
    decoded = Packet.unpack(packet)
    for frame in decoded.payload:
        sid = frame.stream_id
        self._add_active_stream_id(sid)
        try:
            target = self._get_stream_by_id(sid)
            target.receive_frame(frame)
            stream_stats = self._stats_dict[sid]
            stream_stats['total_bytes'] += len(frame.encode())
            stream_stats['total_packets'].add(decoded.packet_number)
            if target.is_finished():
                self._write_stream(sid)
        except Exception as e:
            print(f"An error occurred handle_received_packet: {e}")
333
334 def _write_stream(self, stream_id: int) -> bool:
335 """
336 @brief Write the received data of a stream to a file.
337
338 @param stream_id The ID of the stream whose data should be written.
339 @return True if the data was written successfully, False otherwise.
340 """
341 stream = self._remove_stream(stream_id)
342 data = stream.get_data_received()
343 curr_time = time.time()
344 self._stats_dict[stream_id]['total_time'] -= curr_time
345 try:
346 with open(f'{stream_id}.gif', 'wb') as file:
347 file.write(data)
348 return True
349 except Exception as e:
350 print(f"An error occurred in _write_stream: {e}")
351 return False
352
354 """
355 @brief Close the connection, socket, and print the statistics.
356 """
357 self._total_time -= time.time()
358 self._idle = False
359 self._socket.close()
360 self._print_stats()
361
362 def _print_stats(self):
363 """
364 @brief Print the statistics for all active streams in the connection.
365 """
366 self._total_time = abs(self._total_time)
367 _bytes = 0
368 for stream_id, stats in self._stats_dict.items():
369 elapsed_time = abs(stats['total_time'])
370 if elapsed_time > 0:
371 total_bytes = stats['total_bytes']
372 _bytes += total_bytes
373 total_packets = len(stats['total_packets'])
374 print(f'STREAM #{stream_id}:')
375 print(f'---------------- {total_bytes} bytes total')
376 print(f'---------------- {total_packets} packets total')
377 print(f'---------------- at rate {float(total_bytes) / elapsed_time} bytes/second')
378 print(f'---------------- at rate {float(total_packets) / elapsed_time} packets/second')
379 print(f'Statistics for all active streams:')
380 print(f'------- rate {float(_bytes) / self._total_time} bytes/second, {_bytes} bytes total')
381 print(
382 f'------- rate {float(self._received_packets_counter) / self._total_time} packets/second, {self._received_packets_counter} packets total')
383 print(f'total time elapsed: {self._total_time} seconds')
send_packets(self)
Definition quic.py:177
_add_stream_to_stats_dict(self, int stream_id)
Definition quic.py:96
_send_packet_size(self)
Definition quic.py:190
'Stream' get_stream(self, int initiated_by, int direction)
Definition quic.py:54
_generate_streams_frames(self)
Definition quic.py:235
_set_start_time(self)
Definition quic.py:169
_add_active_stream_id(self, int stream_id)
Definition quic.py:149
_receive_packet(self)
Definition quic.py:278
_add_stream(self, int stream_id, bool initiated_by, bool direction)
Definition quic.py:82
_add_data_to_stream(self, int stream_id, bytes data)
Definition quic.py:138
Stream|None _get_stream_from_active_streams(self)
Definition quic.py:242
_send_packet(self, bytes packet)
Definition quic.py:257
_increment_received_packets_counter(self)
Definition quic.py:299
bool _write_stream(self, int stream_id)
Definition quic.py:334
__init__(self, int connection_id, tuple local_addr, tuple remote_addr)
Definition quic.py:30
_print_stats(self)
Definition quic.py:362
'Stream' _get_stream_by_id(self, int stream_id)
Definition quic.py:104
int _stream_id_generator(self, int initiated_by, int direction)
Definition quic.py:67
_handle_received_packet_size(self, bytes packet_size)
Definition quic.py:305
add_file_to_stream(self, int stream_id, str path)
Definition quic.py:127
'Stream' _remove_stream(self, int stream_id)
Definition quic.py:117
_handle_received_packet(self, bytes packet)
Definition quic.py:314
bool _is_stream_id_in_dict(self, int stream_id)
Definition quic.py:160
receive_packets(self)
Definition quic.py:266
Packet _create_packet(self)
Definition quic.py:200
_close_connection(self)
Definition quic.py:353