...

суббота, 3 августа 2019 г.

RESTinio — это асинхронный HTTP-сервер. Простой пример из практики: отдача большого объема данных в ответ

Недавно мне довелось поработать над приложением, которое должно было контролировать скорость своих исходящих подключений. Например, подключаясь к одному URL приложение должно было ограничить себя, скажем, 200KiB/sec. А подключаясь к другому URL — всего 30KiB/sec.

Самым интересным моментом здесь оказалось тестирование этих самых ограничений. Мне потребовался HTTP-сервер, который бы отдавал трафик с какой-то заданной скоростью, например, 512KiB/sec. Тогда бы я мог видеть, действительно ли приложение выдерживает скорость 200KiB/sec или же оно срывается на более высокие скорости.

Но где взять такой HTTP-сервер?

Поскольку я имею некоторое отношение к встраиваемому в С++ приложения HTTP-серверу RESTinio, то не придумал ничего лучше, чем быстренько набросать на коленке простой тестовый HTTP-сервер, который способен отдавать клиенту длинный поток исходящих данных.

О том, насколько это было просто и хотелось бы рассказать в статье. Заодно узнать в комментариях, действительно ли это просто или же я сам себя обманываю. В принципе, данную статью можно рассматривать как продолжение предыдущей статьи про RESTinio под названием "RESTinio — это асинхронный HTTP-сервер. Асинхронный". Посему, если кому-то интересно прочитать о реальном, пусть и не очень серьезном применении RESTinio, то милости прошу под кат.

Общая идея упомянутого выше тестового сервера очень проста: когда клиент подключается к серверу и выполняет HTTP GET запрос, то взводится таймер, срабатывающий раз в секунду. Когда таймер срабатывает, то клиенту отсылается очередной блок данных заданного размера.


Но все несколько сложнее

Если клиент вычитывает данные с меньшим темпом, нежели отсылает сервер, то просто отсылать по N килобайт раз в секунду не есть хорошая идея. Поскольку данные начнут скапливаться в сокете и ни к чему хорошему это не приведет.

Поэтому при отсылке данных желательно на стороне HTTP-сервера контролировать готовность сокета к записи. Пока сокет готов (т.е. в нем не скопилось еще слишком много данных), то новую порцию отсылать можно. А вот если не готов, то нужно подождать пока сокет не перейдет в состояние готовности к записи.

Звучит разумно, но ведь операции ввода-вывода скрыты в потрохах RESTinio… Как тут узнать, можно ли записывать следующую порцию данных или нет?

Из данной ситуации можно выйти, если использовать after-write нотификаторы, которые есть в RESTinio. Например, мы можем написать так:

void request_handler(restinio::request_handle_t req) {
   req->create_response() // Начинаем формировать ответ.
      ... // Наполняем ответ содержимым.
      .done([](const auto & ec) {
          ... // Вот этот код будет вызван когда запись ответа закончится.
      });
}

Лямбда, переданная в метод done() будет вызвана когда RESTinio завершит запись исходящих данных. Соответственно, если сокет какое-то время был не готов к записи, то лямбда будет вызвана не сразу, а после того, как сокет придет в должное состояние и примет все исходящие данные.

За счет использования after-write нотификаторов логика работы тестового сервера будет такой:


  • отсылаем очередную порцию данных, вычисляем время, когда нам нужно было бы отослать следующую порцию при нормальном развитии событий;
  • вешаем after-write нотификатор на очередную порцию данных;
  • когда after-write нотификатор вызывается, мы проверяем, наступило ли время отсылки следующей порции. Если наступило, то сразу же инициируем отсылку следующей порции. Если не наступило, то взводим таймер.

В результате получится, что как только запись начнет притормаживать, отсылка новых данных приостановится. И возобновиться когда сокет будет готов принимать новые исходящие данные.


И еще немного сложного: chunked_output

RESTinio поддерживает три способа формирования ответа на HTTP-запрос. Самый простой способ, который применяется по умолчанию, в данном случае не подходит, т.к. мне требуется практически бесконечный поток исходящих данных. И такой поток, естественно, нельзя отдать в единственный вызов метода set_body.

Поэтому в описываемом тестовом сервере используется т.н. chunked_output. Т.е. при создании ответа я указываю RESTinio, что ответ будет формироваться частями. После чего просто периодически вызываю методы append_chunk для добавления к ответу очередной части и flush для записи накопленных частей в сокет.

Пожалуй, достаточно уже вступительных слов и пора перейти к рассмотрению самого кода, который можно найти в этом репозитории. Начнем с функции request_processor, которая вызывается для обработки каждого корректного HTTP-запроса. При этом углубимся в те функции, которые из request_processor вызываются. Ну а затем уже посмотрим, как именно request_processor ставится в соответствие тому или иному входящему HTTP-запросу.


Функция request_processor и её подручные

Функция request_processor вызывается для обработки нужных мне HTTP GET запросов. Ей в качестве аргументов передаются:


  • Asio-шный io_context, на котором ведется вся работа (он потребуется, например, для взведения таймеров);
  • размер одной части ответа. Т.е. если мне нужно отдавать исходящий поток с темпом в 512KiB/sec, то в качестве этого параметра будет передано значение 512KiB;
  • количество частей в ответе. На случай, если поток должен иметь какую-то ограниченную длину. Например, если нужно отдавать поток с темпом 512KiB/sec в течении 5 минут, то в качестве этого параметра будет передано значение 300 (60 блоков в минуту в течении 5 минут);
  • ну и сам входящий запрос для обработки.

Внутри request_processor создается объект с информацией о запросе и параметрах его обработки, после чего эта самая обработка и начинается:

void request_processor(
        asio_ns::io_context & ctx,
        std::size_t chunk_size,
        std::size_t count,
        restinio::request_handle_t req) {
    auto data = std::make_shared<response_data>(
            ctx,
            chunk_size,
            req->create_response<output_t>(),
            count);

    data->response_
        .append_header(restinio::http_field::server, "RESTinio")
        .append_header_date_field()
        .append_header(
                restinio::http_field::content_type,
                "text/plain; charset=utf-8")
        .flush();

    send_next_portion(data);
}

Тип response_data, содержащий все относящиеся к запросу параметры, выглядит следующим образом:

struct response_data {
    asio_ns::io_context & io_ctx_;
    std::size_t chunk_size_;
    response_t response_;
    std::size_t counter_;

    response_data(
        asio_ns::io_context & io_ctx,
        std::size_t chunk_size,
        response_t response,
        std::size_t counter)
        : io_ctx_{io_ctx}
        , chunk_size_{chunk_size}
        , response_{std::move(response)}
        , counter_{counter}
    {}
};

Тут нужно заметить, что одна из причин появления структуры response_data состоит в том, что объект типа restinio::response_builder_t<restinio::chunked_output_t> (а именно этот тип спрятан за коротким псевдонимом response_t) является moveable-, но не copyable-типом (по аналогии с std::unique_ptr). Поэтому этот объект нельзя просто так захватить в лямбда-функции, которая затем оборачивается в std::function. Но если объект-response поместить в динамически созданный экземпляр response_data, то умный указатель на экземпляр reponse_data уже можно без проблем захватывать в лямбда-функции с последующим сохранением этой лямбды в std::function.


Функция send_next_portion

Функция send_next_portion вызывается каждый раз, когда требуется отослать клиенту очередную часть ответа. Ничего сложного в ней не происходит, поэтому выглядит она достаточно просто и лаконично:

void send_next_portion(response_data_shptr data) {
    data->response_.append_chunk(make_buffer(data->chunk_size_));

    if(1u == data->counter_) {
        data->response_.flush();
        data->response_.done();
    }
    else {
        data->counter_ -= 1u;
        data->response_.flush(make_done_handler(data));
    }
}

Т.е. отсылаем очередную часть. И, если эта часть была последней, то завершаем обработку запроса. А если не последняя, то в метод flush передается after-write нотификатор, который создается, пожалуй, наиболее сложной функцией данного примера.


Функция make_done_handler

Функция make_done_handler отвечает за создание лямбды, которая будет передана в RESTinio в качестве after-write нотификатора. Этот нотификатор должен проверить, завершилась ли запись очередной части ответа успешно. Если да, то нужно разобраться, следует ли следующую часть отослать сразу же (т.е. были "тормоза" в сокете и темп отсылки выдерживать не получается), либо же после некоторой паузы. Если нужна пауза, то она обеспечивается через взведение таймера.

В общем-то, несложные действия, но в коде получается лямбда внутри лямбды, что может смутить людей, не привыкших к "современному" С++. Которому не так уж и мало лет чтобы называться современным ;)

auto make_done_handler(response_data_shptr data) {
    const auto next_timepoint = steady_clock::now() + 1s;
    return [=](const auto & ec) {
        if(!ec) {
            const auto now = steady_clock::now();
            if(now < next_timepoint) {
                auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_);
                timer->expires_after(next_timepoint - now);
                timer->async_wait([timer, data](const auto & ec) {
                        if(!ec)
                            send_next_portion(data);
                    });
            }
            else
                data->io_ctx_.post([data] { send_next_portion(data); });
        }
    };
}

На мой взгляд, основная сложность в этом коде проистекает из-за особенностей создания и "взвода" таймеров в Asio. По-моему, получается как-то слишком уж многословно. Но тут уж что есть, то есть. Зато не нужно никаких дополнительных библиотек привлекать.


Подключение express-like роутера

Показанные выше request_processor, send_next_portion и make_done_handler в общем-то и составляли самую первую версию моего тестового сервера, написанного буквально за 15 или 20 минут.

Но через пару дней использования этого тестового сервера оказалось, что в нем есть серьезный недостаток: он всегда отдает ответный поток с одинаковой скоростью. Скомпилировал со скоростью 512KiB/sec — отдает всем 512KiB/sec. Перекомпилировал со скоростью 20KiB/sec — будет отдавать всем 20KiB/sec и никак иначе. Что было неудобно, т.к. стало нужно иметь возможность получать ответы разной "толщины".

Тогда и появилась идея: а что, если скорость отдачи будет запрашиваться прямо в URL? Например, сделали запрос на localhost:8080/ и получили ответ с заранее заданной скоростью. А если сделали запрос на localhost:8080/128K, то стали получать ответ со скоростью 128KiB/sec.

Потом мысль пошла еще дальше: в URL также можно задавать и количество отдельных частей в ответе. Т.е. запрос localhost:8080/128K/3000 приведет к выдаче потока из 3000 частей со скоростью 128KiB/sec.

Нет проблем. В RESTinio есть возможность использовать маршрутизатор запросов, сделанный под влиянием ExpressJS. В итоге появилась вот такая функция описания обработчиков входящих HTTP-запросов:

auto make_router(asio_ns::io_context & ctx) {
    auto router = std::make_unique<router_t>();

    router->http_get("/", [&ctx](auto req, auto) {
            request_processor(ctx, 100u*1024u, 10000u, std::move(req));
            return restinio::request_accepted();
        });

    router->http_get(
                R"(/:value(\d+):multiplier([MmKkBb]?))",
                [&ctx](auto req, auto params) {

            const auto chunk_size = extract_chunk_size(params);

            if(0u != chunk_size) {
                request_processor(ctx, chunk_size, 10000u, std::move(req));
                return restinio::request_accepted();
            }
            else
                return restinio::request_rejected();
        });

    router->http_get(
                R"(/:value(\d+):multiplier([MmKkBb]?)/:count(\d+))",
                [&ctx](auto req, auto params) {

            const auto chunk_size = extract_chunk_size(params);
            const auto count = restinio::cast_to<std::size_t>(params["count"]);

            if(0u != chunk_size && 0u != count) {
                request_processor(ctx, chunk_size, count, std::move(req));
                return restinio::request_accepted();
            }
            else
                return restinio::request_rejected();
        });

    return router;
}

Здесь формируются обработчики HTTP GET запросов для URL трех типов:


  • вида http://localhost/;
  • вида http://localhost/<speed>[<U>]/;
  • вида http://localhost/<speed>[<U>]/<count>/

Где speed — это число, определяющее скорость, а U — это опциональный мультипликатор, который указывает, в каких единицах задана скорость. Так 128 или 128b означает скорость в 128 байт в секунду. А 128k — 128 килобайт в секунду.

На каждый URL вешается своя лямбда функция, которая разбирается с полученными параметрами, если все нормально, вызывает уже показанную выше функцию request_processor.

Вспомогательная функция extract_chunk_size выглядит следующим образом:

std::size_t extract_chunk_size(const restinio::router::route_params_t & params) {
    const auto multiplier = [](const auto sv) noexcept -> std::size_t {
        if(sv.empty() || "B" == sv || "b" == sv) return 1u;
        else if("K" == sv || "k" == sv) return 1024u;
        else return 1024u*1024u;
    };

    return restinio::cast_to<std::size_t>(params["value"]) *
            multiplier(params["multiplier"]);
}

Здесь C++ная лямбда используется для эмуляции локальных функций из других языков программирования.


Функция main

Осталось посмотреть, как все это запускается в функции main:

using router_t = restinio::router::express_router_t<>;
...
int main() {
    struct traits_t : public restinio::default_single_thread_traits_t {
        using logger_t = restinio::single_threaded_ostream_logger_t;
        using request_handler_t = router_t;
    };

    asio_ns::io_context io_ctx;

    restinio::run(
        io_ctx,
        restinio::on_this_thread<traits_t>()
            .port(8080)
            .address("localhost")
            .write_http_response_timelimit(60s)
            .request_handler(make_router(io_ctx)));

    return 0;
}

Что здесь происходит:


  1. Поскольку мне нужен не обычный штатный роутер запросов (который вообще ничего делать сам не может и перекладывает всю работу на плечи программиста), то я определяю новые свойства для своего HTTP-сервера. Для этого беру штатные свойства однопоточного HTTP-сервера (тип restinio::default_single_thread_traits_t) и указываю, что в качестве обработчика запросов будет использоваться экземпляр express-like роутера. Заодно, чтобы контролировать, что происходит внутри, указываю, чтобы HTTP-сервер использовал настоящий логгер (по умолчанию используется null_logger_t который вообще ничего не логирует).
  2. Поскольку мне нужно взводить таймеры внутри after-write нотификаторов, то мне нужен экземпляр io_context, с которым я смог бы работать. Поэтому я его создаю сам. Это дает мне возможность передать ссылку на мой io_context в функцию make_router.
  3. Остается только запустить HTTP-сервер в однопоточном варианте на ранее созданном мной io_context-е. Функция restinio::run вернет управление только когда HTTP-сервер завершит свою работу.

В статье не был показан полный код моего тестового сервера, только его основные моменты. Полный код, которого чуть-чуть больше из-за дополнительных typedef-ов и вспомогательных функций, несколько подлиннее. Увидеть его можно здесь. На момент написания статьи это 185 строк, включая пустые строки и комментарии. Ну и написаны эти 185 строк за пару-тройку подходов суммарной длительностью вряд ли более часа.

Мне такой результат понравился и задача оказалась интересной. В практическом плане быстро был получен нужный мне вспомогательный инструмент. И в плане дальнейшего развития RESTinio появились кое-какие мысли.

В общем, если кто-то еще не пробовал RESTinio, то я приглашаю попробовать. Сам проект живет на BitBucket, есть зеркало на GitHub. Задать вопрос или высказать свои предложения можно в Google-группе или прямо здесь, в комментариях.

Let's block ads! (Why?)

Комментариев нет:

Отправить комментарий