QUIC Protocol Implementation 1.0
A Python implementation of the QUIC (Quick UDP Internet Connections) protocol.
Loading...
Searching...
No Matches
frame.py
Go to the documentation of this file.
1"""
2@file frame.py
3@brief Implementation of QUIC stream frame handling.
4@details Contains abstract and concrete classes for QUIC stream frames,
5 with methods for encoding and decoding them.
6"""
7
8from abc import ABC, abstractmethod
9from dataclasses import dataclass
10
11from constants import Constants
12
13
14@dataclass
15class StreamFrameABC(ABC):
16 """
17 @brief Abstract base class for stream frames.
18
19 @details Defines the interface for stream frame classes.
20 """
21
22 stream_id: int # !< Unique identifier for the stream
23
24 @abstractmethod
25 def encode(self) -> bytes:
26 """
27 @brief Encode the frame into bytes.
28
29 @return The encoded frame as bytes.
30 """
31 pass
32
33 @classmethod
34 @abstractmethod
35 def decode(cls, frame: bytes):
36 """
37 @brief Decode bytes into a frame object.
38
39 @param frame The encoded frame as bytes.
40 @return A new frame instance with the decoded values.
41 """
42 pass
43
44
45@dataclass
47 """
48 @brief Concrete implementation of a QUIC stream frame.
49
50 @details Contains data for a single frame within a stream,
51 with methods to encode and decode the frame.
52 """
53
54 offset: int # !< The offset of this frame in the stream
55 length: int # !< The length of the data in this frame
56 fin: bool # !< Flag indicating if this is the final frame
57 data: bytes # !< The payload data of this frame
58
59 def encode(self) -> bytes:
60 """
61 @brief Encodes the frame into bytes.
62
63 @details The encoding process includes:
64 - Converting the stream ID to bytes.
65 - Setting the type field based on the presence of offset, length, and fin attributes.
66 - Appending the offset, length, and data to the values list if they are present.
67 - Combining all parts into a single bytes object.
68
69 @return The encoded frame as bytes.
70 """
71 values = [self.stream_id.to_bytes(Constants.STREAM_ID_LENGTH, 'big')]
72 type_field = Constants.MIN_TYPE_FIELD
73 if self.offset != 0:
74 type_field = type_field | Constants.OFF_BIT
75 values.append(self.offset.to_bytes(Constants.OFFSET_LENGTH, 'big'))
76 if self.length != 0:
77 type_field = type_field | Constants.LEN_BIT
78 values.append(self.length.to_bytes(Constants.LEN_LENGTH, 'big'))
79 if self.fin:
80 type_field = type_field | Constants.FIN_BIT
81 values.append(self.datadata)
82 encoded_frame = type_field.to_bytes(Constants.FRAME_TYPE_FIELD_LENGTH, 'big') # type is byte[0]
83 for v in values:
84 encoded_frame += v
85 return encoded_frame
86
87 @classmethod
88 def decode(cls, frame: bytes):
89 """
90 @brief Decodes a frame encoded in bytes into a FrameStream instance.
91
92 @details Delegates to _decode for the actual decoding.
93
94 @param frame The encoded frame as bytes.
95 @return A new FrameStream instance with the decoded values.
96 """
97 return FrameStream._decode(frame)
98
99 @classmethod
100 def _decode(cls, frame: bytes):
101 """
102 @brief Decodes a frame encoded in bytes into a FrameStream instance.
103
104 @details The decoding process includes:
105 - Extracting the offset, length, fin flag, stream ID, and stream data from the frame.
106 - Creating a new FrameStream instance with the extracted values.
107
108 @param frame The encoded frame as bytes.
109 @return A new FrameStream instance with the decoded values.
110 """
111 offset = Constants.ZERO
112 length = Constants.ZERO
113 fin = False
114 type_field = int.from_bytes(frame[:Constants.FRAME_TYPE_FIELD_LENGTH], 'big')
115 index = Constants.FRAME_TYPE_FIELD_LENGTH
116 stream_id = int.from_bytes(frame[index:index + Constants.STREAM_ID_LENGTH], 'big')
117 index += Constants.STREAM_ID_LENGTH
118 if type_field & Constants.OFF_BIT:
119 offset = int.from_bytes(frame[index:index + Constants.OFFSET_LENGTH], 'big')
120 index += Constants.OFFSET_LENGTH
121
122 # Check if the length is present
123 if type_field & Constants.LEN_BIT:
124 length = int.from_bytes(frame[index:index + Constants.LEN_LENGTH], 'big')
125 index += Constants.LEN_LENGTH
126
127 # Check if the FIN bit is set
128 if type_field & Constants.FIN_BIT:
129 fin = True
130
131 return FrameStream(stream_id=stream_id, offset=offset, length=length, fin=fin, data=frame[index:])
132
133 @staticmethod
134 def end_of_attrs(frame: bytes) -> int:
135 """
136 @brief Determines the end position of the attributes in the frame.
137
138 @details The process includes:
139 - Calculating the initial end position based on the frame type field length and stream ID length.
140 - Checking if the offset bit is set in the type field and adjusting the end position accordingly.
141 - Checking if the length bit is set in the type field and adjusting the end position accordingly.
142
143 @param frame The encoded frame as bytes.
144 @return The end position of the attributes in the frame.
145 """
146 end_of_data = Constants.FRAME_TYPE_FIELD_LENGTH + Constants.STREAM_ID_LENGTH
147 type_field = int.from_bytes(frame[:Constants.FRAME_TYPE_FIELD_LENGTH], 'big')
148 if type_field & Constants.OFF_BIT:
149 end_of_data += Constants.OFFSET_LENGTH
150 if type_field & Constants.LEN_BIT:
151 end_of_data += Constants.LEN_LENGTH
152 return end_of_data
153
154 @staticmethod
155 def length_from_attrs(frame: bytes, end_of_attrs: int):
156 """
157 @brief Determines the length of the data in the frame.
158
159 @details The process includes:
160 - Checking if the end of attributes is less than or equal to the sum of the frame type
161 field length and stream ID length.
162 - If the end of attributes is less than or equal to the sum of the frame type field
163 length and stream ID length plus the offset length.
164 - Otherwise, the length is extracted from the frame after the offset length.
165
166 @param frame The encoded frame as bytes.
167 @param end_of_attrs The end position of the attributes in the frame.
168 @return The length of the data in the frame.
169 """
170 if end_of_attrs <= Constants.FRAME_TYPE_FIELD_LENGTH + Constants.STREAM_ID_LENGTH:
171 return Constants.ZERO
172 index = Constants.FRAME_TYPE_FIELD_LENGTH + Constants.STREAM_ID_LENGTH
173 if end_of_attrs <= index + Constants.OFFSET_LENGTH: # offset is not present, len "took" its room
174 return int.from_bytes(frame[index:index + Constants.LEN_LENGTH], "big")
175 index += Constants.OFFSET_LENGTH
176 return int.from_bytes(frame[index:index + Constants.LEN_LENGTH], "big")
length_from_attrs(bytes frame, int end_of_attrs)
Definition frame.py:155
_decode(cls, bytes frame)
Definition frame.py:100
decode(cls, bytes frame)
Definition frame.py:88
int end_of_attrs(bytes frame)
Definition frame.py:134
bytes encode(self)
Definition frame.py:59
decode(cls, bytes frame)
Definition frame.py:35
bytes encode(self)
Definition frame.py:25