...

воскресенье, 27 октября 2013 г.

Высокопроизводительный SUN/ONCRPC сервер на Java NIO

В статьe о dCache рассказано о том, как использовать его в качестве NFS сервера. Но функциональной совместимости с существующими клиентами недостаточно, чтобы системой можно было пользоваться. Производительность тоже должна быть на высоте. Рабочей лошадкой NFS протокола является ONCRPC протокол. В dCache мы используем собственную реализацию, основанную на grizzly nio framework.
Немного истории для молодых



ONC RPC (Open Network Computing Remote Procedure Call) — протокол, созданный Sun Microsystems в конце 80х и опубликован в 1995г вместе с NFSv2. ONCRPC получил быстрое распространение и широко использовался, пока в начале 2000 не был вытеснен модными альтернативами, как CORBA, SOAP, а позже REST и JSON-RPC. Тем не менее, ONCRPC всё ещё используется, где простота и скорость важнее моды — в сетевых файловых системах.

Реализация




Чтобы не изобретать очередной велосипед, вначале мы использовали реализацию Remote Tea, но вскоре столкнулись с ограничениями, которые не могли легко решить: IPv6, GSSAPI, NIO. Так что велосипед пришлось изобретать, но не с нуля. Мы максимально сохранили совместимость с RemoteTea и адаптировали уже написанный код.
Grizzly-NIO



В основу мы взяли grizzly-nio, используемый в glassfish. Как и все современные NIO фраймворки, grizzly основан на обработке событий и шаблоне цепочка обязанностей. Т.е., мы описываем цепь фильтров, которые вызываются при определённом событии.

package org.glassfish.grizzly.filterchain;

import java.io.IOException;

public interface Filter {
public void onAdded(FilterChain fc);
public void onRemoved(FilterChain fc);
public void onFilterChainChanged(FilterChain fc);
public NextAction handleRead(FilterChainContext fcc) throws IOException;
public NextAction handleWrite(FilterChainContext fcc) throws IOException;
public NextAction handleConnect(FilterChainContext fcc) throws IOException;
public NextAction handleAccept(FilterChainContext fcc) throws IOException;
public NextAction handleEvent(FilterChainContext fcc, FilterChainEvent fce) throws IOException;
public NextAction handleClose(FilterChainContext fcc) throws IOException;
public void exceptionOccurred(FilterChainContext fcc, Throwable thrwbl);
}


Методы handleXXXX возвращают NextAction, который может быть StopAction или ContinueAction. Если фильтр возвращает StopAction, то обработка цепочки останавливается. В основном, нас интересуют handleRead и handleWrite, которые вызываются при чтении и записи сетевого соединения.



@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {

Buffer messageBuffer = ctx.getMessage();
if (!isMessageArrived(messageBuffer)) {
// пришла только часть сообщения
// ждём остальную часть
return ctx.getStopAction(messageBuffer);
}
// читаем полное сообщение
ctx.setMessage(getMessage(messageBuffer));
return ctx.getInvokeAction();
}


Боевой код


import java.io.IOException;

import java.nio.ByteOrder;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.BuffersBuffer;

public class RpcMessageParserTCP extends BaseFilter {

/**
* RPC fragment record marker mask
*/
private final static int RPC_LAST_FRAG = 0x80000000;
/**
* RPC fragment size mask
*/
private final static int RPC_SIZE_MASK = 0x7fffffff;

@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {

Buffer messageBuffer = ctx.getMessage();
if (messageBuffer == null) {
return ctx.getStopAction();
}

if (!isAllFragmentsArrived(messageBuffer)) {
return ctx.getStopAction(messageBuffer);
}

ctx.setMessage(assembleXdr(messageBuffer));

final Buffer reminder = messageBuffer.hasRemaining()
? messageBuffer.split(messageBuffer.position()) : null;

return ctx.getInvokeAction(reminder);
}

@Override
public NextAction handleWrite(FilterChainContext ctx) throws IOException {

Buffer b = ctx.getMessage();
int len = b.remaining() | RPC_LAST_FRAG;

Buffer marker = GrizzlyMemoryManager.allocate(4);
marker.order(ByteOrder.BIG_ENDIAN);
marker.putInt(len);
marker.flip();
marker.allowBufferDispose(true);
b.allowBufferDispose(true);
Buffer composite = GrizzlyMemoryManager.createComposite(marker, b);
composite.allowBufferDispose(true);
ctx.setMessage(composite);
return ctx.getInvokeAction();
}

private boolean isAllFragmentsArrived(Buffer messageBuffer) throws IOException {
final Buffer buffer = messageBuffer.duplicate();
buffer.order(ByteOrder.BIG_ENDIAN);

while (buffer.remaining() >= 4) {

int messageMarker = buffer.getInt();
int size = getMessageSize(messageMarker);

/*
* fragmen size bigger than we have received
*/
if (size > buffer.remaining()) {
return false;
}

/*
* complete fragment received
*/
if (isLastFragment(messageMarker)) {
return true;
}

/*
* seek to the end of the current fragment
*/
buffer.position(buffer.position() + size);
}

return false;
}

private static int getMessageSize(int marker) {
return marker & RPC_SIZE_MASK;
}

private static boolean isLastFragment(int marker) {
return (marker & RPC_LAST_FRAG) != 0;
}

private Xdr assembleXdr(Buffer messageBuffer) {

Buffer currentFragment;
BuffersBuffer multipleFragments = null;

boolean messageComplete;
do {
int messageMarker = messageBuffer.getInt();

int size = getMessageSize(messageMarker);
messageComplete = isLastFragment(messageMarker);

int pos = messageBuffer.position();
currentFragment = messageBuffer.slice(pos, pos + size);
currentFragment.limit(size);

messageBuffer.position(pos + size);
if (!messageComplete & multipleFragments == null) {
/*
* we use composite buffer only if required
* as they not for free.
*/
multipleFragments = GrizzlyMemoryManager.create();
}

if (multipleFragments != null) {
multipleFragments.append(currentFragment);
}
} while (!messageComplete);

return new Xdr(multipleFragments == null ? currentFragment : multipleFragments);
}
}







Если мы остановили цепь из-за недостатка данных, то следующий вызов handleRead будет содержать композитный буфер( состоящий из нескольких буферов).

Примитивный сервер выглядит так



public static void main(String[] args) throws IOException {

FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
filterChainBuilder.add(new TransportFilter());
filterChainBuilder.add(new /* здесь парсер */);
filterChainBuilder.add(new /* здесь обработчик */);

final TCPNIOTransport transport =
TCPNIOTransportBuilder.newInstance().build();
transport.setProcessor(filterChainBuilder.build());
transport.bind(HOST, PORT);
transport.start();
System.in.read();
}




На странице проекта можно найти много примеров. По умолчанию, grizzly создаст столько тредов, сколько на машине имеется процессоров. Этот подход хорошо зарекомендовал себя на практике. На машине с 24 ядрами, наш NFS сервер с лёгкостью обслуживает порядка тысячи клиентов.

Сам проект активно развивается, и команда разработчиков быстро реагирует как на сообщения об ошибках, так и на посылаемые патчи и рекомендации.


oncrpc4j



Весь ONCRPC код оформлен в виде простой для использования отдельной библиотеки. Поддерживаются два типичных варианта интеграции — сервис, встроенный в приложение или сервис, инициализируемый как Spring bean.
Встроенное приложение


import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.XdrVoid;
import org.dcache.xdr.OncRpcException;

public class Svcd {
private static final int DEFAULT_PORT = 1717;
private static final int PROG_NUMBER = 111017;
private static final int PROG_VERS = 1;

public static void main(String[] args) throws Exception {
RpcDispatchable dummy = new RpcDispatchable() {
@Override
public void dispatchOncRpcCall(RpcCall call)
throws OncRpcException, IOException {
call.reply(XdrVoid.XDR_VOID);
}
};
OncRpcSvc service = new OncRpcSvcBuilder()
.withTCP()
.withAutoPublish()
.withPort(DEFAULT_PORT)
.withSameThreadIoStrategy()
.build();
service.register(new OncRpcProgram(PROG_NUMBER, PROG_VERS), dummy);
service.start();
}
}


Интеграция со Spring



Я не боюсь XML


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

<bean id="my-rpc-svc" class="me.mypackage.Svcd">
<description>My RPC service</description>
</bean>

<bean id="my-rpc" class="org.dcache.xdr.OncRpcProgram">
<description>My RPC program number</description>
<constructor-arg index="0" value="1110001" />
<constructor-arg index="1" value="1" />
</bean>

<bean id="rpcsvc-builder" class="org.dcache.xdr.OncRpcSvcFactoryBean">
<description>Onc RPC service builder</description>
<property name="port" value="1717"/>
<property name="useTCP" value="true"/>
</bean>

<bean id="oncrpcsvc" class="org.dcache.xdr.OncRpcSvc" init-method="start" destroy-method="stop">
<description>My RPC service</description>
<constructor-arg ref="rpcsvc-builder"/>
<property name="programs">
<map>
<entry key-ref="my-rpc" value-ref="my-rpc-svc"/>
</map>
</property>
</bean>
</beans>





Производительность




Как видно из графика, код на яве не только не медленнее написанного на 'C', но и обгоняет линуксовское ядро (из-за бага, который, надеюсь, уже починили).


To steal and contribute code




Код доступен на гитхабе под LGPL лицензией.

This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers. Five Filters recommends:



Комментариев нет:

Отправить комментарий