QUIC Protocol Implementation 1.0
A Python implementation of the QUIC (Quick UDP Internet Connections) protocol.
Loading...
Searching...
No Matches
packet.py
Go to the documentation of this file.
1"""
2@file packet.py
3@brief Implementation of QUIC packet data structures.
4@details Contains classes for QUIC packet headers and packet assembly,
5 with methods for encoding and decoding packets.
6"""
7
8from dataclasses import dataclass, field
9
10from constants import Constants
11from frame import FrameStream
12
13
@dataclass
class PacketHeader:
    """
    @brief Represents the header of a QUIC packet.

    @details Holds the flag fields that pack() combines into the header
             byte(s) and that unpack() extracts again.
    """

    packet_number_length: int  # !< The length of the packet number field in bytes (2 bits)
    header_form: bool = False  # !< The header form bit (1 bit)
    fixed_bit: bool = False  # !< The fixed bit (1 bit)
    spin_bit: bool = False  # !< The spin bit (1 bit)
    key_phase: bool = False  # !< The key phase bit (1 bit)
    reserved_bits: int = Constants.ZERO  # !< The reserved bits (2 bits)

    def pack(self) -> bytes:
        """
        @brief Packs the PacketHeader into bytes.

        @details Shifts each attribute to its position and ORs the results
                 into one integer, which is then serialised big-endian into
                 Constants.HEADER_LENGTH bytes.

        @return The packed header as bytes.
        """
        first_byte = (
            (int(self.header_form) << Constants.FORM_SHIFT)
            | (int(self.fixed_bit) << Constants.FIXED_SHIFT)
            | (int(self.spin_bit) << Constants.SPIN_SHIFT)
            | (self.reserved_bits << Constants.RES_SHIFT)
            | (int(self.key_phase) << Constants.KEY_SHIFT)
            # NOTE(review): restored as the unshifted, masked field because
            # unpack() reads it with PACKET_NUMBER_LENGTH_MASK and no shift.
            # Confirm this term against the original packet.py.
            | (self.packet_number_length & Constants.PACKET_NUMBER_LENGTH_MASK)
        )
        return first_byte.to_bytes(Constants.HEADER_LENGTH, 'big')

    @classmethod
    def unpack(cls, header: bytes) -> 'PacketHeader':
        """
        @brief Unpacks bytes into a PacketHeader.

        @details Decodes the header once as a big-endian integer and extracts
                 each field with its mask. Single-bit flags are reduced to
                 bool; reserved_bits is shifted back down so the value equals
                 what pack() was given.

        @param header The packed header as bytes.
        @return The unpacked PacketHeader object.
        """
        first_byte = int.from_bytes(header, 'big')
        # pack() shifts reserved_bits left by RES_SHIFT, so undo that here.
        # Assumes RES_MASK selects the bits in place -- TODO confirm.
        reserved_bits = (first_byte & Constants.RES_MASK) >> Constants.RES_SHIFT
        return cls(
            packet_number_length=first_byte & Constants.PACKET_NUMBER_LENGTH_MASK,
            header_form=bool(first_byte & Constants.FORM_MASK),
            fixed_bit=bool(first_byte & Constants.FIXED_MASK),
            spin_bit=bool(first_byte & Constants.SPIN_MASK),
            key_phase=bool(first_byte & Constants.KEY_MASK),
            reserved_bits=reserved_bits,
        )
66
67
@dataclass
class Packet:
    """
    @brief Represents a QUIC packet.

    @details Contains a destination connection ID, packet number,
             and payload as list of frames.
    """

    destination_connection_id: int  # !< The destination connection ID (8 bytes in this implementation)
    packet_number: int  # !< The packet number (4 bytes in this implementation)
    payload: list[FrameStream] = field(default_factory=list)  # !< The payload containing a list of FrameStream objects

    def pack(self) -> bytes:
        """
        @brief Packs the packet into bytes.

        @details The process includes:
                 - Deriving the packet number length from the packet number's bit length.
                 - Packing a header that carries that length.
                 - Converting the destination connection ID to bytes.
                 - Converting the packet number to bytes based on its length.
                 - Encoding each frame in the payload and appending it.

        @return The packed packet as bytes.
        """
        # Ceiling division of the bit length (presumably SEVEN == 7 and
        # EIGHT == 8, i.e. whole bytes needed). A packet number of 0 yields
        # a length of 0 and therefore no packet-number bytes.
        # NOTE(review): the header documents this field as 2 bits wide;
        # confirm that every length produced here fits that field.
        packet_number_length = (self.packet_number.bit_length() + Constants.SEVEN) // Constants.EIGHT
        parts = [
            PacketHeader(packet_number_length).pack(),
            self.destination_connection_id.to_bytes(Constants.DEST_CONNECTION_ID_LENGTH, 'big'),
            self.packet_number.to_bytes(packet_number_length, 'big'),
        ]
        parts.extend(frame.encode() for frame in self.payload)
        # One join instead of repeated bytes concatenation.
        return b''.join(parts)

    @classmethod
    def unpack(cls, packet_bytes: bytes) -> 'Packet':
        """
        @brief Unpacks bytes into a Packet object.

        @details The process includes:
                 - Unpacking the header to get the packet number length.
                 - Extracting the destination connection ID from the bytes.
                 - Extracting the packet number using the length from the header.
                 - Extracting the payload frames from the remaining bytes.

        @param packet_bytes The packed packet as bytes.
        @return The unpacked Packet object.
        """
        packet_number_length = PacketHeader.unpack(packet_bytes[:Constants.HEADER_LENGTH]).packet_number_length
        id_start = Constants.HEADER_LENGTH
        number_start = id_start + Constants.DEST_CONNECTION_ID_LENGTH
        payload_start = number_start + packet_number_length
        destination_connection_id = int.from_bytes(packet_bytes[id_start:number_start], 'big')
        # Slice exactly the bytes pack() wrote for the packet number. Using a
        # fixed-width slice here while advancing by the header's length would
        # let payload bytes leak into the packet number.
        packet_number = int.from_bytes(packet_bytes[number_start:payload_start], 'big')
        return cls(
            destination_connection_id=destination_connection_id,
            packet_number=packet_number,
            payload=cls.get_frames_from_payload_bytes(packet_bytes[payload_start:]),
        )

    @staticmethod
    def get_frames_from_payload_bytes(payload_bytes: bytes) -> list[FrameStream]:
        """
        @brief Extracts frames from the payload bytes.

        @details Walks the payload from Constants.START. For each frame it
                 - asks FrameStream.end_of_attrs for the size of the attribute section,
                 - asks FrameStream.length_from_attrs for the length of the frame data,
                 - decodes the slice covering both and appends the result,
                 - advances past the frame.

        @param payload_bytes The payload bytes.
        @return The list of decoded FrameStream objects.
        """
        index = Constants.START
        frames: list[FrameStream] = []
        while index < len(payload_bytes):
            end_of_attrs = FrameStream.end_of_attrs(payload_bytes[index:index + Constants.FRAME_TYPE_FIELD_LENGTH])
            length_of_frame_data = FrameStream.length_from_attrs(payload_bytes[index:index + end_of_attrs],
                                                                 end_of_attrs)
            frame_end = index + end_of_attrs + length_of_frame_data
            frames.append(FrameStream.decode(payload_bytes[index:frame_end]))
            index = frame_end
        return frames

    def add_frame(self, frame: 'FrameStream') -> None:
        """
        @brief Adds a frame to the packet's payload.

        @param frame The frame to be added.
        """
        self.payload.append(frame)
'PacketHeader' unpack(cls, bytes header)
Definition packet.py:48
int packet_number_length
Definition packet.py:22
bytes pack(self)
Definition packet.py:29
list payload
Definition packet.py:79
int destination_connection_id
Definition packet.py:77
add_frame(self, 'FrameStream' frame)
Definition packet.py:154
int packet_number
Definition packet.py:78
'Packet' unpack(cls, bytes packet_bytes)
Definition packet.py:103
list[FrameStream] get_frames_from_payload_bytes(bytes payload_bytes)
Definition packet.py:131
bytes pack(self)
Definition packet.py:81