...

воскресенье, 28 июля 2013 г.

[Из песочницы] Контейнер серверного java-кода с поддержкой постоянного соединения

Disclaimer




Все, описанное в статье, является личным практическим опытом и не претендует на звание «истины в последней инстанции».

Преамбула




Здравствуйте. Я увлекаюсь созданием компьютерных игр. Моим любимым направлением, в котором я постоянно стараюсь совершенствоваться и узнавать что-то новое, являются браузерные многопользовательские игры.

Для создания прототипа для одной идеи в качестве контейнера сервлетов используется Apache Tomcat. Он общается с клиентской частью по http протоколу. Для такого типа игры схема вполне действующая, причем достаточно простая в реализации.

Но одной из преждевременных оптимизаций(да, это плохо, но тут я решил себе это позволить) стала идея использовать постоянное соединение между сервером и клиентом, т.к. в такой схеме не тратится время на открытие\закрытие соединения в каждом запросе. Для реализации схемы рассматривалось WebSocket API для Tomcat, но стало интересно написать свой велосипед, поэтому, встречайте рассказ о разработке под катом.

Инструменты




Итак, для реализации данной идеи использовались:


  • NetBeans 7.2.1: в ней, собственно, написан весь java-код для решения этой задачи

  • JDK 1.7

  • Netty: было решено использовать nio, чтобы сервер был максимально производительным на большом числе подключений, этот фреймворк отлично подошел.

  • Socket IO: на стороне клиента

  • Apache Tomcat 7.0.27: для сравнительных тестов

  • Maven: для сборки всего этого добра


Архитектура






Сначала рассмотрим логику работы приложения:

Контейнер представляется главным классом SocketServletContainer. Он служит для запуска/остановки контейнера, также содержит методы для управления сервлетами. Хочу заметить, что в статье термин сервлет обозначает объект, содержащий в себе метод с серверным кодом, и не имеет ничего общего со спецификацией Servlet от JCP. Просто мне удобнее называть такие объекты именно сервлетами.


Собственно, мы имеет базовый класс Servlet, от которого наследуются все сервлеты пользователя, класс сессии соединения(SocketSession), служащий для хранения информации о сессии и для отправки сообщений пользователю(почему я сделал именно так, объясню позже). Также были реализованы классы входящего и исходящего буфера(InputBuffer и OutputBuffer) соответственно.


Также понадобилось реализовать вспомогательный класс Config, отвечающий за парсинг файла конфигурации в формате xml. Следует упомянуть и о классах QueueHandler и TaskHandler.


QueueHandler является обработчиком очереди запросов и содержит метод для добавления экземпляра класса Task на обработку.

TaskHandler реализует интерфейс Runnable. В методе run содержится обработка переданного запроса.

Класс Task содержит в себе информацию о пришедшем запросе(к какому сервлету обратиться и параметры, переданные на сервер) и методы для работы с сетью(read\write).


Теперь рассмотрим организацию работы с сетью:


Я не буду подробно расписывать работу с Netty, потому что это уже сделали до меня( отдельное спасибо хабраюзеру Rena4ka за ее статью по Netty). Читайте на хабре или документацию на официальном сайте, как вам будет удобнее. Рассмотрю только ту часть, которая необходима для понимания основных принципов человеком, не имеющим опыта программирования с Netty.

Класс ServerPipelineFactory является фабрикой ChannelPipeline и нужен для функционирования Netty. Также пришлось реализовать 3 класса: Decoder, Encoder, NioHandler.

Первые 2- это обработчики пакетов, пришедших на сервер. Декодер отвечает за правильный разбор пакета, поступившего из сети и возвращает экземпляр класса Task. Encoder отвечает за корректную запись экземпляра Task в сеть и отправку на клиент.

NioHandler, по сути, является сетевым менеджером: принимает соединения, отправляет задачи на обработку и управляет сессиями.


Протокол




Для общения клиента и сервера нужен свой протокол. Я решил сделать его достаточно простым и текстовым.

В итоге, клиент посылает на сервер строку запроса, имеющую следующий вид: имя_сервлета[sysDiv]параметры_запроса.

Формат списка параметров запроса: name1=value1, name2=value2,…

Пример: «TS[sysDiv]message=Hello habrahabr.ru».


Нужно заметить, что протокол является симметричным в том смысле, что клиент получает строку с указанием сервлета, сформировавшего ответ, и списком переданных параметров.


А теперь перейдем непосредственно к рассмотрению кода нашего контейнера. Но обо всем по порядку.


Формат конфигурационного файла



<?xml version="1.0" encoding="utf-8"?>
<config>
<address>localhost</address>
<port>9999</port>
<workThreadCount>2</workThreadCount>
<processThreadCount>2</processThreadCount>
</config>


workThreadCount — число потоков, который принимают сообщения из сети и пишут в сеть(нужно для инициализации Netty).

processThreadCount — число потоков, обрабатывающих общую очередь запросов, пришедших на сервер. В них, собственно, происходит парсинг строк-запросов, работа всего серверного кода и формирование ответов.


SocketServletContainer




Данный класс представляет собой синглтон, поскольку он является «центральным» классом и так к нему будет удобнее обращаться с других классов программы. И, разумеется, подразумевается 1 копия сервера на приложение(поэтому не требуется потокобезопасная реализация синглтона). Что, по моему мнению, логично.

public class SocketServletContainer {
private Channel channel;
private ServerBootstrap networkServer;
private QueueHandler queueHander;
private Map<String, Servlet> servlets;
private Config conf;
private static SocketServletContainer server= null;

private static List<SocketSession> list= new ArrayList<SocketSession>();

public List<SocketSession> getListSession()
{
return list;
}

static public SocketServletContainer getInstance()
{
if (server==null)
{
server= new SocketServletContainer();
}

return server;
}

private SocketServletContainer()
{
conf= new Config("conf.xml");
//Парсим конфиг, в случае ошибки- кидаем Exception.
try
{
conf.read();
}
catch(Exception e)
{
throw new ContainerInitializeException(e.toString());
}

servlets= new HashMap<String, Servlet>();
}

public void start()
{
//Инициализируем Netty
ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS);
ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(conf.getWorkThreadCount(), 400000000,
2000000000, 60, TimeUnit.SECONDS);
networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, conf.getWorkThreadCount()));
networkServer.setOption("backlog", 500);
networkServer.setOption("connectTimeoutMillis", 10000);
networkServer.setPipelineFactory(new ServerPipelineFactory());
channel = networkServer.bind(new InetSocketAddress(conf.getAddress(), conf.getPort()));
//Создаем обработчик очереди запросов
queueHander= new QueueHandler(conf.getProcessThreadCount());

System.out.println("Ready");
}
//Метод «грубой» остановки сервера
public void stop()
{
if (channel.isOpen())
{
ChannelFuture future= channel.close();
future.awaitUninterruptibly();
}

queueHander.stop();
}

public QueueHandler getQueueHandler()
{
return this.queueHander;
}
//Метод регистрации сервлета в нашем контейнере
public void registerServlet(Servlet servlet, String name)
{
//Если сервлет еще не зарегистрирован- добавляем его в HashMap.
synchronized(servlets)
{
if (!servlets.containsKey(name))
{
servlets.put(name, servlet);
}
}
}

public Servlet getServlet(String name)
{
return servlets.get(name);
}
}




Servlet




Тут все просто и понятно. Метод doRequest вызывается, когда приходит пакет с указанием вызвать данный сервлет.

Зам: передача сессии в метод doRequest сделано с той целью, чтобы сервлет мог получить List всех имеющихся сессии и разослать им сообщение. Например, при реализации чата.

abstract public class Servlet {
abstract public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session);
}




SocketSession




Каждая сессия имеет свой уникальный id. Существует пул id-ников на 20000 подключенных клиентов. При превышении этого лимита сервер, при попытке создать сессию, будет логировать ошибку, отправлять сообщение об ошибке не клиент и закрывать канал.

Значение размера пула лучше вычислить опытным путем, в идеале он должен быть чуть больше числа максимально возможных одновременно подключенных клиентов на вашем сервере.

public class SocketSession {

private static byte[] idPool;

public int generateId()
{
synchronized(idPool)
{
if (idPool==null)
{
idPool= new byte[20000];
for (int j=0;j<idPool.length;j++)
{
idPool[j]=0;
}
}
for (int j=0;j<idPool.length;j++)
{
if (idPool[j]==0)
{
idPool[j]=1;
return j;
}
}
return -1;
}
}

private int id;
private Channel channel;

//Создаем сессию и добавляем ее в List уже имеющихся.
public SocketSession(Channel channel)
{
this.channel= channel;
this.id= generateId();
//если мест больше нет
if (this.id==-1)
{
OutputBuffer out= new OutputBuffer();
out.setPar("error", "Connection limit error");
send(out, "System Servlet");
//Залогируем ошибку
System.err.println("Connection limit error");
return;
}

SocketServletContainer.getInstance().getListSession().add(this);
}

public int getId()
{
return id;
}
//Отправка сообщению клиенту. Метод вынесен в класс Сессии для удобной синхронизации отправки с нескольких потоков.
public void send(OutputBuffer output, String servletName)
{

synchronized(channel)
{
channel.write(new Task(servletName, output.toString()));
}
}
//Закрытие сессии, например, при обрыве соединения или в каком-то «служебном» сервлете
public void close()
{
synchronized(idPool)
{
idPool[this.id]= 0;
}
channel.close();
SocketServletContainer.getInstance().getListSession().remove(this);
}
}




InputBuffer




В конструкторе происходит инициализация, строка source должна содержать список параметров запроса в заданном формате.

public class InputBuffer {

private Map<String, String> map= new HashMap<String, String>();

public InputBuffer(String source)
{
String[] par= source.split(",");
for (int j=0; j< par.length; j++)
{
if (!par[j].contains("="))
{
continue;
}
String[] data= par[j].split("=");
if (data.length<2)
{
System.err.println("Parsing Error");
continue;
}
map.put(data[0], data[1]);
}
}

public String getPar(String key)
{
return map.get(key);
}
}




OutputBuffer




Интерфейс класса достаточно понятен. Важное замечание- нужно переопределить метод toString(), поскольку именно он используется для формирования ответа в классе SocketSession.

public class OutputBuffer {

private List<String> list= new ArrayList<String>();

public void setPar(String key, String par)
{
list.add(key+"="+par);
}

@Override
public String toString()
{
StringBuilder res= new StringBuilder();

for (int j=0; j< list.size();j++)
{
res.append(list.get(j));
if (j!=list.size()-1)
{
res.append(",");
}
}
return res.toString();
}
}




Config




Реализацию этого класса я приводить не буду, потому что интерфейс его понятен из того, что используется в SocketServletContainer, а реализаций xml-парсеров на java в интернете достаточно много и, я надеюсь, читатель найдет для него наиболее подходящую.

Лично я использовал DOM-парсер.

QueueHandler




Данный класс тоже очень прост в реализации. Внутри он содержит пул потоков, который занимается выполнением задач(TaskHandler). Планирование я переложил на надежную и проверенную реализацию threadPool. Для создания пула используется фабрика Executors.newFixedThreadPool(n).

При вызове метода stop, уже имеющиеся задачи, стоящие в очереди, будут обработаны, но новые TaskHandler-ы на обработку приниматься уже не будут.



public class QueueHandler {

private ExecutorService threadPool;
private int threadPoolSize;

public QueueHandler(int size)
{
threadPoolSize= size;
threadPool= Executors.newFixedThreadPool(threadPoolSize);
}

public void stop()
{
threadPool.shutdown();
}

public void addTaskToProcess(Task task, SocketSession session)
{
threadPool.execute(new TaskHandler(task, session));
}
}




TaskHandler




Здесь все устроено также весьма несложно. В конструктор передается сессия игрока и задача, которую нужно обработать(Task).

public class TaskHandler implements Runnable{

private Task task;
private SocketSession session;

public TaskHandler(Task task, SocketSession session)
{
this.task= task;
this.session= session;
}


@Override
public void run()
{
//Пытаемся получить сервлет по имени
Servlet servlet= SocketServletContainer.getInstance().getServlet(task.getServletName());
OutputBuffer output= new OutputBuffer();
//Если сервлета нет, то фиксируем ошибку.
if (servlet==null)
{
output.setPar("error", "servlet not found");
session.send(output, "Error Message");
return;
}

//Вызываем его с нужными параметрами
servlet.doRequest(new InputBuffer(task.getBuffer()),output, session);
//Отсылаем измененный буфер.
session.send(output, task.getServletName());

}
}




Task




Объект Task имеет поля «имя сервлета» и «буфер». Буфер является строкой параметров запроса.

Статичные методы write/read требуется для получения экземпляров класса/ записи в канал для работы Netty.

public class Task {
private String servletName="";
private String buffer="";

public Task(String servletName, String buffer)
{
this.servletName= servletName;
this.buffer= buffer;
}

public Task()
{

}

public String getServletName() {
return servletName;
}

public String getBuffer() {
return buffer;
}
//Получаем данные из канала
public void get(ChannelBuffer buffer)
{

int length= buffer.readInt();
byte[] bytes= new byte[length];
buffer.readBytes(bytes);
String input= new String(bytes);
String[] data= input.split(java.util.regex.Pattern.quote("[sysDiv]"));

if (data.length<2)
{
System.err.println("Parsing error");
return;
}
this.servletName= data[0];
this.buffer= data[1];
}
//Пишем данные в канал
public void send(ChannelBuffer buffer)
{
String output= this.servletName + "[sysDiv]"+ this.buffer;
buffer.writeInt(output.getBytes().length);
buffer.writeBytes(output.getBytes());
}

public static Task read(ChannelBuffer buffer)
{
Task task= new Task();
task.get(buffer);
return task;
}

public static void write(Task task, ChannelBuffer buffer)
{
task.send(buffer);
}
}




Сетевая часть



Как и обещал, подробно рассматривать работу с netty я не буду, просто приведу код и поясню моменты, которые касаются реализации логики.

ServerPipelineFactory



public class ServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Encoder(),new Decoder(),new NioHandler());
}
}




Decoder




Пакет на сервер приходит в следующем формате: первые 4 байта- длина «полезных» данных, далее идут сами данные. Decoder реализует считывание, чтобы в слоях, расположенных выше, мы могли не думать о том, что данные еще не пришли полностью.

public class Decoder extends ReplayingDecoder<DecoderState> {

public enum DecoderState
{
READ_LENGTH,
READ_CONTENT;
}

public Decoder()
{
super(DecoderState.READ_LENGTH);
}
private int length;

@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
ctx.sendUpstream(e);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
ctx.sendUpstream(e);
}

@Override
protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, DecoderState state)
{
switch (state)
{
case READ_LENGTH:
length = buffer.readInt();
checkpoint(DecoderState.READ_CONTENT);

case READ_CONTENT:
ChannelBuffer frame= buffer.readBytes(length);
//Читаем task из сети и передаем его выше
Task task= Task.read(frame);
checkpoint(DecoderState.READ_LENGTH);
return task;

default:
throw new Error( "Shouldn't reach here" );
}
}
}




Encoder



public class Encoder extends OneToOneEncoder {

@Override
protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception {
//Если передан не экземпляр Task, то просто кидаем его выше
if(!(obj instanceof Task))
{
return obj;
}

Task task= (Task)obj;

ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
//Пишем task в сеть
Task.write(task, buffer);
return buffer;
}
}




NioHandler




Этот объект обрабатывает основные события работы с сетью: подключение клиентов, получение сообщений, обрыв соединения.

public class NioHandler extends SimpleChannelUpstreamHandler {

private SocketSession session;
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

//Тут заводим сессию работы сокета
session= new SocketSession(e.getChannel());
System.out.println("Has connect");
}

@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
session.close();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
if(e.getChannel().isOpen())
{
//Получаем экземпляр Task и передаем его на обработку в QueueHandler.
Task message= (Task)e.getMessage();
SocketServletContainer.getInstance().getQueueHandler().addTaskToProcess(message, session);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Произошла ошибка. Логируем ошибку, закрываем канал и сессию.
session.close();
e.getCause().printStackTrace(System.err);
ctx.getChannel().close();
}
}


Пример сервлета



public class TS extends Servlet {

@Override
public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session) {
output.setPar("request", input.getPar("message")+session.getId());
}
}


Как это работает или главный класс приложения




Собственно, в приложении не так уж много строк кода, все просто и прозрачно.

public class App
{
public static void main( String[] args ) throws ContainerInitializeException
{
SocketServletContainer server= SocketServletContainer.getInstance();
server.registerServlet(new TS(), "TS");
server.start();
}
}




Немного тестов




Что ж, контейнер написан, он работает. Заморачиваться с созданием клиентской обертки под него я не стал, ограничился прямой записью в сокет, выглядит это примерно так:

socket = new Socket("127.0.0.1", 9999);
DataOutputStream dos= new DataOutputStream(socket.getOutputStream());
DataInputStream dis= new DataInputStream(socket.getInputStream());
String buffer= "TS[sysDiv]message=IloveJava";

dos.writeInt(buffer.getBytes().length+4);
dos.writeInt(buffer.getBytes().length);
dos.write(buffer.getBytes());
dos.flush();




В общем, как и говорилось в самом начале статьи, создание такой системы- это одна из преждевременных оптимизаций, которую я решил себе позволить. Поэтому глупо было бы не провести пару тестов, раз уж мы все это написали.

Собственно, я решил сравнить это решение с контейнером сервлетов, работающем по http.

Для тестов был написан сервлет, крутящийся в Tomcat и сервлет, работающий внутри созданного контейнера.


Зам: Я намеренно сравнил производительность http-протокола и решения на сокетах, поскольку web-socket, которые Tomcat успешно поддерживают, мной не рассматривались для реализации данного проекта игры.


Особенности теста:



  • Оба сервлета выполняли примерно одинаковые операции, а именно, записывали одинаковую строку в выходной поток

  • Интересующий меня параметр тестирования- время отклика от сервера при обработке запроса

  • Замеры производились на локалхосте

  • Время отклика я получал с помощью стандартных средств языка Java

  • Меня интересовали не конкретные цифры, а сравнение порядков результатов, поскольку тест был написан весьма «грубо»

  • Для каждого теста было выполнено по 10 000 запросов с одинаковыми параметрами, после чего было вычислено среднее значение




А результат таков: в среднем, обработка 1 «пустого» сервлета для Tomcat заняла 0, 99 мс.

Описанный в статье контейнер справился с аналогичной задачей за 0, 09 мс.

Мы имеем 2 результата, различающихся на порядок. Но с учетом того, что мысль использовать сокеты пришла ко мне не из-за скорости, а из-за необходимости иметь возможность достучаться от сервера к клиенту, результат можно признать более, чем удовлетворительным.


TODO:




Также остался небольшой список, что еще можно было бы реализовать для подобной системы, который я также приведу:

  1. Валидацию входных данных. У input-буфера можно добавить метод validate(String mask), который по маске типов данных для соответствующих параметров бы автоматически конвертировал их к нужному(не только строковому) типу. Выглядеть это могло бы примерно так: validate(“message:String, count:int”);

  2. Добавить шифрование данных. Именно для этого заложена запись в буфер byte[], а не writeUTF8(), хотя протокол и является текстовым. Можно реализовать interface Crypto{}, который имел бы 2 метода: code() и encode(). И Реализацию такого интерфейса передавать в SocketServletContainer(), для удобной смены или выбора алгоритма криптографии.

  3. Работу с аннотациями(как это сделано в Tomcat) и отложенную инициализацию сервлетов.

  4. Более «безопасный» парсинг входного буфера с экранированием разделителя

  5. Кучу других полезных мелочей, который могли бы понадобиться в такой системе.




Вместо заключения




Результаты работы контейнера вполне удовлетворили мои ожидания. Использование NIO позволило экономично расходовать потоки и сконфигурировать контейнер под имеющееся железо так, чтобы он работал максимально эффективно.

Функционал, который предоставляет контейнер, позволяет достаточно удобно разрабатывать приложение, не заботясь о «низкоуровневых» вещах, типа парсинга пакетов и так далее(мне, привыкшему разрабатывать на tomcat, все показалось достаточно удобным:)).

Но я все-таки не решился использовать подобное решение как основу реального проекта, потому что для меня наличие сокетов, конечно, было бы очень кстати(для обратной связи сервер-клиент), но, в принципе, не критично. А вот производительность и надежность Tomcat, проверенного годами и тысячами разработчиков, не вызывает вопросов.

Я планирую использовать реализованную систему в «узких», но некритичных местах, в которых http-протокол использовать совсем уж нехорошо, например, она отлично подойдет для реализации чата.


Надеюсь, данная статья будет интересна для читателей и кому-нибудь окажется полезной. Старался передать целостную картину, возможно, немного переборщил с рассуждениями и количеством кода. С удовольствием выслушаю ваши вопросы и предложения по написанному материалу.


This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers. Five Filters recommends: 'You Say What You Like, Because They Like What You Say' - http://www.medialens.org/index.php/alerts/alert-archive/alerts-2013/731-you-say-what-you-like-because-they-like-what-you-say.html


Комментариев нет:

Отправить комментарий