Активный объект — шаблон проектирования, который отделяет выполнение метода от его вызова. Шаблон позволяет повысить параллелизм и упростить синхронный доступ к объекту, который живет в собственном потоке.
Элементами шаблона являются Proxy (объект-заместитель), Method Request (запрос), Activation Queue (очередь), Scheduler (планировщик), Servant (обслуживающий объект) и Future.
Объект-заместитель предоставляет интерфейс, позволяющий клиентам вызывать публично доступные методы активного объекта использую стандартные, строго типизированные средства языка программирования вместо передачи слабо типизированных сообщений между потоками. Когда клиент вызывает метод, определяемый объектом-заместителем, создается запрос, который помещается в очередь. Все это происходит в клиентском потоке.
Запрос используется для передачи контекстной информации о вызове определенного метода, такой как параметры вызова, от объекта-заместителя планировщику, работающему в отдельном потоке. Для каждому метода активного объекта, предоставляемого объектом-заместителем, определяется конкретный подкласс абстрактного класса запроса. Экземпляры этих классов создаются объектом-заместителем, когда вызываются его методы, и содержат контекстную информацию, необходимую для того, чтобы выполнить эти методы и вернуть результаты обратно клиентам.
Очередь обеспечивает ограниченный буфер запросов, созданных объектом-заместителем и ожидающих выполнения. Очередь отделяет клиентский поток от потока обслуживающего объекта, поэтому два потока могут работать параллельно.
Планировщик работает в собственном потоке отличном от клиентских потоков и управляет очередью запросов, ожидающих выполнения. Планировщик решает, какой запрос извлечь из очереди следующим, чтобы выполнить его на обслуживающем объекте, реализующем соответствующий метод.
Обслуживающий объект определяет поведение и состояние активного объекта и реализует методы, определенные в объекте-заместителе и соответствующих запросах. Метод обслуживающего объекта вызывается, когда соответствующий запрос выполняется планировщиком, следовательно, обслуживающий объект выполняется в потоке планировщика.
Future позволяет клиенту получить результат вызова метода, после того как обслуживающий объект завершит выполнение этого метода. Когда клиент вызывает метод, Future возвращается клиенту сразу же. Для получения результата вызова клиент опрашивает или блокируется на Future до тех пор, пока результат не будет доступен.
В качестве примера реализации шаблона Активного объекта можно привести следующий код на Java.
public class NormalClass {
private double val = 0.0;
public void doSomething() {
val = 1.0;
}
public void doSomethingElse() {
val = 2.0;
}
}
public class MyTask {
private double val;
private final BlockingQueue<Runnable> dispatchQueue = new LinkedBlockingQueue<>();
public MyTask() {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
dispatchQueue.take().run();
} catch (InterruptedException e) {
// okay, just terminate the dispatcher
}
}
}
}).start();
}
public void doSomething() throws InterruptedException {
dispatchQueue.put(new Runnable() {
@Override
public void run() {
val = 1.0;
}
});
}
public void doSomethingElse() throws InterruptedException {
dispatchQueue.put(new Runnable() {
@Override
public void run() {
val = 2.0;
}
});
}
}
В данном примере класс NormalClass преобразован в класс MyTask, который реализует активный объект, объединяя функциональность объекта-заместителя и обслуживающего объекта. Код методов оригинального класса перенесен в классы запросов, реализующих интерфейс Runnable, экземпляры которых помещаются в очередь при вызове методов активного объекта. Класс потока, играющий роль планировщика, извлекает запросы из очереди и выполняет их.
Один из недостатков данной реализации — необходимость переписывать классы, добавляя однотипный служебный код в конструкторы и методы, для того чтобы «превратить» эти классы в активные объекты. Другим недостатком является то, что каждому активному объекту соответствует отдельный поток Java. При росте числа активных объектов в приложении потоки, соответствующие этим объектам, будут иметь значительные суммарные накладные расходы на занимаемую память и процессорное время, связанное с их созданием и переключением контекстов.
Примером другой реализации шаблона является модуль Typed Actors (типизированные акторы) из инструментария Akka. Двумя составляющими типизированных акторов являются публичный интерфейс и внутренняя реализация. Типизированные акторы обеспечивают реализацию публичного интерфейса, которая делегирует вызовы методов внутренней реализации для асинхронного выполнения.
public interface Squarer {
void squareDontCare(int i); //fire-forget
Future<Integer> square(int i); //non-blocking send-request-reply
Option<Integer> squareNowPlease(int i); //blocking send-request-reply
int squareNow(int i); //blocking send-request-reply
}
public class SquarerImpl implements Squarer {
@Override
public void squareDontCare(int i) {
int sq = i * i; //Nobody cares :(
}
@Override
public Future<Integer> square(int i) {
return Futures.successful(i * i);
}
@Override
public Option<Integer> squareNowPlease(int i) {
return Option.some(i * i);
}
@Override
public int squareNow(int i) {
return i * i;
}
}
public class Main {
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create();
Squarer mySquarer = TypedActor.get(system).typedActorOf(new TypedProps<>(Squarer.class, SquarerImpl.class));
mySquarer.squareDontCare(10);
Future<Integer> fSquare = mySquarer.square(10);
Option<Integer> oSquare = mySquarer.squareNowPlease(10);
int iSquare = mySquarer.squareNow(10);
assert Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue() == 100;
assert oSquare.get().intValue() == 100;
assert iSquare == 100;
TypedActor.get(system).stop(mySquarer);
system.shutdown();
}
}
Модуль скрывает детали реализации шаблона, но в то же время создание и уничтожение типизированных акторов осуществляется с помощью специальных средств библиотеки. Другой особенностью является то, что, если типизированному актору необходимо передать вовне ссылку на самого себя, вместо this требуется использовать ссылку на объект-заместитель, получаемую через вызов TypedActor.self().
Хотя типизированные акторы не имеют таких проблем с потоками как в предыдущей реализации, благодаря тому, что в их основе лежит система акторов Akka, использование блокирующих операций во внутренних реализациях ограничено, даже если это вызовы методов других типизированных акторов. Это обязывает проектировать публичные интерфейсы таким образом, чтобы их методы не были блокирующими. Это в свою очередь может, во-первых, нарушить совместимость с существующими клиентами, во-вторых, усложнить код из-за использования обратных вызовов.
В данной статье предлагается другой подход к реализации на Java шаблона Активный объект, основанный на использовании аспектно-ориентированного расширения Java AspectJ и проекта Zephyr, который добавляет в Java легковесные потоки. Цель описываемого подхода заключается в том, чтобы обойти недостатки существующих реализаций данного шаблона и сделать новую реализацию более прозрачной.
Пример
Проблема обедающих философов является классическим примером, используемым при разработке параллельных алгоритмов для иллюстрации проблем синхронизации и способов их решения.
Пять безмолвных философов сидят вокруг круглого стола, перед каждым философом стоит тарелка спагетти. Вилки лежат на столе между каждой парой ближайших философов.
Каждый философ может либо есть, либо размышлять. Приём пищи не ограничен количеством оставшихся спагетти — подразумевается бесконечный запас. Тем не менее, философ может есть только тогда, когда держит две вилки — взятую справа и слева.
Каждый философ может взять ближайшую вилку (если она доступна), или положить — если он уже держит её. Взятие каждой вилки и возвращение её на стол являются раздельными действиями, которые должны выполняться одно за другим.
Суть проблемы заключается в том, чтобы разработать модель поведения (параллельный алгоритм), при котором ни один из философов не будет голодать, то есть будет вечно чередовать приём пищи и размышления.
Данный пример позволяет продемонстрировать применение описываемого подхода.
@Active
public class Philosopher {
private final String name;
private final Fork leftFork;
private final Fork rightFork;
public Philosopher(String name, Fork leftFork, Fork rightFork) {
this.name = name;
this.leftFork = leftFork;
this.rightFork = rightFork;
}
@Oneway
public void start() {
while (true) {
Fork.Handle left = leftFork.take();
if (left == null) {
continue;
}
Fork.Handle right = rightFork.take();
if (right == null) {
left.put();
continue;
}
System.out.println(name + " starts to eat");
sleep(5000);
left.put();
right.put();
System.out.println(name + " starts to think");
sleep(5000);
}
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
}
Класс Philosopher, моделирующий философа, обозначен аннотацией Active и является активным объектом. Класс имеет ссылки на левую и правую вилки, поля leftFork и rightFork соответственно. Метод start, реализующий поведение философа, обозначен аннотацией Oneway, которая означает, что такой метод ничего не возвращает, и клиент, вызвавший этот метод, получает управление сразу же после вызова.
В начале цикла каждый философ пытается завладеть левой вилкой, вызывая метод take класса Fork. Если попытка неудачная, то возвращается null, и философ переходит к началу цикла. В случае успеха возвращается ссылка на экземпляр класса Fork.Handle, и философ переходит к попытке завладеть правой вилкой. В случае неудачи философ кладет кладет левую вилку, вызывая метод put класса Fork.Handle, и возвращается к началу цикла, иначе он переходит в режим ожидания. По окончании времени ожидания философ кладет обе вилки и опять ждет, после чего возвращается к началу цикла.
@Active
public class Fork {
private boolean taken;
public Handle take() {
if (taken) {
return null;
}
taken = true;
return new Handle();
}
@Include
private void put() {
taken = false;
}
@Active
public class Handle {
private boolean used;
private Handle() {
}
public void put() {
if (used) {
throw new IllegalStateException();
}
used = true;
Fork.this.put();
}
}
}
Класс Fork — это активный объект, моделирующий вилку. Поле taken указывает на то, свободна ли вилка или занята. Если вилка свободна, то есть значение поля taken равно false, метод take, реализующий взятие вилки, устанавливает это поле в true и возвращает экземпляр класса Fork.Handle, иначе возвращает null.
Класс Fork.Handle также является активным объектом, но в отличие от классов Philosopher и Fork создается динамически при вызове метода take. Освобождение вилки реализуется методом put данного класса, который отмечает объект Fork.Handle как использованный, устанавливая поле used в true, и делегирует вызов методу put класса Fork.
Аннотация Include, которой обозначен приватный метод put класса Fork, указывает на то, что этот метод должен выполняться в потоке активного объекта. Использование данной аннотации необходимо в связи с тем, что в классах, реализующих активные объекты, по умолчанию только публичные методы выполняются в потоке активного объекта.
public class Main {
public static void main(String[] args) throws InterruptedException {
Fork fork1 = new Fork();
Fork fork2 = new Fork();
Fork fork3 = new Fork();
Fork fork4 = new Fork();
Fork fork5 = new Fork();
Philosopher philosopher1 = new Philosopher("Descartes", fork1, fork2);
Philosopher philosopher2 = new Philosopher("Nietzsche", fork2, fork3);
Philosopher philosopher3 = new Philosopher("Kant", fork3, fork4);
Philosopher philosopher4 = new Philosopher("Hume", fork4, fork5);
Philosopher philosopher5 = new Philosopher("Plato", fork5, fork1);
philosopher1.start();
philosopher2.start();
philosopher3.start();
philosopher4.start();
philosopher5.start();
Thread.sleep(60000);
}
}
Из приведенного примера видно, что создание активных объектов не требует использования специальных средств и выполняется с помощью оператора new. При вызове методов активного объекта не используется ссылка на внешний объект-заместитель, и вызовы осуществляются по this. Также реализация шаблона подразумевает, что динамическое создание активных объектов не вызывает утечек памяти, так как уничтожение этих объектов, включая остановку потоков, происходит автоматически.
Реализация
Использование AspectJ дает нам возможность обойти такие недостатки существующих реализаций шаблона Активный объект, как необходимость в использовании объекта-заместителя и ссылок на него, специальных средств создания и уничтожения активных объектов, а также изменения интерфейсов существующих классов. Кроме того AspectJ позволяет сделать реализацию шаблона более прозрачной, скрыв детали в аспектах.
Начнем реализацию с добавления в аспект очереди запросов. Для этого нам понадобится объявить inter-type поле, содержащее ссылку на очередь. Делается это с помощью конструкции declare parents. Так как мы собираемся использовать аннотацию для обозначения активного объекта, то применим соответствующий шаблон типа для declare parents.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Active {
}
public aspect ActiveObjectAspect {
public interface ActiveObject {
}
declare parents: @Active * implements ActiveObject;
private BlockingQueue<Runnable> ActiveObject.queue = new LinkedBlockingQueue<>();
}
В качестве планировщика будем использовать класс потока Java, в который передадим ссылку на очередь
final class ActiveObjectThread extends Thread {
private final BlockingQueue<? extends Runnable> queue;
ActiveObjectThread(BlockingQueue<? extends Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
Runnable task;
try {
task = queue.take();
} catch (InterruptedException ignored) {
continue;
}
task.run();
}
}
}
Далее необходимо стартовать поток, и делать мы это будем в конструкторе активного объекта. Для этого добавим соответствующий advice.
public aspect ActiveObjectAspect {
...
after(ActiveObject obj) returning: initialization((ActiveObject+ && !ActiveObject).new(..)) && this(obj) {
new ActiveObjectThread(obj.queue).start();
}
}
Методы активных объектов в нашей реализации принадлежат двум типам: обычные и one-way методы. Обычные методы, как правило, имеют возвращаемое значение, и при их вызове клиент блокируется до завершения выполнения метода. One-way методы не имеют возвращаемого значения, и клиент получает управление сразу же после вызова, а метод продолжает выполняться асинхронно.
Общая идея реализации advice методов активного объекта заключается в применении так называемого шаблона worker object, суть которого состоит в переносе вызова proceed в анонимный класс, что позволяет выполнять метод асинхронно.
Ниже приведена реализация advice для one-way метода. Анонимный класс, реализующий интерфейс Runnable, играет роль класса запроса.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Oneway {
}
public aspect ActiveObjectAspect {
...
void around(final ActiveObject obj): execution(@Oneway void ActiveObject+.*(..)) && this(obj) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
proceed(obj);
} catch (Throwable e) {
e.printStackTrace();
}
}
};
boolean interrupted = false;
try {
while (true) {
try {
obj.queue.put(task);
break;
} catch (InterruptedException ignored) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
Все исключения, возникающие в one-way методах, просто логируются, так как нет возможности вернуть их клиенту.
Обычные методы активных объектов в отличие от one-way возвращают значения, поэтому в реализации advice для таких методов вместо Runnable используется класс FutureTask и анонимный класс, реализующий интерфейс Callable.
public aspect ActiveObjectAspect {
...
Object around(final ActiveObject obj): execution(!@Oneway * ActiveObject+.*(..)) && this(obj) {
RunnableFuture<?> task = new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return proceed(obj);
}
});
boolean interrupted = false;
try {
while (true) {
try {
obj.queue.put(task);
break;
} catch (InterruptedException ignored) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
try {
interrupted = false;
try {
while (true) {
try {
return task.get();
} catch (InterruptedException ignored) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} catch (ExecutionException e) {
throw e.getCause(); // ошибка компиляции
}
}
}
В результате выполнения методов могут возникать исключения, которые будут выброшены при вызове метода get класса FutureTask. Так как в общем случае это проверяемые исключения, которые объявлены в методе активного объекта, но не в advice, выбрасывание таких исключений из advice приведет к ошибке компиляции. Для того чтобы обойти данное ограничение, можно использовать особенность реализации generics в Java, позволяющую выбрасывать проверяемые исключения из методов, в которых эти исключения не объявлены.
public aspect ActiveObjectAspect {
...
Object around(final ActiveObject obj): execution(!@Oneway * ActiveObject+.*(..)) && this(obj) {
...
try {
...
} catch (ExecutionException e) {
throw ActiveObjectAspect.<RuntimeException>throwException(e.getCause());
}
}
@SuppressWarnings("unchecked")
private static <E extends Throwable> E throwException(Throwable exception) throws E {
throw (E) exception;
}
}
Активные объекты как и любые объекты Java автоматически удаляются из памяти сборщиком мусора, когда становятся недостижимыми. Но даже после сборки активного объекта его поток остается жить, так как работающие потоки не считаются недостижимыми, и, соответственно, не удаляются сборщиком мусора.
Поскольку мы не хотим останавливать поток активного объекта вручную, необходимо каким-то образом связать остановку потока с удалением активного объекта. Один из способов сделать это — поместить код остановки потока в метод finalize активного объекта, правда есть серьезные аргументы против использования finalize, связанные с производительностью. Поэтому мы будем использовать фантомные ссылки, которые предоставляют более гибкий и безопасный способ очистки ресурсов.
public interface Disposable {
void dispose();
}
public final class Disposer {
private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
private final Map<Object, Disposable> disposables = new ConcurrentHashMap<>();
public Disposer() {
DisposerThread thread = new DisposerThread(referenceQueue, disposables);
thread.setName(Disposer.class.getSimpleName());
thread.setDaemon(true);
thread.start();
}
public void register(Object obj, Disposable disposable) {
Objects.requireNonNull(obj);
Objects.requireNonNull(disposable);
disposables.put(new PhantomReference<>(obj, referenceQueue), disposable);
}
private static final class DisposerThread extends Thread {
private final ReferenceQueue<?> referenceQueue;
private final Map<?, ? extends Disposable> disposables;
DisposerThread(ReferenceQueue<?> referenceQueue, Map<?, ? extends Disposable> disposables) {
this.referenceQueue = referenceQueue;
this.disposables = disposables;
}
@Override
public void run() {
while (true) {
Reference<?> reference;
try {
reference = referenceQueue.remove();
} catch (InterruptedException ignored) {
continue;
}
Disposable disposable = disposables.remove(reference);
try {
disposable.dispose();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
}
Класс Disposer позволяет регистрировать ссылку на объект, в результате уничтожения которого будет вызван метод dispose интерфейса Disposable. В нашем случает регистрируемым объектом является активный объект, а ресурсом, реализующим метод dispose, — поток активного объекта.
final class ActiveObjectThread extends Thread implements Disposable {
...
volatile boolean running;
@Override
public void run() {
while (running) {
...
}
}
@Override
public void dispose() {
running = false;
interrupt();
}
}
Регистрацию ссылка на активный объект поместим в advice конструктора.
public aspect ActiveObjectAspect {
...
private static final Disposer disposer = new Disposer();
after(ActiveObject obj) returning: initialization((ActiveObject+ && !ActiveObject).new(..)) && this(obj) {
...
disposer.register(obj, thread);
thread.start();
}
...
}
В связи с тем, что активный объект может иметь поля, которые не являются final или volatile, а конструктор и методы активного объекта выполняются в разных потоках, необходимо обеспечить видимость значений этих полей потоке активного объекта. Для этого можно использовать поле running класса ActiveObjectThread, сделав запись в это поле в advice конструктора. Это гарантирует, что значения полей активного объекта, заданные в конструкторе, будут видны в потоке активного объекта.
public aspect ActiveObjectAspect {
...
after(ActiveObject obj) returning: initialization((ActiveObject+ && !ActiveObject).new(..)) && this(obj) {
...
thread.running = true;
thread.start();
}
...
}
В результате применения аспектно-ориентированного подхода мы получили реализацию шаблона Активный объект, лишенную многих недостатков существующих реализаций. Нерешенной остается проблема использование потоков Java, которую мы устраним с помощью проекта Zephyr, реализующего подключаемые потоки.
Один из модулей Zephyr реализует легковесные потоки, которые можно подключить к приложению, не меняя исходного кода. Необходимо только изменить процесс сборки приложения.
Добавим в исходный POM-файл несколько плагинов.
<project xmlns="http://ift.tt/IH78KX" xmlns:xsi="http://ift.tt/ra1lAU"
xsi:schemaLocation="http://ift.tt/IH78KX http://ift.tt/VE5zRx">
<modelVersion>4.0.0</modelVersion>
...
<build>
<plugins>
...
<plugin>
<groupId>org.jvnet.zephyr.maven</groupId>
<artifactId>remapping-maven-plugin</artifactId>
<configuration>
<outputDirectory>${project.build.directory}/remapping-classes</outputDirectory>
<testOutputDirectory>${project.build.directory}/remapping-test-classes</testOutputDirectory>
<mappingEntries>
<mappingEntry>
<oldName>java/lang/Thread</oldName>
<newName>org/jvnet/zephyr/jcl/java/lang/Thread</newName>
</mappingEntry>
<mappingEntry>
<oldName>java/util/concurrent/FutureTask</oldName>
<newName>org/jvnet/zephyr/jcl/java/util/concurrent/FutureTask</newName>
</mappingEntry>
<mappingEntry>
<oldName>java/util/concurrent/LinkedBlockingQueue</oldName>
<newName>org/jvnet/zephyr/jcl/java/util/concurrent/LinkedBlockingQueue</newName>
</mappingEntry>
</mappingEntries>
</configuration>
<executions>
<execution>
<goals>
<goal>remapping</goal>
<goal>testRemapping</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jvnet.zephyr.maven</groupId>
<artifactId>javaflow-maven-plugin</artifactId>
<configuration>
<classesDirectory>${project.build.directory}/remapping-classes</classesDirectory>
<testClassesDirectory>${project.build.directory}/remapping-test-classes</testClassesDirectory>
</configuration>
<dependencies>
<dependency>
<groupId>org.jvnet.zephyr.thread</groupId>
<artifactId>thread-api</artifactId>
<version>${zephyr.version}</version>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.jcl</groupId>
<artifactId>jcl-jdk7</artifactId>
<version>${zephyr.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<goals>
<goal>javaflow</goal>
<goal>testJavaflow</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>javaflow</id>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>javaflow</classifier>
<classesDirectory>${project.build.directory}/javaflow-classes</classesDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Плагин remapping-maven-plugin переназначает используемые в реализации классы Thread, FutureTask и LinkedBlockingQueue на соответствующие классы с поддержкой подключаемых потоков. Плагин javaflow-maven-plugin добавляет в методы поддержку продолжений из проекта Commons Javaflow, которая требуется модулем Zephyr, реализующим легковесные потоки.
Теперь при запуске приложения достаточно добавить в classpath соответствующие библиотеки, для того чтобы активные объекты начали использовать легковесные потоки вместо обычных.
Полную версию реализации шаблона Активный объект, поддерживающую включение и исключение методов, наследование классов активных объектов, различные типы очередей и таймауты, а так же пример использования, можно найти здесь и здесь.
This entry passed through the Full-Text RSS service - if this is your content and you're reading it on someone else's site, please read the FAQ at http://ift.tt/jcXqJW.
Комментариев нет:
Отправить комментарий