Часть 2: Отправка и получение сообщений
В этой серии статей мы рассматриваем процесс создания масштабируемого сервера для чата в реальном времени, во всех деталях. Цель статьи — показать пример практического применения языка Rust на фоне изучения концепций системного программирования и системных API, шаг за шагом.
Вторая часть является прямым продолжением первой, поэтому если вы ее пропустили (или забыли контекст), то рекомендую сначала ознакомиться с ней. В этой части мы продолжаем реализацию протокола WebSocket.
14 Фреймы
Теперь мы готовы приступить к обмену реальными сообщениями. Но прежде всего нам нужно понять, как протокол WebSocket кодирует данные для пересылки. Описание можно найти в RFC, который обязывает нас упаковывать данные во фреймы, которые состоят из заголовка, содержащего метаинформацию (тип фрейма, и т.п.), и полезной нагрузки (payload), т.е., данных, которые мы хотим отправить.
Начало заголовка фрейма состоит из 2 байтов (или 16 битов), которые выглядят следующим образом:
Каждый ромб на схеме представляет один бит (единицу или ноль).
fin
— маркер последнего фрагмента в серии фрагментированных фреймов, которые используются, когда длина передаваемых данных неизвестна заранее — например, для потоковой передачи. Пока что мы не будем освещать эту тему в подробностях, так как в этом нет необходимости для решения непосредственно нашей задачи.rsv1
,rsv2
, иrsv3
зарезервированы для будущих версий WebSocket на случай появления каких-либо расширений протокола. Для нашей задачи это тоже неактуально, следовательно, мы можем смело игнорировать эти поля.opcode
определяет тип отправляемого или принимаемого фрейма, который может содержать двоичные данные или текст. Помимо того, существует несколько типов контрольных фреймов (control frames), используемых для пингов, или в тех случаях, когда одна из сторон прерывает соединение. Мы обсудим этот вопрос позже.masked
устанавливается в 1 (true) когда данные кодируются с использованием битовой маски. Подробности позднее.payload len
— размер передаваемых данных. Это поле требует особой логики для обработки, т.к. его значение зависит от размера данных — мы скоро к этому вернемся.
Учтите, что все эти данные запакованы всего в 2 октета (октет это байт, состоящий из 8 битов). Для примера возьмем такой объект, сериализованный в JSON:
{
"fin" : 1,
"rsv1" : 0,
"rsv2" : 0,
"rsv3" : 0,
"opcode" : 2,
"masked" : 1,
"payload_len": 64
}
Все эти значения также могут быть закодированны в двоичном формате, как группы битов:
{
"fin" : b1,
"rsv1" : b0,
"rsv2" : b0,
"rsv3" : b0,
"opcode" : b10,
"masked" : b1,
"payload_len": b1000000
}
Или же просто как одно большое двоичное число:
1.0.0.0.0010.1.1000000
, или же в шеснадцатеричной системе счисления: 0x82C0
.Это различные представления заголовка фрейма, получаемого от клиентов, и для удобства работы нам нужно привести двоичное число к структуре с отдельными полями, разделив его на отдельные части.
15 Распаковка заголовков
Для того, чтобы вытащить из числа в двоичной системе счисления нужную нам информацию, мы применим технику, известную как битовые маски. Звучит страшно, но на самом деле это довольно простая идея.
Мы просто "выделяем" биты, которые мы хотим "извлечь" из числа:
Серые квадраты отражают нули, а желтые — единицы.
Таким образом, схема соответствует битовой маске
00001111b
.
Следуя той же логике, это битовая маска
00100000b
.
Мы "накладываем" маску с помощью оператора &
(побитовое И, and
) следующим образом: байт & маска
.
Вот как это работает:
Опять же, серые квадраты соответствуют
0
, а зеленые — 1
.Имея две "переменные"
A = 1b
и B = 0b
, на выходе мы получим Out = 0b
.То же верно для значений
A = 0b
и B = 1b
.Out = 1b
на выходе получается только в том случае, если оба входящих числа — единицы. Вот и вся логика.
В том случае, когда у нас множество битов, операция И
(AND) применяется последовательно к каждому из них. Допустим, у нас есть значения A = 101b
и B = 011b
. В этом случае на выходе получается Out = 001b
, так как единственный общий бит у A
и B
— это третья единица, 1b
.
Теперь, используя ту же логику, мы можем применить маски с "выделенными" битами, примеры которых мы видели в начале, для извлечения интересующих нас частей заголовка фрейма.
16 Порядок байтов
Порядок байтов (endianness[1]) определяет, каким образом в памяти группируются два байта или более. Порядок может идти от младшего к старшему (little-endian), либо от старшего к младшему (big-endian).
Давайте посмотрим, в чем отличие.
Одно слово может содержать числа до 0xFFFF
.
Например, в случае со значением 0x0001
— в каком порядке следует разместить два этих байта в памяти?
Простейшая аналогия, которую можно провести — это запись слов на бумаге. Как правило, в европейских языках они пишутся слева направо. Таким же образом мы записываем и арабские числа — слева цифры старшего порядка (тысячи, сотни), справа — младшего порядка (десятки, единицы). В двоичной системе счисления байты старшего порядка — те, которые составляют основную часть числа, т.е., например, для числа
0x2A42
старшим байтом будет 0x2A
. Такая запись соответствует порядку big-endian.
В то же время в некоторых восточных языках (например, в иврите и арабском), слова принято записывать справа налево. Порядок записи в little-endian работает схожим образом: младшие байты там находятся всегда слева, и двоичное число 0x2A42
мы в этом случае запишем как 0x422A
.
Вот как эти числа выглядят в памяти:
В целом это все, что вам нужно знать про порядок байтов — просто запомните пример с двухбайтовым числом
0x0001
, которое в little-endian мы записываем "неправильным" образом — как 0x0100
.
Все это нам нужно знать по одной простой причине: по сети все данные приходят нам в big-endian, и это традиционный для сетевых приложений порядок, называемый также "сетевым порядком байт". При этом процессоры архитектуры x86 (и x86-64) используют порядок little-endian[1], что означает необходимость конвертации чисел из одного порядка в другой. Звучит сложно? На самом деле мы можем значительно упростить себе жизнь, используя библиотеки из стандартного репозитория Crates.
Интересующая нас библиотека называется byteorder, и она позволяет работать с байтами довольно простым способом, просто указывая интересующий нас порядок перед чтением или записью байтов. Все это мы рассмотрим на примере чуть позднее.
17 Рефакторинг процесса соединения
Перед тем, как мы перейдем к реализации распаковки байтов, займемся небольшой разминкой — давайте немного отрефакторим структуру
WebSocketClient
, чтобы наш код не превращался в лапшу. Напомню, что в этой структуре мы обрабатываем клиентские соединения, и метод read
в ней вызывается каждый раз, когда к нам поступают новые данные от клиента.
Для начала, давайте переместим код обработки подключений в отдельную функцию, просто переименовав fn read
в fn read_handshake
. Теперь мы можем добавить новую реализацию read
, которая будет в зависимости от состояния подключенного клиента передавать управление нужной реализации:
fn read(&mut self) {
match self.state {
ClientState::AwaitingHandshake => {
self.read_handshake();
},
_ => {}
}
}
Здесь все просто — мы делаем сопоставление с текущим значением
self.state
, и обрабатываем случай, когда состояние равно AwaitingHandshake
(ожидание начала соедниения по протоколу HTTP). Для всех остальных возможных случаев мы используем универсальный паттерн _
, потому что компилятор Rust требует, чтобы match
покрывал все возможные случаи для безопасной работы нашей программы.
И раз уж мы затронули тему текущего состояния подключения (self.state
), давайте проведем еще один небольшой рефакторинг, который нам доступен благодаря интересным возможностям Rust'а. В первой части статьи мы парсили заголовки HTTP-запроса с помощью специальной структуры, Parser
, которая содержит контекст и состояние HTTP-парсера. При этом нам эта структура нужна только один раз — когда мы еще не перешли к общению по протоколу WebSocket, поэтому неплохо было бы освобождать занимаемую ненужным нам парсером память.
Мы можем легко этого добиться с помощью простого трюка: мы просто переместим состояние парсера в перечисление (enum) ClientState
. Пример кода:
enum ClientState {
// Мы добавили состояние парсера прямо в enum:
AwaitingHandshake(RefCell<Parser<HttpParser>>),
…
}
struct WebSocketClient {
// Теперь мы можем удалить ненужную переменную из структуры подключения:
headers: Rc<RefCell<HashMap<String, String>>>,
…
interest: EventSet,
…
}
// Изменим код инициализации структуры:
impl WebSocketClient {
fn new(socket: TcpStream) -> WebSocketClient {
…
WebSocketClient {
socket: socket,
headers: headers.clone(),
interest: EventSet::readable(),
state: ClientState::AwaitingHandshake(RefCell::new(Parser::request(HttpParser {
current_key: None,
headers: headers.clone()
})))
}
}
}
Что это нам дает, и как такое вообще возможно? Строго говоря, перечисления — это структуры с некоторыми особенностями. Говоря еще точнее, перечисления в Rust — это алгебраические типы данных (ADT), про которые вы могли слышать в контексте некоторых функциональных языков (OCaml, Haskell, и др.). ADT позволяют нам определять сложные структуры данных (в том числе и рекурсивные), пользуясь при этом только системой типов. Так реализованы некоторые встроенные в язык структуры — например, уже знакомые нам
Option
и Result
. В этих структурах нет никакой черной магии, и вы при желании можете легко реализовать их самостоятельно.
Таким образом, Rust позволяет нам хранить состояние непосредственно вместе с информацией о типе. При этом, однако, поля перечислений неизменяемы, поэтому мы и используем RefCell
для инкапсуляции изменяемого состояния парсера.
Из-за того, что мы передали владение состоянием парсера в перечисление (а точнее — в тип ClientState::AwaitingHandshake
), вся связанная с ним память будет автоматически освобождена при переходе состояния к другому значению перечисления — например, к ClientState::Connected
. Этого эффекта мы и добивались. Теперь, после того, как мы убрали http_parser
из клиентской структуры, нам нужно соответствующим образом изменить функции read
и read_handshake
:
fn read(&mut self) {
match self.state {
ClientState::AwaitingHandshake(_) => {
self.read_handshake();
},
_ => {}
}
}
fn read_handshake(&mut self) {
let is_upgrade = if let ClientState::AwaitingHandshake(ref parser_state) = self.state {
let mut parser = parser_state.borrow_mut();
parser.parse(&buf);
parser.is_upgrade()
} else { false };
if is_upgrade {
// Изменяем текущее состояние
self.state = ClientState::HandshakeResponse;
…
}
}
В
read_handshake
можно увидеть незнакомый вид выражения, if let
:
let is_upgrade = if let ClientState::AwaitingHandshake(ref parser_state) = self.state {
if let
— упрощенная версия оператора сопоставления с образцом match
. С его помощью мы можем делать сопоставление только по одному образцу, что в некоторых случаях удобнее, чем match
, который требует обработки всех возможных случаев.
И обратите внимание на образец, с которым мы делаем сопоставление, и в частности ключевое слово ref
:
ClientState::AwaitingHandshake(ref parser_state)
ref
используется для получения значения по ссылке (то есть, для его заимствования) — без ref
образец выглядел бы так, будто мы хотим передать состояние парсера новому владельцу, метапеременной parser_state
(или же неявным образом его скопировать).
Если обратить внимание, мы могли бы получить ссылку на состояние парсера в функции read
, когда мы делали сопоставление с текущим состоянием подключения — например, так:
match self.state {
ClientState::AwaitingHandshake(ref parser_state) => {
self.read_handshake(parser_state);
},
…
}
Но такой подход является нарушением правил анализатора заимствований, и поэтому в результате мы бы получили такую ошибку:
error: cannot borrow `*self` as mutable because `self.state.0` is also borrowed as immutable
ClientState::AwaitingHandshake(ref parser_state) => self.read_handshake(...),
^~~~
Происходит это из-за того, что во время распаковки состояния парсера
ref parser_state
из self.state
мы также неявным образом заимствуем и self
— для предотвращения возможных будущих изменений и перемещений из self.state
.
При этом в функции read_handshake
во время входа мы еще раз заимствуем self
с возможностью изменения — в результате происходит попытка двойного заимствования self
для изменения, что, как вы помните, правилами компилятора в Rust запрещено — именно поэтому мы заимствуем состояние парсера в рамках функции read_handshake
.
Все это может казаться не очень удобным, но таким образом Rust защищает нас от повреждения памяти — следуя таким строгим, но простым правилам, мы всегда можем быть уверены, что наша программа работает корректно.
18 Распаковка фрейма
Наконец, давайте напишем код для распаковки заголовка фрейма. Но прежде, чем мы к этому перейдем, было бы неплохо разделить проект на модули — с ростом количества строк держать весь код в одном файле становится довольно неудобно. Мы начнем с модуля
frame.rs
, который будет содержать код, относящийся к работе с фреймами.
Дальше приводится полный листинг модуля — код довольно объемный, но не бойтесь, после мы рассмотрим его во всех подробностях:
use std::io;
use std::io::Read;
use std::error::Error;
use byteorder::{ReadBytesExt, BigEndian};
const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)]
pub enum OpCode {
TextFrame = 1,
BinaryFrame = 2,
ConnectionClose = 8,
Ping = 9,
Pong = 0xA
}
impl OpCode {
fn from(op: u8) -> Option<OpCode> {
match op {
1 => Some(OpCode::TextFrame),
2 => Some(OpCode::BinaryFrame),
8 => Some(OpCode::ConnectionClose),
9 => Some(OpCode::Ping),
0xA => Some(OpCode::Pong),
_ => None
}
}
}
pub struct WebSocketFrameHeader {
fin: bool,
rsv1: bool,
rsv2: bool,
rsv3: bool,
masked: bool,
opcode: OpCode,
payload_length: u8
}
pub struct WebSocketFrame {
header: WebSocketFrameHeader,
mask: Option<[u8; 4]>,
pub payload: Vec<u8>
}
impl WebSocketFrame {
pub fn read<R: Read>(input: &mut R) -> io::Result<WebSocketFrame> {
let buf = try!(input.read_u16::<BigEndian>());
let header = Self::parse_header(buf);
let len = try!(Self::read_length(header.payload_length, input));
let mask_key = if header.masked {
let mask = try!(Self::read_mask(input));
Some(mask)
} else {
None
};
let mut payload = try!(Self::read_payload(len, input));
if let Some(mask) = mask_key {
Self::apply_mask(mask, &mut payload);
}
Ok(WebSocketFrame {
header: header,
payload: payload,
mask: mask_key
})
}
pub fn get_opcode(&self) -> OpCode {
self.header.opcode.clone()
}
fn parse_header(buf: u16) -> Result<WebSocketFrameHeader, String> {
let opcode_num = ((buf >> 8) as u8) & 0x0F;
let opcode = OpCode::from(opcode_num);
if let Some(opcode) = opcode {
Ok(WebSocketFrameHeader {
fin: (buf >> 8) & 0x80 == 0x80,
rsv1: (buf >> 8) & 0x40 == 0x40,
rsv2: (buf >> 8) & 0x20 == 0x20,
rsv3: (buf >> 8) & 0x10 == 0x10,
opcode: opcode,
masked: buf & 0x80 == 0x80,
payload_length: (buf as u8) & 0x7F,
})
} else {
Err(format!("Некорректный opcode: {}", opcode_num))
}
}
fn apply_mask(mask: [u8; 4], bytes: &mut Vec<u8>) {
for (idx, c) in bytes.iter_mut().enumerate() {
*c = *c ^ mask[idx % 4];
}
}
fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
let mut buf = [0; 4];
try!(input.read(&mut buf));
Ok(buf)
}
fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
payload.extend(iter::repeat(0).take(payload_len));
try!(input.read(&mut payload));
Ok(payload)
}
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
return match payload_len {
PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(|e| io::Error::from(e)),
PAYLOAD_LEN_U16 => input.read_u16::<BigEndian>().map(|v| v as usize).map_err(|e| io::Error::from(e)),
_ => Ok(payload_len as usize) // payload_len < 127
}
}
}
Первое изменение, которое вы могли заметить в коде — использование модификаторов
pub
для функций, структур, переменных, и констант в явном виде. Они определяют интерфейс модуля — т.е., символы, доступные другим модулям для импортирования с использованием конструкции use frame::{a, b, c};
.
Затем мы определяем две новые структуры: WebSocketFrameHeader
содержит данные заголовка фрейма, а WebSocketFrame
нужна для представления получаемых и отправляемых фреймов. Структура WebSocketFrame
также содержит функцию read
, которая позволяет читать фреймы не только из сокета, но и из любых других источников данных — файлов, строк, и т.д. Именно поэтому вместо явного указания типа аргумента как TcpStream
мы используем общий для источников данных абстрактный типаж Read
. Этого правила следует придерживаться всегда — редко когда возникает необходимость указывать конкретные типы для аргументов функций, поэтому лучше использовать интерфейсы/типажи — это может пригодиться, например, при написании тестов.
Чтение заголовка
Начнем разбор кода с распаковки заголовка:
fn parse_header(buf: [u8; 2]) -> WebSocketFrameHeader {
let opcode_num = ((buf >> 8) as u8) & 0x0F;
let opcode = OpCode::from(opcode_num);
if let Some(opcode) = opcode {
Ok(WebSocketFrameHeader {
fin: (buf >> 8) & 0x80 == 0x80,
rsv1: (buf >> 8) & 0x40 == 0x40,
rsv2: (buf >> 8) & 0x20 == 0x20,
rsv3: (buf >> 8) & 0x10 == 0x10,
opcode: opcode,
masked: buf & 0x80 == 0x80,
payload_length: (buf as u8) & 0x7F,
})
} else {
Err(format!("Некорректный opcode: {}", opcode_num))
}
}
В дополнение к побитовому
&
(И) мы используем операцию сдвига вправо — >>
.Идея тут даже проще, чем с побитовым
И
— работает это так:
То есть, мы просто сдвигаем определенное количество битов слева направо для более простой работы с побитовыми масками.
let opcode_num = ((buf >> 8) as u8) & 0x0F;
В примере выше мы сдвигаем 8 старших битов направо и применяем уже известную нам маску:
Дальнейший процесс отличается лишь тем, что мы применяем другие маски к другим частям заголовка.
Затем мы создаем OpCode
из номера типа фрейма:
let opcode = OpCode::from(opcode_num);
В подобных случаях всегда стоит использовать перечисления, потому что они обеспечивают типобезопасность. При использовании нетипизированных чисел и констант очень легко ошибиться, потому что легко можно использовать число, которое никакому реальному опкоду не соответствует. В
opcode_num
в таком случае будет неопределенное значение.К тому же, мы можем связать каждый тип в перечислении с каким-либо числом:
pub enum OpCode {
TextFrame = 1,
BinaryFrame = 2,
…
}
Чтобы из числа получить соответствующий тип
OpCode
, мы используем отдельную функцию:
impl OpCode {
fn from(op: u8) -> Option<OpCode> {
match op {
1 => Some(OpCode::TextFrame),
2 => Some(OpCode::BinaryFrame),
…
_ => None
}
}
}
Функция безопасна, потому что возвращаемое значение имеет тип
Option<OpCode>
— для тех случаев, когда номер не соответствует никакому опкоду, мы просто возвращаем None
.
Определение размера данных
Теперь мы должны определить размер данных. За это отвечает заголовок фрейма размером в 7 битов, поэтому максимально возможное значение для
payload len
— 127. При этом, очевидно, размер фреймов может значительно выходить за эти пределы, поэтому мы применяем специальную логику для определения настоящей длины в байтах. Цитата из RFC (раздел 5.2):
Размер данных, в байтах: если 0-125, то это размер данных. Если 126, то
размером являются следующие 2 байта, которые следует интерпретировать как
16-битное целое беззнаковое число. Если 127, размером являются следующие
8 байт, которые следует интерпретировать как 64-битное целое беззнаковое
число.
То есть, если говорить в терминах Rust, мы читаем
u16
, если payload_len
равно 126
, и u64
, если 127
. Чтобы не использовать в коде эти "магические" значения, определим пару констант с более-менее понятными названиями:
const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;
И напишем отдельную функцию для чтения размера данных:
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
return match payload_len {
PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(From::from),
PAYLOAD_LEN_U16 => input.read_u16::<BigEndian>().map(|v| v as usize).map_err(From::from),
_ => Ok(payload_len as usize) // payload_len < 127
}
}
Код довольно плотный, поэтому давайте постепенно его разберем. В первую очередь следует обратить внимание на сигнатуру функции:
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
Она принимает два аргумента:
payload_len
(значение из заголовка), и input
, который использует параметризованный тип R
. <R: Read>
определяет этот тип.
Параметризованные (или обобщенные) типы позволяют функциям принимать и возвращать значения не строго определенного, а произвольного типа — это своего рода "заглушки" для обозначений типов.
В функции read_length
мы принимаем в качестве аргумента любой тип, реализующий типаж Read
. Таким образом мы подразумеваем, что переменная input
должна отвечать определенному интерфейсу, через который мы читаем байты из некоего источника данных (который может быть сетевым сокетом, файлом, или даже просто массивом байтов — типаж Read
реализован для всех этих источников).
И, наконец, мы возвращаем результат типа io::Result<usize>
. io::Result
это псевдоним типа для Result
. В модуле std::io
он определен следующим образом:
type Result<T> = std::result::Result<T, io::Error>
То есть, это всего лишь краткая запись для типа
Result<T, io::Error>
, где T
— точно такой же параметризованный тип, который мы использовали выше для аргументов функции read_length
. Мы можем определять собственные псевдонимы типов таким же образом.
Далее мы делаем сопоставление с переменной payload_len
:
return match payload_len {
PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(From::from),
…
}
Здесь мы используем ту самую библиотеку
byteorder
, о которой мы говорили в начале. В начале модуля мы импортируем ее так:
use byteorder::{ReadBytesExt, BigEndian};
Модуль
ReadBytesExt
предоставляет типаж, который содержит методы для чтения чисел разных размеров — u16
, u32
, и u64
— в определенном порядке байтов. ReadBytesExt
работает интересным образом: он расширяет типаж Read
из стандартной библиотеки, что приводит к добавлению его методов к любым структурам, которые реализуют Read
. Rust позволяет так расширять любой типаж (или структуру), добавляя произвольные методы к уже существующим и будущим структурам[2].
Использовать эту возможность довольно легко — к примеру, вот как реализован ReadBytesExt
в коде byteorder:
/// Расширяем `Read`, добавляя методы для чтения чисел. (Для `std::io`.)
pub trait ReadBytesExt: io::Read {
fn read_u16<T: ByteOrder>(&mut self) -> Result<u16> {
// … код пропущен …
}
// … реализации read_u32, read_u64, и др. пропущены. …
}
/// Добавляем методы из `ReadBytesExt` ко всем типам, реализующим `Read`:
impl<R: io::Read + ?Sized> ReadBytesExt for R {}
Обратите внимание на последнюю строчку, где и происходит самое интересное. Здесь объявляется обобщенный тип
R
, включающий в себя все структуры, реализующие типаж Read
. Для этого типа R
(а точнее — для множества типов) и реализуется типаж ReadBytesExt
. Тело реализации (impl
) пустое, так как ReadBytesExt
уже содержит необходимые реализации методов (что напоминает абстрактные классы из "традиционных" ООП-языков).
Порядок байтов мы также указываем с помощью параметризации типов: он определяется типом, реализующим типаж ByteOrder
. В библиотеке byteorder
уже есть несколько таких реализаций, из которых нас больше всего интересуют BigEndian
и LittleEndian
.
Вот как мы все это используем в нашем коде, читая 8 байтов в сетевом порядке big-endian:
input.read_u64::<BigEndian>()
.map(|v| v as usize)
.map_err(From::from),
С помощью методов структуры
Result
, map
и map_err
, мы приводим результат Result<u64, byteorder::Error>
к нужному нам типу Result<usize, io::Error>
. map
здесь трансформирует тип возвращаемого значения, а map_err
, соответственно, тип ошибки.
Маска
Согласно протоколу, после того, как мы прочитали размер фрейма, нам нужно прочитать 4-байтную маску — но только в том случае, если бит
mask
равен 1.Вполне подходящий момент, чтобы использовать тип
Option
:
let mask_key = if header.masked {
let mask = try!(Self::read_mask(input));
Some(mask)
} else {
None
}
В этом куске кода можно обнаружить новую интересную конструкцию: макрос
try!
. Этот макрос позволяет нам обрабатывать рутинные ошибки в короткой форме. В данном случае он раскрывается в такой код:
match Self::read_mask(input) {
Ok(val) => val,
Err(err) => {
return Err(From::from(err))
}
}
Сначала он проверяет возвращенное функцией
read_mask
значение — если это не ошибка, то оно распаковывается из Ok(...)
и возвращается как результат:
Ok(val) => val,
Если же результатом является ошибка, то она конвертируется в возвращаемый функцией тип (в данном случае это
io::Error
), и немедленно возвращается как результат функции (прерывая дальнейшее ее выполнение):
Err(err) => {
return Err(From::from(err))
}
Макрос
try!
— простой и не загрязняющий код способ обработки ошибок. Обрабатывать каждую ошибку вручную в таком ключе, с сопоставлением через match
, было бы довольно утомительно и, честно говоря, не особо-то приятно.
Посмотрим на реализацию функции read_mask
(которая, впрочем, не представляет собой ничего особенного — мы просто последовательно считываем в массив 4 байта из источника данных):
fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
let mut buf = [0; 4];
try!(input.read(&mut buf));
Ok(buf)
}
После того, как мы прочитали маску, можно переходить непосредственно к чтению данных фрейма:
fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
payload.extend(iter::repeat(0).take(payload_len));
try!(input.read(&mut payload));
Ok(payload)
}
Здесь нужны пояснения. Первым делом мы создаем буфер, который будет содержать данные фрейма:
let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
Vec::with_capacity
создает вектор с предварительно выделенной для него памятью. Мы используем вектор, динамически выделяемый массив, потому что при стандартном методе объявления байтовых массивов [0; <размер>]
мы не можем использовать переменную для указания размера, поскольку это статические буферы, и их размер не может быть изменен.
Векторы позволяют создавать массивы произвольного размера. Для того, чтобы их эффективно использовать, нужно знать о разнице между емкостью и длиной.
Если длина определяет количество элементов в векторе, емкость — это количество элементов, которые вектор может вместить без перевыделения памяти. При перевыделении памяти для вектора все его элементы перемещаются в другую область памяти — очевидно, это небыстрый процесс (особенно при большом размере элементов), поэтому если мы угадаем с предполагаемой емкостью, то вектор будет работать быстрее.
С этим связана интересная деталь. Мы считываем данные из источника Read
следующим образом:
try!(input.read(&mut payload));
Поскольку количество читаемых байт мы нигде явным образом не указываем, метод
read
для оценки использует размер переданного буфера. Так как размер вектора определяется его длиной, а не емкостью, и наш вектор payload
не содержит элементов, то, соответственно, read
попытается прочитать 0 байтов и ничего не вернет.
По этой причине мы используем такой трюк, предварительно заполняя буфер нулями:
payload.extend(iter::repeat(0).take(payload_len));
Мы создаем итератор, который бесконечно повторяет нули, и ограничиваем количество элементов переменной
payload_len
, в результате получая последовательность [0, 0, 0, ...payload_len]
.
Вот и все — теперь у нас есть вектор с данными фрейма. Нам остается только применить к нему маску, следуя описанию в RFC:
5.3 Маскирование данных от клиента к серверуЧтобы преобразовать маскированные данные в немаскированные данные,
или наоборот, применяется следующий алгоритм. Тот же алгоритм используется
вне зависимости от направления преобразования.Чтобы получить октет i преобразованных данных («transformed-octet-i»),
к октету i оригинальных данных («original-octet-i») применяется исключающее
ИЛИ (XOR) с октетом из ключа маски по индексу i с модулем 4 («masking-key-octet-j»):`j = i MOD 4 transformed-octet-i = original-octet-i XOR masking-key-octet-j`
Именно это мы и делаем далее, применяя исключающее ИЛИ к каждому байту буфера с использованием ключа маски:
fn apply_mask(mask: [u8; 4], bytes: &mut Vec<u8>) {
for (idx, c) in bytes.iter_mut().enumerate() {
*c = *c ^ mask[idx % 4];
}
}
Метод
iterate_mut()
позволяет изменять данные вектора прямо во время итерации, а enumerate()
добавляет к данным индекс, который нам нужен для получения нужного байта из ключа маски.
Побитовая операция исключающего ИЛИ, ^
, работает почти так же, как и побитовое И: 1
на выходе получается только в том случае, если истинны (т.е. равны 1
) A
или B
, но не оба из них.
Тогда как мы используем маски с исключающим И для получения нужной нам части битов, маски с исключающим ИЛИ используются для переключения битов в определенном порядке.
Порядок переключаемых битов может быть случайным, и в определенных случаях это позволяет использовать побитовые маски в качестве базовой меры безопасности при передаче данных (впрочем, не следует понимать это как "маскирование данных обеспечивает безопасность" — это не то же самое, что шифрование). По этой причине в протоколе WebSocket и применяются случайные маски, которые нужны для предотвращения атаки с помощью "отравления кэша".[3].
Итоги
Теперь у нас есть все необходимые части для обработки фреймов, и остается лишь слить их воедино, добавив новый код для получения фреймов в структуру
WebSocketClient
:
impl WebSocketClient {
// … часть кода пропущена …
fn read(&mut self) {
match self.state {
ClientState::AwaitingHandshake(_) => { … },
// Добавляем новый обработчик для состояния `Connected`:
ClientState::Connected => {
let frame = WebSocketFrame::read(&mut self.socket);
match frame {
Ok(frame) => println!("{:?}", frame),
Err(e) => println!("ошибка при чтении фрейма: {}", e)
}
}
}
}
}
Вот и все — можно попробовать запустить сервер с помощью команды
cargo run
и подключиться к нашему серверу из браузера. Входящие фреймы будут выводиться в окне терминала:
19 Отправка фреймов
Отправлять фреймы проще, чем получать, поскольку при отправке не используется маскирование данных. Все, что нам нужно — это создать новый фрейм из какого-нибудь источника (строки, массива байтов, и т.п.) и отправить данные в сокет.
Начнем с функции для создания исходящего фрейма на основе переданных параметров:
impl WebSocketFrameHeader {
fn new_header(len: usize, opcode: OpCode) -> WebSocketFrameHeader {
WebSocketFrameHeader {
fin: true,
rsv1: false, rsv2: false, rsv3: false,
masked: false,
payload_length: Self::determine_len(len),
opcode: opcode
}
}
fn determine_len(len: usize) -> u8 {
if len < (PAYLOAD_LEN_U16 as usize) {
len as u8
} else if len < (u16::MAX as usize) {
PAYLOAD_LEN_U16
} else {
PAYLOAD_LEN_U64
}
}
}
Функция
new_header
принимает 2 аргумента — длина передаваемых данных в байтах и опкод, который отвечает за тип фрейма. Функция determine_len
используется для вычисления "магического" значения, обозначающего размер фрейма (т.е., упоминаемые ранее константы 126 и 127).
Как вы помните, фреймы разделены на несколько типов — для текстовых сообщений и для двоичных данных. На данный момент наше приложение использует только текстовые сообщения, поэтому в первую очередь добавим поддержку конвертации строк во фреймы:
impl<'a> From<&'a str> for WebSocketFrame {
fn from(payload: &str) -> WebSocketFrame {
WebSocketFrame {
header: WebSocketFrameHeader::new_header(payload.len(), OpCode::TextFrame),
payload: Vec::from(payload),
mask: None
}
}
}
Мы используем специальный типаж
From
, который упрощает преобразования типов. Полная сигнатура реализуемого типажа — From<&'a str>
. Возникает естественный вопрос — почему &'a str
, а не просто &str
, и что означает <'a>
в impl<'a>
?
Здесь мы сталкиваемся с одной из важных идей языка Rust: временами жизни. На самом деле мы их уже встречали, и даже применяли неявным образом, потому что определение времени жизни — неотъемлемая часть анализатора заимствований, основы языка. Время жизни определяет, как долго область памяти, относящаяся к какой-либо переменной, остается выделенной и доступной для безопасного использования.
По умолчанию время жизни заимствованных значений ограничивается их областью видимости, но в отдельных случаях мы должны указывать время жизни явным образом — например, при возвращении из функции заимствованного значения. Для этого используются параметры времени жизни, которые определяются похожим на параметризованные типы образом. 'a
из примера кода выше и есть параметр времени жизни.
Обычно, как и в случае с типами переменных, компилятор умеет автоматически определять время жизни заимствований — поэтому, например, в вышеприведенном примере кода мы не используем параметр времени жизни нигде, кроме сигнатуры типажа Form
.
В общем-то, на данный момент это все, что необходимо знать об этой теме. Если вам интересно узнать больше, то об этом можно почитать в переводе книги "Язык программирования Rust".
Вернемся к конкретике и продолжим написание кода для создания фреймов. Теперь, когда у нас в структуре есть все необходимое для создания фрейма, остается преобразовать фрейм в двоичные данные — представление, используемое для передачи по сети. Первой добавим функцию, обратную parse_header
— serialize_header
для сериализации заголовка фрейма:
impl WebSocketFrame {
// … часть кода пропущена …
fn serialize_header(hdr: &WebSocketFrameHeader) -> u16 {
let b1 = ((hdr.fin as u8) << 7)
| ((hdr.rsv1 as u8) << 6)
| ((hdr.rsv2 as u8) << 5)
| ((hdr.rsv3 as u8) << 4)
| ((hdr.opcode as u8) & 0x0F);
let b2 = ((hdr.masked as u8) << 7)
| ((hdr.payload_length as u8) & 0x7F);
((b1 as u16) << 8) | (b2 as u16)
}
}
Как вы уже наверное догадались,
<<
— это операция побитового сдвига. Она работает точно таким же образом, как и операция сдвига >>
, только биты сдвигаются не вправо, а влево:
Вертикальная черта,
|
— побитовое ИЛИ
— еще одна полезная операция, которую можно использовать для сложения битов. То есть, там, где у одной последовательности битов 1
, а у другой — 0
, в результате всегда будет 1
:
Таким образом мы собираем два заголовочных байта в слово (
u16
) — сначала сдвигая первый байт влево на 8 бит, затем складывая результат со вторым байтом с помощью операции ИЛИ:
((b1 as u16) << 8) | (b2 as u16)
Остается собрать все в одно целое в функции
write
, которая будет записывать фрейм в выходной поток (строку, файл, и т.д.):
pub fn write<W: Write>(&self, output: &mut W) -> io::Result<()> {
let hdr = Self::serialize_header(&self.header);
try!(output.write_u16::<BigEndian>(hdr));
match self.header.payload_length {
PAYLOAD_LEN_U16 => try!(output.write_u16::<BigEndian>(self.payload.len() as u16)),
PAYLOAD_LEN_U64 => try!(output.write_u64::<BigEndian>(self.payload.len() as u64)),
_ => {}
}
try!(output.write(&self.payload));
Ok(())
}
Мы записываем 2 заголовочных байта, размер данных фрейма (если он превышает 125 байт), а затем и сами данные — в открытом виде, потому что RFC протокола прямо говорит о том, что при отправке данных от сервера к клиенту данные не должны маскироваться.
Вот и все — мы почти закончили с отправкой фреймов! Остается только поменять функцию WebSocketClient.write
— тот код, что там уже есть мы перемещаем в новую функцию write_handshake
, а в функции write
делаем сопоставление с текущим состоянием подключения:
impl WebSocketClient {
fn write(&mut self) {
match self.state {
ClientState::AwaitingHandshake(_) => {
self.write_handshake();
},
_ => {}
}
}
fn write_handshake(&mut self) {
let headers = self.headers.borrow();
let response_key = gen_key(&headers.get("Sec-WebSocket-Key").unwrap());
…
}
}
Теперь можно добавить обработчик состояния
ClientState::Connected
— в нем мы будем отправлять фреймы, поставленные в очередь. Очередь — необязательный элемент, но ее удобно использовать, когда нужно отправить сразу несколько фреймов, и мы не хотим каждый раз переключать контекст цикла событий с получения на отправку. Создадим новое поле в структуре WebSocketClient
:
struct WebSocketClient {
socket: TcpStream,
…
// Добавляем очередь исходящих фреймов:
outgoing: Vec<WebSocketFrame>
}
Не забывая подправить функцию-конструктор:
impl WebSocketClient {
// … часть кода пропущена …
fn new(socket: TcpStream) -> WebSocketClient {
let headers = Rc::new(RefCell::new(HashMap::new()));
WebSocketClient {
…
outgoing: Vec::new()
}
}
}
И, наконец, добавим отправку фреймов в функцию
write
:
match self.state {
ClientState::HandshakeResponse => …,
ClientState::Connected => {
println!("очередь фреймов на отправку: {}", self.outgoing.len());
for frame in self.outgoing.iter() {
if let Err(e) = frame.write(&mut self.socket) {
println!("ошибка передачи данных: {}", e);
}
}
self.outgoing.clear();
self.interest.remove(EventSet::writable());
self.interest.insert(EventSet::readable());
},
_ => {}
}
Здесь мы отправляем каждый фрейм из исходящей очереди, очищаем ее, и меняем подписку на события получения данных. Чтобы проверить, что все работает, давайте попробуем что-нибудь отправлять в ответ на каждый полученный от пользователя фрейм. Внесем изменения в
read
:
fn read(&mut self) {
match self.state {
ClientState::AwaitingHandshake(_) => { … },
ClientState::Connected => {
let frame = WebSocketFrame::read(&mut self.socket);
match frame {
Ok(frame) => {
println!("{:?}", frame),
// Добавляем ответный фрейм в очередь:
let reply_frame = WebSocketFrame::from("Привет!");
self.outgoing.push(reply_frame);
// Переключаем цикл событий в режим записи:
self.interest.remove(EventSet::readable());
self.interest.insert(EventSet::writable());
},
Err(e) => println!("ошибка при чтении фрейма: {}", e)
}
}
}
}
Можно запустить
cargo run
, открыть в браузере консоль, и убедиться, что все получилось. Выполните этот код в консоли браузера:
ws = new WebSocket('ws://127.0.0.1:10000');
ws.onmessage = function (event) {
console.log('Ответ от сервера: ', event.data);
};
И скажите "привет" нашему серверу:
ws.send('Привет!');
Вот что мы должны увидеть в результате:
20 Контрольные фреймы
Прежде чем перейти к интересным задачам, неплохо было бы закончить с небольшой рутиной. Нам нужно доработать отключение клиентов, на данный момент эта процедура не следует требованиям протокола. На деле это означает лишь то, что мы не обрабатываем несколько контрольных фреймов должным образом. Этот момент не потребует от нас особо глубоких исследований, поэтому пробежимся по-быстрому.
Продолжим рефакторинг, переместив логику получения фрейма из WebSocketClient.read
в отдельную функцию:
pub fn read(&mut self) {
match self.state {
ClientState::AwaitingHandshake(_) => self.read_handshake();
ClientState::Connected => self.read_frame(),
_ => {}
}
}
fn read_frame(&mut self) {
let frame = WebSocketFrame::read(&mut self.socket);
…
}
Для начала добавим поддержку для более простых контрольных фреймов, ping и pong, используемых для проверки активности соединения. Добавим сопоставление по типу фрейма в
read_frame
:
fn read_frame(&mut self) {
let frame = WebSocketFrame::read(&mut self.socket);
match frame {
Ok(frame) => {
match frame.get_opcode() {
OpCode::TextFrame => {
println!("{:?}", frame),
let reply_frame = WebSocketFrame::from("Привет!");
self.outgoing.push(reply_frame);
},
_ => {}
}
self.interest.remove(EventSet::readable());
self.interest.insert(EventSet::writable());
}
Err(e) => println!("ошибка при чтении фрейма: {}", e)
}
}
Сопоставление идет с результатом ранее определенной функции
get_opcode
, которая возвращает тип фрейма.
Теперь добавим образец для обработки фреймов типа Ping
:
match frame.get_opcode() {
OpCode::TextFrame => …,
OpCode::Ping => {
self.outgoing.push(WebSocketFrame::pong(&frame));
}
_ => {}
}
А также код для создания фрейма
Pong
на основе входящего пинга (как того требует протокол):
impl WebSocketFrame {
// … часть кода пропущена …
pub fn pong(ping_frame: &WebSocketFrame) -> WebSocketFrame {
let payload = ping_frame.payload.clone();
WebSocketFrame {
header: WebSocketFrameHeader::new_header(payload.len(), OpCode::Pong),
payload: payload,
mask: None
}
}
}
Теперь перейдем к фреймам типа
Close
:
match frame.get_opcode() {
OpCode::TextFrame => …,
OpCode::Ping => …,
// Добавим образец для обработки фреймов с опкодом ConnectionClose:
OpCode::ConnectionClose => {
self.outgoing.push(WebSocketFrame::close_from(&frame));
},
_ => {}
}
Протокол требует от нас отвечать на каждый получаемый запрос с опкодом
ConnectionClose
соответствующим закрывающим фреймом, при этом ответ должен основываться на входящих данных. Также протокол определяет, что закрывающий фрейм может содержать тело, и в том случае, если тело имеется, его начало должно быть озаглавлено двухбайтовым кодом состояния:
Первыми двумя байтами тела ДОЛЖНО быть 2-байтное беззнаковое целое (в сетевом порядке байтов),
сообщающее код состояния со значением /code/ определенным в Разделе 7.4.
Раздел 7.4 определяет список кодов состояния, однако на данный момент нам это не пригодится — мы рассмотрим эти детали позже. Пока что просто добавим новый конструктор фреймов,
close_from
, который берет запрос и создает на его основе ответ:
impl WebSocketFrame {
…
pub fn close_from(recv_frame: &WebSocketFrame) -> WebSocketFrame {
let body = if recv_frame.payload.len() > 0 {
let status_code = &recv_frame.payload[0..2];
let mut body = Vec::with_capacity(2);
body.write(status_code);
body
} else {
Vec::new()
};
WebSocketFrame {
header: WebSocketFrameHeader::new_header(body.len(), OpCode::ConnectionClose),
payload: body,
mask: None
}
}
}
Осталось немного. Когда мы получаем запрос на закрытие подключения, мы должны также закрыть исходное TCP-соединение. Можно сделать это в тот момент, когда мы отправляем фреймы из очереди в
WebSocketClient
, поменяв подписку на события hup
следующим образом:
impl WebSocketClient {
fn write(&mut self) {
match self.state {
ClientState::HandshakeResponse => …,
ClientState::Connected => {
// Добавим флаг для отслеживания наличия закрывающего фрейма в очереди:
let mut close_connection = false;
for frame in self.outgoing.iter() {
if let Err(e) = frame.write(&mut self.socket) {
println!("ошибка при записи фрейма: {}", e);
}
// Проверим, есть ли в очереди фрейм, который закрывает соединение:
if (frame.is_close()) {
close_connection = true;
}
}
self.outgoing.clear();
self.interest.remove(EventSet::writable());
// Если мы закрываем соединение - подписываемся на события `hup`, если нет -
// то продолжаем читать поступающие фреймы:
if (close_connection) {
self.interest.insert(EventSet::hup());
} else {
self.interest.insert(EventSet::readable());
}
},
_ => {}
}
}
}
Еще следует добавить вспомогательный метод для проверки опкода фрейма — закрывающий ли он:
impl WebSocketFrame {
// … часть кода пропущена …
pub fn is_close(&self) -> bool {
self.header.opcode == OpCode::ConnectionClose
}
}
И последний момент — в
WebSocketServer
нам надо не забыть удалить отключенных клиентов из словаря соединений:
impl Handler for WebSocketServer {
// … часть кода пропущена …
fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>, token: Token, events: EventSet) {
if events.is_readable() {
…
}
if events.is_writable() {
…
}
// Обрабатываем закрытие подключения:
if events.is_hup() {
let client = self.clients.remove(&token).unwrap();
client.socket.shutdown(Shutdown::Both);
event_loop.deregister(&client.socket);
}
}
}
Ну и пока на этом все.
21 Заключение
Во второй части мы неплохо продвинулись вперед. Рассмотрели несколько важных концепций сетевого программирования и работы с протоколами, продолжили воплощать реализацию WebSocket и построили фундамент для библиотеки, позволяющей работать с WebSocket на более высоком уровне.
В третьей части мы еще больше упростим использование библиотеки, обсудим многопоточные циклы событий и осветим основы тестирования производительности и оптимизации. Чтобы не терять данную серию статей из вида, подписывайтесь на меня в Твиттере или на RSS-ленту блога.
Код, приведенный в статье, доступен в репозитории на GitHub. С удовольствием рассмотрю любые пулл-реквесты, если вы решите что-то добавить или улучшить.
Cпасибо за внимание!
Заметки
[1] Английское название, кстати, происходит из книги Джонатана Свифта "Путешествия Гулливера", отсылая к спору между "остроконечниками" (little-endian) и "тупоконечниками" (big-endian).
[1] Вас может удивить «неестественный» порядок байтов, принятый в архитектуре x86. На самом деле, здесь все просто: вспомните, как вы переносите числа, складывая их в столбик — начиная с младшего порядка. Процессор, использующий little-endian, складывает или умножает значения точно так же, начиная операции с младшего байта.
[2] Если вы когда-нибудь писали на C#, вы наверняка сталкивались там с похожей функциональностью — методами расширения. А фанаты Хаскеля и Скалы уже должно быть догадались, что это ничто иное, как тайпклассы.
[3] Более подробно с этой техникой атаки можно ознакомиться на сайте вопросов и ответов, посвященным безопасности (англ.). Атака описана в публикации "Talking to Yourself for Fun and Profit".
Как и всегда, спасибо podust за помощь в иллюстрировании и вычитке.
This entry passed through the Full-Text RSS service - if this is your content and you're reading it on someone else's site, please read the FAQ at http://ift.tt/jcXqJW.
Комментарии (0)