3@brief Implementation of QUIC connection handling.
4@details Contains the QuicConnection class which manages QUIC connections,
5 including stream creation, packet sending/receiving, and statistics tracking.
11from sys
import getsizeof
13from constants
import Constants
14from frame
import FrameStream
15from packet
import Packet
16from stream
import Stream
19PACKET_SIZE = random.randint(Constants.MIN_PACKET_SIZE, Constants.MAX_PACKET_SIZE)
24 @brief Manages a QUIC connection.
26 @details Handles stream creation, packet assembly, sending/receiving data,
27 and tracking connection statistics.
30 def __init__(self, connection_id: int, local_addr: tuple, remote_addr: tuple):
32 @brief Initialize a QuicConnection instance.
34 @param connection_id The ID of the connection (0 for client, 1 for server).
35 @param local_addr The local address for the connection (IP, port).
36 @param remote_addr The remote address for the connection (IP, port).
41 self.
_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
43 self._streams: dict[int, Stream] = {}
49 self._pending_frames: list[
'FrameStream'] = []
54 def get_stream(self, initiated_by: int, direction: int) ->
'Stream':
56 @brief Retrieve an existing stream for the connection, or create a new one.
58 @param initiated_by Indicates whether the stream was initiated by 'client'(0) or 'server'(1).
59 @param direction Indicates if the stream is bidirectional(0) or unidirectional(1).
60 @return The created or retrieved stream.
64 self.
_add_stream(stream_id, bool(initiated_by), bool(direction))
69 @brief Generate a unique stream ID.
71 @details Generates the ID based on the stream counter, the initiator, and the direction.
73 @param initiated_by Indicates whether the stream was initiated by 'client'(0) or 'server'(1).
74 @param direction Indicates whether the stream is bidirectional(0) or unidirectional(1).
75 @return The generated stream ID.
78 str_binary += str(direction) + str(initiated_by)
79 padded_int = int(str_binary, Constants.BASE_TWO)
82 def _add_stream(self, stream_id: int, initiated_by: bool, direction: bool):
84 @brief Add a new stream to the connection.
86 @details Initializes the stream's statistics.
88 @param stream_id The ID of the stream to add.
89 @param initiated_by True if the stream was initiated by the server, False if it was initiated by the client.
90 @param direction True if the stream is unidirectional, False if it is bidirectional.
92 self._streams[stream_id] =
Stream(stream_id, initiated_by, direction)
98 @brief Initialize statistics for a new stream.
100 @param stream_id The ID of the stream.
102 self.
_stats_dict[stream_id] = {
'total_bytes': Constants.ZERO,
'total_time': time.time(),
'total_packets': set()}
106 @brief Retrieve a stream by its ID.
108 @details Creates the stream if necessary.
110 @param stream_id The ID of the stream to retrieve.
111 @return The retrieved or newly created stream.
114 self.
_add_stream(stream_id, Stream.is_s_init_by_sid(stream_id), Stream.is_uni_by_sid(stream_id))
115 return self._streams[stream_id]
119 @brief Remove a stream from the active streams list and the streams dictionary.
121 @param stream_id The ID of the stream to remove.
122 @return The removed stream.
125 return self._streams.pop(stream_id)
129 @brief Add a file's content to a stream.
131 @param stream_id The ID of the stream to add the file to.
132 @param path The file path.
134 with open(path,
'rb')
as file:
140 @brief Add data to a specific stream's buffer.
142 @param stream_id The ID of the stream.
143 @param data The data to add.
146 stream.add_data_to_stream(data=data)
151 @brief Mark a stream as active.
153 @details Adds the stream ID to the active streams list.
155 @param stream_id The ID of the stream to mark as active.
162 @brief Check if a stream ID exists in the streams dict.
164 @param stream_id The ID of the stream to check.
165 @return True if the stream ID exists, False otherwise.
167 return stream_id
in self._streams.keys()
171 @brief Set the start time for all streams in the connection.
173 start_time = time.time()
175 stream[
'total_time'] = start_time
179 @brief Continuously create and send packets until all streams are finished.
182 start_time = time.time()
184 stream[
'total_time'] = start_time
192 @brief Send the packet size to the remote peer.
194 @return True if the packet size was sent successfully, False otherwise.
197 packet_size_bytes = self.
_packet_size.to_bytes(Constants.PACKET_SIZE_BYTES,
'big')
202 @brief Create a packet containing frames from the streams.
204 @details 1. Generate frames for each stream
205 2. Assemble SOME of them and add to packet payload
206 3. Add packet to pending packets
208 @return The created packet with frames from different streams.
213 remaining_space -= getsizeof(packet)
214 while remaining_space > Constants.ZERO:
215 if self._pending_frames:
216 frame = self._pending_frames.pop(Constants.START)
220 frame = stream.send_next_frame()
221 if stream.is_finished():
225 size_of_frame = getsizeof(frame.encode())
226 if size_of_frame <= remaining_space:
227 packet.add_frame(frame)
228 remaining_space -= size_of_frame
230 self._pending_frames.append(frame)
237 @brief Generate frames for each active stream.
240 self.
_get_stream_by_id(stream_id).generate_stream_frames(PACKET_SIZE // Constants.FRAMES_IN_PACKET)
244 @brief Retrieve a stream from the list of active streams.
246 @return The retrieved stream, or None if there are no active streams.
254 print(
"No more streams!")
259 @brief Send a packet to the remote address.
261 @param packet The packet to send.
262 @return True if the packet was sent successfully, False otherwise.
268 @brief Continuously receive packets until the connection is closed or a timeout occurs.
270 self.
_socket.settimeout(Constants.TIMEOUT)
275 print(f
"An error occurred in receive_packets: {e}")
280 @brief Receive a packet and process it.
282 @details If the socket times out, the connection will be closed.
286 packet, addr = self.
_socket.recvfrom(Constants.PACKET_SIZE_BYTES)
296 except socket.timeout:
301 @brief Increment the counter for received packets.
307 @brief Handle the reception of the packet size from the peer.
309 @param packet_size The received packet size in bytes.
311 print(f
'Packet size received: {int.from_bytes(packet_size, "big")}')
316 @brief Handle the reception of a packet and its frames.
318 @param packet The received packet in bytes.
320 unpacked_packet = Packet.unpack(packet)
321 frames_in_packet = unpacked_packet.payload
322 for frame
in frames_in_packet:
323 stream_id = frame.stream_id
327 self.
_stats_dict[stream_id][
'total_bytes'] += len(frame.encode())
328 self.
_stats_dict[stream_id][
'total_packets'].add(unpacked_packet.packet_number)
331 except Exception
as e:
332 print(f
"An error occurred handle_received_packet: {e}")
336 @brief Write the received data of a stream to a file.
338 @param stream_id The ID of the stream whose data should be written.
339 @return True if the data was written successfully, False otherwise.
342 data = stream.get_data_received()
343 curr_time = time.time()
344 self.
_stats_dict[stream_id][
'total_time'] -= curr_time
346 with open(f
'{stream_id}.gif',
'wb')
as file:
349 except Exception
as e:
350 print(f
"An error occurred in _write_stream: {e}")
355 @brief Close the connection, socket, and print the statistics.
364 @brief Print the statistics for all active streams in the connection.
369 elapsed_time = abs(stats[
'total_time'])
371 total_bytes = stats[
'total_bytes']
372 _bytes += total_bytes
373 total_packets = len(stats[
'total_packets'])
374 print(f
'STREAM #{stream_id}:')
375 print(f
'---------------- {total_bytes} bytes total')
376 print(f
'---------------- {total_packets} packets total')
377 print(f
'---------------- at rate {float(total_bytes) / elapsed_time} bytes/second')
378 print(f
'---------------- at rate {float(total_packets) / elapsed_time} packets/second')
379 print(f
'Statistics for all active streams:')
380 print(f
'------- rate {float(_bytes) / self._total_time} bytes/second, {_bytes} bytes total')
382 f
'------- rate {float(self._received_packets_counter) / self._total_time} packets/second, {self._received_packets_counter} packets total')
383 print(f
'total time elapsed: {self._total_time} seconds')
_add_stream_to_stats_dict(self, int stream_id)
'Stream' get_stream(self, int initiated_by, int direction)
_generate_streams_frames(self)
_received_packets_counter
_add_active_stream_id(self, int stream_id)
_add_stream(self, int stream_id, bool initiated_by, bool direction)
_add_data_to_stream(self, int stream_id, bytes data)
Stream|None _get_stream_from_active_streams(self)
_send_packet(self, bytes packet)
_increment_received_packets_counter(self)
bool _write_stream(self, int stream_id)
__init__(self, int connection_id, tuple local_addr, tuple remote_addr)
'Stream' _get_stream_by_id(self, int stream_id)
int _stream_id_generator(self, int initiated_by, int direction)
_handle_received_packet_size(self, bytes packet_size)
add_file_to_stream(self, int stream_id, str path)
'Stream' _remove_stream(self, int stream_id)
_handle_received_packet(self, bytes packet)
bool _is_stream_id_in_dict(self, int stream_id)
Packet _create_packet(self)