Немного истории для молодых
ONC RPC (Open Network Computing Remote Procedure Call) — протокол, созданный Sun Microsystems в конце 80х и опубликован в 1995г вместе с NFSv2. ONCRPC получил быстрое распространение и широко использовался, пока в начале 2000 не был вытеснен модными альтернативами, как CORBA, SOAP, а позже REST и JSON-RPC. Тем не менее, ONCRPC всё ещё используется, где простота и скорость важнее моды — в сетевых файловых системах.
Реализация
Чтобы не изобретать очередной велосипед, вначале мы использовали реализацию Remote Tea, но вскоре столкнулись с ограничениями, которые не могли легко решить: IPv6, GSSAPI, NIO. Так что велосипед пришлось изобретать, но не с нуля. Мы максимально сохранили совместимость с RemoteTea и адаптировали уже написанный код.
Grizzly-NIO
В основу мы взяли grizzly-nio, используемый в glassfish. Как и все современные NIO фраймворки, grizzly основан на обработке событий и шаблоне цепочка обязанностей. Т.е., мы описываем цепь фильтров, которые вызываются при определённом событии.
package org.glassfish.grizzly.filterchain;
import java.io.IOException;
public interface Filter {
public void onAdded(FilterChain fc);
public void onRemoved(FilterChain fc);
public void onFilterChainChanged(FilterChain fc);
public NextAction handleRead(FilterChainContext fcc) throws IOException;
public NextAction handleWrite(FilterChainContext fcc) throws IOException;
public NextAction handleConnect(FilterChainContext fcc) throws IOException;
public NextAction handleAccept(FilterChainContext fcc) throws IOException;
public NextAction handleEvent(FilterChainContext fcc, FilterChainEvent fce) throws IOException;
public NextAction handleClose(FilterChainContext fcc) throws IOException;
public void exceptionOccurred(FilterChainContext fcc, Throwable thrwbl);
}
Методы handleXXXX возвращают NextAction, который может быть StopAction или ContinueAction. Если фильтр возвращает StopAction, то обработка цепочки останавливается. В основном, нас интересуют handleRead и handleWrite, которые вызываются при чтении и записи сетевого соединения.
@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {
Buffer messageBuffer = ctx.getMessage();
if (!isMessageArrived(messageBuffer)) {
// пришла только часть сообщения
// ждём остальную часть
return ctx.getStopAction(messageBuffer);
}
// читаем полное сообщение
ctx.setMessage(getMessage(messageBuffer));
return ctx.getInvokeAction();
}
import java.io.IOException;
import java.nio.ByteOrder;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.BuffersBuffer;
public class RpcMessageParserTCP extends BaseFilter {
/**
* RPC fragment record marker mask
*/
private final static int RPC_LAST_FRAG = 0x80000000;
/**
* RPC fragment size mask
*/
private final static int RPC_SIZE_MASK = 0x7fffffff;
@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {
Buffer messageBuffer = ctx.getMessage();
if (messageBuffer == null) {
return ctx.getStopAction();
}
if (!isAllFragmentsArrived(messageBuffer)) {
return ctx.getStopAction(messageBuffer);
}
ctx.setMessage(assembleXdr(messageBuffer));
final Buffer reminder = messageBuffer.hasRemaining()
? messageBuffer.split(messageBuffer.position()) : null;
return ctx.getInvokeAction(reminder);
}
@Override
public NextAction handleWrite(FilterChainContext ctx) throws IOException {
Buffer b = ctx.getMessage();
int len = b.remaining() | RPC_LAST_FRAG;
Buffer marker = GrizzlyMemoryManager.allocate(4);
marker.order(ByteOrder.BIG_ENDIAN);
marker.putInt(len);
marker.flip();
marker.allowBufferDispose(true);
b.allowBufferDispose(true);
Buffer composite = GrizzlyMemoryManager.createComposite(marker, b);
composite.allowBufferDispose(true);
ctx.setMessage(composite);
return ctx.getInvokeAction();
}
private boolean isAllFragmentsArrived(Buffer messageBuffer) throws IOException {
final Buffer buffer = messageBuffer.duplicate();
buffer.order(ByteOrder.BIG_ENDIAN);
while (buffer.remaining() >= 4) {
int messageMarker = buffer.getInt();
int size = getMessageSize(messageMarker);
/*
* fragmen size bigger than we have received
*/
if (size > buffer.remaining()) {
return false;
}
/*
* complete fragment received
*/
if (isLastFragment(messageMarker)) {
return true;
}
/*
* seek to the end of the current fragment
*/
buffer.position(buffer.position() + size);
}
return false;
}
private static int getMessageSize(int marker) {
return marker & RPC_SIZE_MASK;
}
private static boolean isLastFragment(int marker) {
return (marker & RPC_LAST_FRAG) != 0;
}
private Xdr assembleXdr(Buffer messageBuffer) {
Buffer currentFragment;
BuffersBuffer multipleFragments = null;
boolean messageComplete;
do {
int messageMarker = messageBuffer.getInt();
int size = getMessageSize(messageMarker);
messageComplete = isLastFragment(messageMarker);
int pos = messageBuffer.position();
currentFragment = messageBuffer.slice(pos, pos + size);
currentFragment.limit(size);
messageBuffer.position(pos + size);
if (!messageComplete & multipleFragments == null) {
/*
* we use composite buffer only if required
* as they not for free.
*/
multipleFragments = GrizzlyMemoryManager.create();
}
if (multipleFragments != null) {
multipleFragments.append(currentFragment);
}
} while (!messageComplete);
return new Xdr(multipleFragments == null ? currentFragment : multipleFragments);
}
}
Если мы остановили цепь из-за недостатка данных, то следующий вызов handleRead будет содержать композитный буфер( состоящий из нескольких буферов).
Примитивный сервер выглядит так
public static void main(String[] args) throws IOException {
FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
filterChainBuilder.add(new TransportFilter());
filterChainBuilder.add(new /* здесь парсер */);
filterChainBuilder.add(new /* здесь обработчик */);
final TCPNIOTransport transport =
TCPNIOTransportBuilder.newInstance().build();
transport.setProcessor(filterChainBuilder.build());
transport.bind(HOST, PORT);
transport.start();
System.in.read();
}
На странице проекта можно найти много примеров. По умолчанию, grizzly создаст столько тредов, сколько на машине имеется процессоров. Этот подход хорошо зарекомендовал себя на практике. На машине с 24 ядрами, наш NFS сервер с лёгкостью обслуживает порядка тысячи клиентов.
Сам проект активно развивается, и команда разработчиков быстро реагирует как на сообщения об ошибках, так и на посылаемые патчи и рекомендации.
oncrpc4j
Весь ONCRPC код оформлен в виде простой для использования отдельной библиотеки. Поддерживаются два типичных варианта интеграции — сервис, встроенный в приложение или сервис, инициализируемый как Spring bean.
Встроенное приложение
import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.XdrVoid;
import org.dcache.xdr.OncRpcException;
public class Svcd {
private static final int DEFAULT_PORT = 1717;
private static final int PROG_NUMBER = 111017;
private static final int PROG_VERS = 1;
public static void main(String[] args) throws Exception {
RpcDispatchable dummy = new RpcDispatchable() {
@Override
public void dispatchOncRpcCall(RpcCall call)
throws OncRpcException, IOException {
call.reply(XdrVoid.XDR_VOID);
}
};
OncRpcSvc service = new OncRpcSvcBuilder()
.withTCP()
.withAutoPublish()
.withPort(DEFAULT_PORT)
.withSameThreadIoStrategy()
.build();
service.register(new OncRpcProgram(PROG_NUMBER, PROG_VERS), dummy);
service.start();
}
}
Интеграция со Spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="my-rpc-svc" class="me.mypackage.Svcd">
<description>My RPC service</description>
</bean>
<bean id="my-rpc" class="org.dcache.xdr.OncRpcProgram">
<description>My RPC program number</description>
<constructor-arg index="0" value="1110001" />
<constructor-arg index="1" value="1" />
</bean>
<bean id="rpcsvc-builder" class="org.dcache.xdr.OncRpcSvcFactoryBean">
<description>Onc RPC service builder</description>
<property name="port" value="1717"/>
<property name="useTCP" value="true"/>
</bean>
<bean id="oncrpcsvc" class="org.dcache.xdr.OncRpcSvc" init-method="start" destroy-method="stop">
<description>My RPC service</description>
<constructor-arg ref="rpcsvc-builder"/>
<property name="programs">
<map>
<entry key-ref="my-rpc" value-ref="my-rpc-svc"/>
</map>
</property>
</bean>
</beans>
Производительность
Как видно из графика, код на яве не только не медленнее написанного на 'C', но и обгоняет линуксовское ядро (из-за бага, который, надеюсь, уже починили).
To steal and contribute code
Код доступен на гитхабе под LGPL лицензией.
This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers. Five Filters recommends:
- Massacres That Matter - Part 1 - 'Responsibility To Protect' In Egypt, Libya And Syria
- Massacres That Matter - Part 2 - The Media Response On Egypt, Libya And Syria
- National demonstration: No attack on Syria - Saturday 31 August, 12 noon, Temple Place, London, UK
Комментариев нет:
Отправить комментарий