Изначально он был написан на Си, как и все остальные демоны в нашей компании. Однако мы столкнулись с тем, что существенная часть процессорного времени (около 10%) тратилась, по сути, впустую: это запуск интерпретатора и загрузка «ядра» нашего фреймворка. Поэтому, чтобы иметь возможность инициализировать интерпретатор и наш фреймворк только один раз, было принято решение переписать демон на PHP. Мы назвали его Phprocksyd (по аналогии с Phproxyd — PHP Proxy Daemon, демоном на Си, который у нас был до этого). Он принимает запросы на запуск отдельных классов и делает fork() на каждый запрос, а также умеет сообщать о статусе исполнения каждого из запусков. Такая архитектура во многом похожа на модель веб-сервера Apache, когда вся инициализация делается один раз в «мастере» и «дети» занимаются уже именно обработкой запроса. В качестве дополнительной «плюшки» мы получаем возможность включить opcode cache в CLI, который будет правильно работать, поскольку все дети наследуют ту же область общей памяти, что и мастер-процесс. Чтобы уменьшить задержки при обработке запроса на запуск, можно делать fork() заранее (prefork-модель), но в нашем случае задержки на fork() составляют около 1 мс, что нас вполне устраивает.
Однако, поскольку мы обновляем код весьма часто, этот демон также приходится часто перезапускать, иначе код, который загружен в него, может устареть. Так как каждый рестарт сопровождался бы массой ошибок вида connection reset by peer, включая отказы в обслуживании конечных пользователей (демон полезен не только для облака, но и для части нашего сайта), мы решили поискать способы сделать рестарт демона без потери уже установленных соединений. Существует одна популярная техника, с помощью которой делается graceful reload для демонов: делается fork-exec и при этом потомку передается дескриптор от listen-сокета. Таким образом, новые соединения принимаются уже новой версией демона, а старые «дорабатывают» с использованием старой версии.
В этой статье мы рассмотрим усложненный вариант graceful reload: старые подключения будут продолжать обрабатываться новой версией демона, что важно в нашем случае, поскольку иначе он будет запускать старый код.
Теория
Давайте для начала подумаем: возможно ли то, что мы хотим получить? И если да, то как этого достичь?
Поскольку демон работает под Linux, который является POSIX-совместимым, нам доступны следующие возможности:
- Все открытые файлы и сокеты — это числа, соответствующие номеру открытого дескриптора. Стандартный ввод, вывод и поток ошибок имеют дескрипторы 0, 1 и 2 соответственно.
- Никаких существенных отличий между открытым файлом, сокетом и каналом (pipe) нет (например, с сокетами можно работать как с помощью системных вызовов read/write, так и sendto/recvfrom).
- При выполнении системного вызова fork() все открытые дескрипторы наследуются с сохранением их номеров и позиций чтения/записи (в файлах).
- При выполнении системного вызова execve() все открытые дескрипторы также наследуются, причем в дополнение сохраняется PID процесса и, следовательно, привязка к своим детям.
- Список открытых дескрипторов процесса доступен из директории /dev/fd, который в Linux является симлинком на /proc/self/fd.
Таким образом, у нас есть все основания полагать, что наша задача выполнима, причем без особых усилий. Итак, приступим.
Патчи к PHP
К сожалению, есть одна небольшая деталь, которая осложняет нам работу: в PHP нет возможности получить номер файлового дескриптора для потоков (streams) и открыть файловый дескриптор по номеру (вместо этого открывается копия файлового дескриптора, что для нашего демона не подходит, поскольку мы очень тщательно следим за открытыми дескрипторами, чтобы не создавать утечек при рестарте и при запуске дочерних процессов).
Для начала мы внесем пару небольших патчей в код PHP, чтобы добавить возможность получить fd у потока (stream) и сделать так, чтобы fopen(php://fd/<num>) не приводил к открытию копии дескриптора (второе изменение несовместимо с текущим поведением PHP, поэтому вместо него можно добавить новый «адрес», к примеру, php://fdraw/<num>):
diff --git a/ext/standard/php_fopen_wrapper.c b/ext/standard/php_fopen_wrapper.c
index f8d7bda..fee964c 100644
--- a/ext/standard/php_fopen_wrapper.c
+++ b/ext/standard/php_fopen_wrapper.c
@@ -24,6 +24,7 @@
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
+#include <fcntl.h>
#include "php.h"
#include "php_globals.h"
@@ -296,11 +297,11 @@ php_stream * php_stream_url_wrap_php(php_stream_wrapper *wrapper, char *path, ch
"The file descriptors must be non-negative numbers smaller than %d", dtablesize);
return NULL;
}
-
- fd = dup(fildes_ori);
- if (fd == -1) {
+
+ fd = fildes_ori;
+ if (fcntl(fildes_ori, F_GETFD) == -1) {
php_stream_wrapper_log_error(wrapper, options TSRMLS_CC,
- "Error duping file descriptor %ld; possibly it doesn't exist: "
+ "File descriptor %ld invalid: "
"[%d]: %s", fildes_ori, errno, strerror(errno));
return NULL;
}
diff --git a/ext/standard/streamsfuncs.c b/ext/standard/streamsfuncs.c
index 0610ecf..14fd3b0 100644
--- a/ext/standard/streamsfuncs.c
+++ b/ext/standard/streamsfuncs.c
@@ -24,6 +24,7 @@
#include "ext/standard/flock_compat.h"
#include "ext/standard/file.h"
#include "ext/standard/php_filestat.h"
+#include "ext/standard/php_fopen_wrappers.h"
#include "php_open_temporary_file.h"
#include "ext/standard/basic_functions.h"
#include "php_ini.h"
@@ -484,6 +485,7 @@ PHP_FUNCTION(stream_get_meta_data)
zval *arg1;
php_stream *stream;
zval *newval;
+ int tmp_fd;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg1) == FAILURE) {
return;
@@ -502,6 +504,9 @@ PHP_FUNCTION(stream_get_meta_data)
add_assoc_string(return_value, "wrapper_type", (char *)stream->wrapper->wops->label, 1);
}
add_assoc_string(return_value, "stream_type", (char *)stream->ops->label, 1);
+ if (SUCCESS == php_stream_cast(stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&tmp_fd, 1) && tmp_fd != -1) {
+ add_assoc_long(return_value, "fd", tmp_fd);
+ }
add_assoc_string(return_value, "mode", stream->mode, 1);
Мы добавили поле fd в результат, возвращаемый функцией stream_get_meta_data(), если оно имеет смысл (например, для zlib-потоков поле fd не будет присутствовать). Также мы заменили вызов dup() от переданного файлового дескриптора на простую его проверку. К сожалению, этот код не будет работать без модификаций под Windows, поскольку вызов fcntl() — это POSIX-specific, так что полный патч должен содержать в себе дополнительные ветки кода под другие ОС.
Демон без возможности перезапуска
Для начала напишем небольшой сервер, который сможет принимать запросы в формате JSON и отдавать какой-нибудь ответ. К примеру, он будет отдавать количество элементов в массиве, который пришел в запросе.
Демон прослушивает порт 31337. Результат работы должен быть примерно следующим:
$ telnet localhost 31337
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
{"hash":1} # ввод пользователя
"Request had 1 keys"
{"hash":1,"cnt":2} # ввод пользователя
"Request had 2 keys"
Мы будем использовать stream_socket_server() для того, чтобы начать слушать порт, и stream_select() для того, чтобы определить, какие дескрипторы готовы к чтению/записи.
<?php
class Simple
{
const PORT = 31337;
const SERVER_KEY = 'SERVER';
/** @var resource[] (client_id => stream) */
private $streams = [];
/** @var string[] (client_id => read buffer) */
private $read_buf = [];
/** @var string[] (client_id => write buffer) */
private $write_buf = [];
/** @var resource[] (client_id => stream from which to read) */
private $read = [];
/** @var resource[] (client_id => stream where to write) */
private $write = [];
/** @var int Total connection count */
private $conn_count = 0;
public function run()
{
$this->listen();
echo "Entering main loop\n";
$this->mainLoop();
}
protected function listen()
{
$port = self::PORT;
$ip_port = "0.0.0.0:$port";
$address = "tcp://$ip_port";
$server = stream_socket_server($address, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
if (!$server) {
fwrite(STDERR, "stream_socket_server failed: $errno $errstr\n");
exit(1);
}
$this->read[self::SERVER_KEY] = $server;
echo "Listening on $address\n";
}
public function response($stream_id, $response)
{
$json_resp = json_encode($response);
echo "stream$stream_id " . $json_resp . "\n";
$this->write($stream_id, $json_resp . "\n");
}
public function write($stream_id, $buf)
{
$this->write_buf[$stream_id] .= $buf;
if (!isset($this->write[$stream_id])) {
$this->write[$stream_id] = $this->streams[$stream_id];
}
}
public function accept($server)
{
echo "Accepting new connection\n";
$client = stream_socket_accept($server, 1, $peername);
$stream_id = ($this->conn_count++);
if (!$client) {
fwrite(STDERR, "Accept failed\n");
return;
}
stream_set_read_buffer($client, 0);
stream_set_write_buffer($client, 0);
stream_set_blocking($client, 0);
stream_set_timeout($client, 1);
$this->read_buf[$stream_id] = '';
$this->write_buf[$stream_id] = '';
$this->read[$stream_id] = $this->streams[$stream_id] = $client;
echo "Connected stream$stream_id: $peername\n";
}
private function disconnect($stream_id)
{
echo "Disconnect stream$stream_id\n";
unset($this->read_buf[$stream_id], $this->write_buf[$stream_id]);
unset($this->streams[$stream_id]);
unset($this->write[$stream_id], $this->read[$stream_id]);
}
private function handleRead($stream_id)
{
$buf = fread($this->streams[$stream_id], 8192);
if ($buf === false || $buf === '') {
echo "got EOF from stream$stream_id\n";
if (empty($this->write_buf[$stream_id])) {
$this->disconnect($stream_id);
} else {
unset($this->read[$stream_id]);
}
return;
}
$this->read_buf[$stream_id] .= $buf;
$this->processJSONRequests($stream_id);
}
private function processJSONRequests($stream_id)
{
if (!strpos($this->read_buf[$stream_id], "\n")) return;
$requests = explode("\n", $this->read_buf[$stream_id]);
$this->read_buf[$stream_id] = array_pop($requests);
foreach ($requests as $req) {
$res = json_decode(rtrim($req), true);
if ($res !== false) {
$this->response($stream_id, "Request had " . count($res) . " keys");
} else {
$this->response($stream_id, "Invalid JSON");
}
}
}
private function handleWrite($stream_id)
{
if (!isset($this->write_buf[$stream_id])) {
return;
}
$wrote = fwrite($this->streams[$stream_id], substr($this->write_buf[$stream_id], 0, 65536));
if ($wrote === false) {
fwrite(STDERR, "write failed into stream #$stream_id\n");
$this->disconnect($stream_id);
return;
}
if ($wrote === strlen($this->write_buf[$stream_id])) {
$this->write_buf[$stream_id] = '';
unset($this->write[$stream_id]);
if (empty($this->read[$stream_id])) {
$this->disconnect($stream_id);
}
} else {
$this->write_buf[$stream_id] = substr($this->write_buf[$stream_id], $wrote);
}
}
public function mainLoop()
{
while (true) {
$read = $this->read;
$write = $this->write;
$except = null;
echo "Selecting for " . count($read) . " reads, " . count($write) . " writes\n";
$n = stream_select($read, $write, $except, NULL);
if (!$n) {
fwrite(STDERR, "Could not stream_select()\n");
}
if (count($read)) {
echo "Can read from " . count($read) . " streams\n";
}
if (count($write)) {
echo "Can write to " . count($write) . " streams\n";
}
if (isset($read[self::SERVER_KEY])) {
$this->accept($read[self::SERVER_KEY]);
unset($read[self::SERVER_KEY]);
}
foreach ($read as $stream_id => $_) {
$this->handleRead($stream_id);
}
foreach ($write as $stream_id => $_) {
$this->handleWrite($stream_id);
}
}
}
}
$instance = new Simple();
$instance->run();
Код этого демона более чем стандартный, однако хотелось бы отметить одну деталь реализации: мы храним все буферы чтения и записи с привязкой к конкретным соединениям и выполняем обработку запросов прямо в том же месте, где читаем запрос. Это важно, потому что один из таких запросов может быть restart, и в этом случае до обработки следующих запросов дело не дойдет. Тем не менее, поскольку запросы мы еще не прочитали, в следующий раз stream_select() от тех же дескрипторов вернет такой же результат. Таким образом, мы не потеряем ни единого запроса, если осуществим рестарт прямо из обработчика команды (кроме случая, когда нам пришлют сразу несколько команд в то же соединение, и одной из этих команд будет restart).
Итак, как же сделать возможным перезапуск демона?
Демон с перезапуском и сохранением установленных соединений
Наш простейший пример не умел делать ничего полезного, поэтому давайте все же напишем демон, о котором шла речь в самом начале. Мы хотим получить примерно следующее (команды демону присылаются в виде «имя_команды[ JSON-данные]», ответ в виде JSON):
$ telnet localhost 31337
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
# сразу же попросим демон перезапуститься
restart
# ответ посылает уже перезапущенный демон
"Restarted successfully"
# запустим тестовый класс
run {"hash":1,"params":[1,2,3],"class":"TestClass1"}
# запущен успешно
{"error_text":"OK"}
# рестартим демон еще раз (его child TestClass1 все еще работает)
restart
"Restarted successfully"
# проверим статус задания: все еще работает
check {"hash":1}
{"error_text":"Still running"}
# подождем 5 секунд и проверим еще раз: класс TestClass1 отработал успешно
check {"hash":1}
{"retcode":0}
# демон помнит обо всех запусках, поэтому нужно делать free
check {"hash":1}
{"retcode":0}
free {"hash":1}
{"error_text":"OK"}
restart
"Restarted successfully"
# я обновил код, поэтому второй раз мы видим уже другой ответ на restart
restart
{"error_text":"Restarted successfully"}
bye
Connection closed by foreign host.
Идея для рестарта проста: мы будем создавать файл со всей необходимой информацией, а при запуске будем пытаться прочитать его и восстановить открытые файловые дескрипторы.
Для начала напишем код для записи в restart-файл:
echo "Creating restart file...\n";
if (!$res = $this->getFdRestartData()) {
fwrite(STDERR, "Could not get restart FD data, exiting, graceful restart is not supported\n");
exit(0);
}
/* Close all extra file descriptors that we do not know of, including opendir() descriptor :) */
$dh = opendir("/proc/self/fd");
$fds = [];
while (false !== ($file = readdir($dh))) {
if ($file[0] === '.') continue;
$fds[] = $file;
}
foreach ($fds as $fd) {
if (!isset($this->known_fds[$fd])) {
fclose(fopen("php://fd/" . $fd, 'r+'));
}
}
$contents = serialize($res);
if (file_put_contents(self::RESTART_DIR . self::RESTART_FILENAME, $contents) !== strlen($contents)) {
fwrite(STDERR, "Could not fully write restart file\n");
unlink(self::RESTART_DIR . self::RESTART_FILENAME);
}
Код для получения массива данных (функция getFdRestartData()) приведён ниже:
$res = [];
foreach (self::$restart_fd_resources as $prop) {
$res[$prop] = [];
foreach ($this->$prop as $k => $v) {
$meta = stream_get_meta_data($v);
if (!isset($meta['fd'])) {
fwrite(STDERR, "No fd in stream metadata for resource $v (key $k in $prop), got " . var_export($meta, true) . "\n");
return false;
}
$res[$prop][$k] = $meta['fd'];
$this->known_fds[$meta['fd']] = true;
}
}
foreach (self::$restart_fd_props as $prop) {
$res[$prop] = $this->$prop;
}
return $res;
В коде учитывается, что у нас есть 2 вида свойств:
- Свойства, содержащие ресурсы с соединениями: $restart_fd_resources = ['read', 'write', 'streams'].
- Свойства, содержащие буферы и другую информацию о соединениях, которые можно «сериализовать» в сыром виде: $restart_fd_props = ['read_buf', 'write_buf', 'conn_count'].
Также мы запоминаем все fd, сохраненные в restart-файле, и закрываем все остальные (если они есть), поскольку иначе можно допустить утечку файловых дескрипторов.
Дальше мы должны загрузить этот файл на старте и продолжать использовать открытые дескрипторы, как будто ничего не произошло :). Код двух функций (загрузки restart-файла и загрузки информации о файловых дескрипторах) приведён ниже:
Загрузка файла:
if (!file_exists(self::RESTART_DIR . self::RESTART_FILENAME)) {
return;
}
echo "Restart file found, trying to adopt it\n";
$contents = file_get_contents(self::RESTART_DIR . self::RESTART_FILENAME);
unlink(self::RESTART_DIR . self::RESTART_FILENAME);
if ($contents === false) {
fwrite(STDERR, "Could not read restart file\n");
return;
}
$res = unserialize($contents);
if (!$res) {
fwrite(STDERR, "Could not unserialize restart file contents");
return;
}
foreach (self::$restart_props as $prop) {
if (!array_key_exists($prop, $res)) {
fwrite(STDERR, "No property $prop in restart file\n");
continue;
}
$this->$prop = $res[$prop];
}
$this->loadFdRestartData($res);
Функция loadFdRestartData() по развертыванию массива файловых дескрипторов обратно:
$fd_resources = [];
foreach (self::$restart_fd_resources as $prop) {
if (!isset($res[$prop])) {
fwrite(STDERR, "Property '$prop' is not present in restart fd resources\n");
continue;
}
$pp = [];
foreach ($res[$prop] as $k => $v) {
if (isset($fd_resources[$v])) {
$pp[$k] = $fd_resources[$v];
} else {
$fp = fopen("php://fd/" . $v, 'r+');
if (!$fp) {
fwrite(STDERR, "Failed to open fd = $v, exiting\n");
exit(1);
}
stream_set_read_buffer($fp, 0);
stream_set_write_buffer($fp, 0);
stream_set_blocking($fp, 0);
stream_set_timeout($fp, self::CONN_TIMEOUT);
$fd_resources[$v] = $fp;
$pp[$k] = $fp;
}
}
$this->$prop = $pp;
}
foreach (self::$restart_fd_props as $prop) {
if (!isset($res[$prop])) {
fwrite(STDERR, "Property '$prop' is not present in restart fd properties\n");
continue;
}
$this->$prop = $res[$prop];
}
Мы заново выставляем значения read_buffer и write_buffer для открытых файловых дескрипторов и настраиваем тайм-ауты. Как ни странно, после этих манипуляций PHP совершенно спокойно делает accept() на эти файловые дескрипторы и продолжает нормально читать/писать в них даже при том, что он не знает, что это сокеты.
В конце концов, мы должны написать логику по запуску и слежению за статусом исполнения воркеров. Поскольку это не имеет отношения к теме статьи, полная реализация демона помещена на github-репозиторий, ссылка на который приведена ниже.
Заключение
Итак, в этой статье была описана реализация демона, который общается по JSON-протоколу и умеет запускать произвольные классы в отдельных процессах со слежением за процессом их исполнения. Для запуска отдельных классов используется модель fork() на запрос, поэтому для обработки запроса не требуется повторный запуск интерпретатора и загрузка фреймворка, при этом становится возможным использование opcode cache в CLI. Поскольку при каждом обновлении кода демон нужно перезапускать, необходимо обеспечить механизм плавного перезапуска этого демона (в нашей компании обновление кода иногда происходит раз в несколько минут, в виде «хотфиксов»).
Перезапуск происходит путем выполнения системного вызова execve(), в результате чего все потомки остаются привязаны к родителю (поскольку PID процесса при execve() не меняется). Также сохраняются все открытые файловые дескрипторы, что позволяет продолжать обрабатывать запросы от пользователей в уже открытых соединениях. Все сетевые буферы, информация о запущенных детях и об открытых дескрипторах сохраняется в отдельный restart-файл, который считывается новым экземпляром демона, после чего работа продолжается в стандартном event loop.
Полный код реализации можно увидеть на GitHub по следующему адресу: http://ift.tt/1wyrcJH
Вопросы, пожелания, уточнения приветствуются.
С уважением,
Юрий youROCK Насретдинов
Lead PHP developer
Badoo
This entry passed through the Full-Text RSS service - if this is your content and you're reading it on someone else's site, please read the FAQ at http://ift.tt/jcXqJW.
Комментариев нет:
Отправить комментарий