Если в вкратце то в dataproc впечатлила простота запуска и настроек, на фоне Oracle и Cloudera. На первом этапе я играл с one node cluster на 8 vCpu, максимум какой позволяет совсем бесплатный триал. Если смотреть на простоту, то их технологии уже позволяют совсем индусу в 15 минут запустить кластер, загрузить сампл данные и подготовить отчетик обычным BI инструментом, без каких либо промежуточных субд под витрины. Каких-то глубоких знаний о хадупе уже совсем не требуется.
В принципе я увидел, что штука замечательная под быстрый старт и за вменяемые деньги можно запустить прототип, оценить какое железо под задачку понадобиться. Однако кластер покрупней, в десятки нод, явно будет кушать значительно больше, чем аренда + пара приглядывающих за кластером админа. Далеко не факт, что клауд будет выглядеть экономически выгодным. Первым этапом я попытался оценить совсем микро вариант с one node cluster 8 vCpu и 0.5 Тб сырых данных. В принципе тестов spark+hadoop на кластерах покрупней и так полно в инетах, но я планирую и чуть крупнее вариант потестировать позже.
Буквально за час я нагуглил скрипты создания хадуп кластера, настроек его firewall и настройки thrift server, который позволил по jdbc с домашней винды подключаться к spark sql. Еще часа два-три я потратил на оптимизацию дефолтных настроек spark и загрузку пары мелких таблиц размером около 10 Гб (размер датафайлов в оракле). Таблицы я запихнул целиком в память (alter table cache;) и с ними можно было работать с моей Windows машины из Dbeaver и Tableau (через spark sql коннектор).
По дефолту спарк использовал лишь 1 executor на 4 vCpu, я отредактировал spark-defaults.conf, поставил 3 executers, по 2 vCpu и долго не мог понять почему у меня реально в работе лишь 1 executer. Выяснилось что я не отредактировал память, двум другим yarn просто не мог выделить память. Я выставил 6.5 Гб на executer, после этого как ожидалось начали подниматься все три.
Далее я решил поиграть с чуть серьезней объемами и более приближенной к DWH задачей из тестов TPC-DS. Для начала я официальной тулзой сгенерировал таблицы со scale factor 500. Получилось что-то около 480 Гб сырых данных (текст с разделителем). Тест TPC-DS — типичное DWH, с фактами и дименсиями. Как сгенерировать данные сразу на google storage я не понял, пришлось генерировать на диск виртуалки и потом копировать на google storage. Гугл как я понял считает, что хадуп отлично работает с google storage и скорость там обещает чуть не лучше, чем если бы данные лежали внутри кластера на HDFS. При этом получается часть нагрузки уходит с HDFS на google storage.
Подключившись через Dbeaver я SQL командами переконвертировал текстовые файлы в таблички на базе parquet со snappy упаковкой. 480 Гб текстовых данных упаковались в 187 Гб parquet файлы. Занял процесс порядка двух часов, крупнейшая таблица занимала в тексте 188 Гб, 3 spark executers в parquet их превратили за 74 минуты, размер паркетника 66.8 Гб. На своем десктопе с примерно теми же 8 vCpu (i7-3770k) думаю «insert into table select * ...» в оракловую таблицу с 8к блоком занял бы сутки, а сколько бы занял датафайл даже страшно представить.
Далее я проверил работоспособность BI инструментов на таком конфиге, построил простенький отчет в Tableua
Что касается запросов то Query1 из теста TPC-DS
WITH customer_total_return
AS (SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
Sum(sr_return_amt) AS ctr_total_return
FROM store_returns,
date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2001
GROUP BY sr_customer_sk,
sr_store_sk)
SELECT c_customer_id
FROM customer_total_return ctr1,
store,
customer
WHERE ctr1.ctr_total_return > (SELECT Avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100;
выполнился за 1:08, Query2 с участием крупнейших таблиц (catalog_sales, web_sales)
WITH wscs
AS (SELECT sold_date_sk,
sales_price
FROM (SELECT ws_sold_date_sk sold_date_sk,
ws_ext_sales_price sales_price
FROM web_sales)
UNION ALL
(SELECT cs_sold_date_sk sold_date_sk,
cs_ext_sales_price sales_price
FROM catalog_sales)),
wswscs
AS (SELECT d_week_seq,
Sum(CASE
WHEN ( d_day_name = 'Sunday' ) THEN sales_price
ELSE NULL
END) sun_sales,
Sum(CASE
WHEN ( d_day_name = 'Monday' ) THEN sales_price
ELSE NULL
END) mon_sales,
Sum(CASE
WHEN ( d_day_name = 'Tuesday' ) THEN sales_price
ELSE NULL
END) tue_sales,
Sum(CASE
WHEN ( d_day_name = 'Wednesday' ) THEN sales_price
ELSE NULL
END) wed_sales,
Sum(CASE
WHEN ( d_day_name = 'Thursday' ) THEN sales_price
ELSE NULL
END) thu_sales,
Sum(CASE
WHEN ( d_day_name = 'Friday' ) THEN sales_price
ELSE NULL
END) fri_sales,
Sum(CASE
WHEN ( d_day_name = 'Saturday' ) THEN sales_price
ELSE NULL
END) sat_sales
FROM wscs,
date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq)
SELECT d_week_seq1,
Round(sun_sales1 / sun_sales2, 2),
Round(mon_sales1 / mon_sales2, 2),
Round(tue_sales1 / tue_sales2, 2),
Round(wed_sales1 / wed_sales2, 2),
Round(thu_sales1 / thu_sales2, 2),
Round(fri_sales1 / fri_sales2, 2),
Round(sat_sales1 / sat_sales2, 2)
FROM (SELECT wswscs.d_week_seq d_week_seq1,
sun_sales sun_sales1,
mon_sales mon_sales1,
tue_sales tue_sales1,
wed_sales wed_sales1,
thu_sales thu_sales1,
fri_sales fri_sales1,
sat_sales sat_sales1
FROM wswscs,
date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 1998) y,
(SELECT wswscs.d_week_seq d_week_seq2,
sun_sales sun_sales2,
mon_sales mon_sales2,
tue_sales tue_sales2,
wed_sales wed_sales2,
thu_sales thu_sales2,
fri_sales fri_sales2,
sat_sales sat_sales2
FROM wswscs,
date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 1998 + 1) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1;
выполнился за 4:33 минуты, Query3 за 3.6, Query4 за 32 минуты.
Если кому-то интересны настройки, под катом мои заметки по созданию кластера. В принципе там лишь пара gcloud команд и настройка HIVE_SERVER2_THRIFT_PORT.
gcloud dataproc --region europe-north1 clusters create test1 \
--subnet default \
--bucket tape1 \
--zone europe-north1-a \
--single-node \
--master-machine-type n1-highmem-8 \
--master-boot-disk-size 500 \
--image-version 1.3 \
--initialization-actions gs://dataproc-initialization-actions/hue/hue.sh \
--initialization-actions gs://dataproc-initialization-actions/zeppelin/zeppelin.sh \
--initialization-actions gs://dataproc-initialization-actions/hive-hcatalog/hive-hcatalog.sh \
--project 123
вариант на 3 ноды:
gcloud dataproc --region europe-north1 clusters \
create cluster-test1 --bucket tape1 \
--subnet default --zone europe-north1-a \
--master-machine-type n1-standard-1 \
--master-boot-disk-size 10 --num-workers 2 \
--worker-machine-type n1-standard-1 --worker-boot-disk-size 10 \
--initialization-actions gs://dataproc-initialization-actions/hue/hue.sh \
--initialization-actions gs://dataproc-initialization-actions/zeppelin/zeppelin.sh \
--initialization-actions gs://dataproc-initialization-actions/hive-hcatalog/hive-hcatalog.sh \
--project 123
gcloud compute --project=123 \
firewall-rules create allow-dataproc \
--direction=INGRESS --priority=1000 --network=default \
--action=ALLOW --rules=tcp:8088,tcp:50070,tcp:8080,tcp:10010,tcp:10000 \
--source-ranges=xxx.xxx.xxx.xxx/32 --target-tags=dataproc
at master node:
sudo su — vi /usr/lib/spark/conf/spark-env.sh
change: export HIVE_SERVER2_THRIFT_PORT=10010
sudo -u spark /usr/lib/spark/sbin/start-thriftserver.sh
To be continued…
Комментариев нет:
Отправить комментарий