...

среда, 10 июля 2013 г.

Изучаем Storm Framework. Часть I

В 2011 году Twitter открыл, под лицензией Eclipse Public License, проект распределенных вычислений Storm. Storm был создан в компании BackType и перешел к Twitter после покупки.

Storm это система ориентированная на распределенную обработку больших потоков данных, аналогичная Apache Hadoop, но в реальном времени.


Ключевые особенности Storm:



  • Масштабируемость. Задачи обработки распределяются по узлам кластера и потокам на каждом узле.

  • Гарантированная защита от потери данных.

  • Простота развертывания и спровождения.

  • Восстановление после сбоев. Если какой либо из обработчиков отказывает, задачи переадресуются на другие обработчики.

  • Возможность написания компонентов не только на Java. Простой Multilang protocol с использованием JSON объектов. Есть готовые адаптеры для языков Python, Ruby и Fancy.




В первой части рассматриваются базовые понятия и основы создания приложения c использованием Storm версии 0.8.2.



Элементы Storm




Tuple

Элемент представления данных. По умолчанию может содержать Long, Integer, Short, Byte, String, Double, Float, Boolean и byte[] поля. Пользовательские типы используемые в Tuple должны быть сериализуемыми.

Stream

Последовательность из Tuple. Содержит схему именования полей в Tuple.


Spout

Поставщик данных для Stream. Получает данные из внешних источников, формирует из них Tuple и отправляет в Stream. Может отправлять Tuple в несколько разных Stream. Есть готовые для популярных систем обмена сообщениями: RabbitMQ / AMQP, Kestrel, JMS, Kafka.


Bolt

Обработчик данных. На вход поступают Tuple. На выход отправляет 0 или более Tuple.


Topology

Совокупность элементов с описанием их взаимосвязи. Аналог MapReduce job в Hadoop. В отличии от MapReduce job — не останавливается после исчерпания входного потока данных. Осуществляет транспорт Tuple между элементами Spout и Bolt. Может запускаться локально или загружаться в Storm кластер.


Пример использования




Задача




Есть поток данных о телефонных вызовах Cdr. На основании source номера определяется id клиента. На основании destination номера и id клиента определяется тариф и считается стоимость звонка. Каждый из этапов должен работать в несколько потоков.

Пример будет запускаться на локальной машине.

Реализация




Для начала просто распечатаем входные данные BasicApp.

Создаем новую Topology:



TopologyBuilder builder = new TopologyBuilder();




Добавляем Spout CdrSpout генерирующий входные данные:

builder.setSpout("CdrReader", new CdrSpout());




Добавляем Bolt с двумя потоками и указываем что на вход подается выходной поток CdrReader. shuffleGrouping означает что данные из CdrReader подаются на случайно выбранный PrintOutBolt.

builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader");




Конфигурируем и запускам локальный Storm кластер:

Config config = new Config(); // Конфигурация кластера по умолчанию
config.setDebug(false);

LocalCluster cluster = new LocalCluster(); // Создаем локальный Storm кластер
cluster.submitTopology("T1", config, builder.createTopology()); // Стартуем Topology
Thread.sleep(1000*10);
cluster.shutdown(); // Останавливаем кластер




На выходе получаем примерно следующее:

Скрытый текст


OUT>> [80]Cdr{callSource='78119990005', callDestination='8313610698077174239',
callTime=7631, clientId=0, price=0}
OUT>> [78]Cdr{callSource='78119990006', callDestination='2238707710336895468',
callTime=20738, clientId=0, price=0}
OUT>> [78]Cdr{callSource='78119990007', callDestination='579372726495390920',
callTime=31544, clientId=0, price=0}
OUT>> [80]Cdr{callSource='78119990006', callDestination='2010724447342634423',
callTime=10268, clientId=0, price=0}




Число в квадратных скобках — Thread Id, видно что обработка ведется параллельно.

Для дальнейших экспериментов нужно разобраться с распределением входных данных между несколькими обработчиками.

В примере выше был использован случайный подход. Но в реальном применении Bolt'ы наверняка будут использовать внешние справочные системы и базы данных. В этом случае желательно чтобы каждый Bolt обрабатывал свое подмножество входных данных. Тогда можно будет организовать эффективное кэширование данных из внешних систем.


Для этого в Storm предусмотрен интерфейс CustomStreamGrouping.

Добавим в проект CdrGrouper. Его задача — отправлять Tuple с одинаковыми source номерами на один и тот же Bolt. Для этого в CustomStreamGrouping предусмотрено два вызова:

prepare — вызывается перед первым использованием:



@Override
public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) {
tasks = new ArrayList<>(integers); // Запоминаем номера Bolts
}




и chooseTasks — где на вход подается список из Tuple, а возвращается список состоящий из номеров Bolt'ов для каждой позиции в списке Tuple:

@Override
public List<Integer> chooseTasks(int i, List<Object> objects) {
List<Integer> rvalue = new ArrayList<>(objects.size());
for(Object o: objects) {
Cdr cdr = (Cdr) o;

rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) %
tasks.size()));
}
return rvalue;
}




Заменим shuffleGrouping на CdrGrouper BasicGroupApp:

builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).
customGrouping("CdrReader", new CdrGrouper());




Запустим и убедимся что работает как задумано:

Скрытый текст


OUT>> [80]Cdr{callSource='78119990007', callDestination='3314931472251135073',
callTime=17632, clientId=0, price=0}
OUT>> [80]Cdr{callSource='78119990007', callDestination='4182885669941386786',
callTime=31533, clientId=0, price=0}






Далее в проект добавляем:

ClientIdBolt — определяет id клиента по source номеру.

ClientIdGrouper — Группирует по id клиента.

RaterBolt — занимается тарификацией.

CalcApp — окончательный вариант программы.

Если тема будет интересна, то в следующей части надеюсь рассказать о механизмах защиты от потери данных и запуске на реальном кластере. Код доступен на GitHub.


PS. Из песни конечно слова не выкинешь, но название обработчика данных «Bolt» несколько смущает :)


This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers. Five Filters recommends: 'You Say What You Like, Because They Like What You Say' - http://www.medialens.org/index.php/alerts/alert-archive/alerts-2013/731-you-say-what-you-like-because-they-like-what-you-say.html


Комментариев нет:

Отправить комментарий