Для масштабирования коллектора мы используем многопроцессный подход, с одним управляющим и несколькими рабочими процессами, межпроцессное взаимодействие происходит только между рабочим и управляющим процессом.
Рабочие процессы выполняют одинаковые задачи — сбор, обработка и запись логов с серверов PostgreSQL. При этом сбор и запись — это по сути IO-задачи, в которых nodejs очень хороша. А вот обработка и парсинг планов запросов — это довольно CPU-емкая задача, блокирующая event-loop. Поэтому такие задачи лучше выносить в отдельный воркер или пул воркеров, передавая им данные на обработку посредством обмена IPC-сообщениями.
Раньше, для задачи обработки и парсинга планов запросов мы использовали именно такой подход. Но у него есть недостаток — большие объемы передаваемых данных по IPC могут привести к значительному увеличению затрат на сериализацию в JSON и обратно.
Например при передаче по IPC буфера, в которой содержится строка 'test' происходит передача строки:
'{"type":"Buffer","data":[116,101,115,116]}'
При большом количестве передаваемых данных накладные расходы могут стать такими:
Решением для нас стало использование worker_threads, появившихся в Node.JS 10.5.0, работающих в рамках одного процесса и позволяющих использовать новые методы коммуникации между потоками.
Архитектура изменилась:
А вместе с ней и подходы к мониторингу. Например, использование CPU внутри worker_thread традиционным способом измерить не получится.
Т.е. раньше, для каждого процесса-воркера, мы измеряли CPU-usage с помощью process.cpuUsage() и process.hrtime() примерно так:
let startCpuUsage = process.cpuUsage();
let startTime = process.hrtime();
let n = 1000;
while (n--) Math.sin(n);
let {user, system} = process.cpuUsage(startCpuUsage); // время в микросекундах
let time = process.hrtime(startTime); // наносекунды
let cpuUsage = 100 * 1000 * (user + system) / (time[0] * 1e9 + time[1]);
Но для процесса с worker_threads вызов process.cpuUsage() выдает процессорное время для всего процесса в целом, суммируя все его потоки. И такой же результат мы получим, если вызовем process.cpuUsage() изнутри worker_thread.
Почему так происходит?
Дело в то что process.cpuUsage() использует вызов uv_getrusage, а тот в ОС Linux выполняет системный вызов getrusage с параметром RUSAGE_SELF, т.е. возвращает статистику для вызывающего процесса как сумму по всем его потокам, при этом не важно из какого потока мы делаем этот вызов — во всех потоках он будет возвращать одинаковые значения.
Как же получить CPU-usage для worker_threads и почему в Node.JS нет встроенных методов для профилирования CPU worker_threrads?
Здесь есть ответ разработчика worker_threads.
Предложено два варианта — либо с помощью системного вызова gettid() получить tid для worker_thread и просматривать данные в /proc/${tid}, либо использовать getrusage() с параметром RUSAGE_THREAD, позволяющим получать статистику только для вызывающего потока.
Кстати, таким же образом можно получать метрики использования CPU и для основного потока процесса, без учета всех дополнительных потоков и worker_threads.
Итак, разобравшись с этим вопросом, мы стали искать готовые модули для мониторинга worker_threads, и не нашли… Поэтому сделали свой , заодно добавив в него выдачу всех остальных метрик для мониторинга Node.JS приложения. Серверные метрики мы уже получаем с помощью своей системы сбора метрик .
Мониторинг CPU
Для анализа использования CPU мы берем метрики от воркеров и worker_threads, а также метрики общей загруженности CPU и в разрезе ядер:
- Для воркеров в целом:
- Для основных потоков воркеров:
- Для worker_threads (по первым из пула, но полезным будет и суммарный в разрезе воркера):
- Общая загрузка CPU на сервере:
С метриками CPU разобрались, а что насчет профайлинга worker_threads?
Проверим, запустив этот небольшой тест с параметром node --prof
const { Worker, isMainThread} = require('worker_threads');
const crypto = require('crypto');
function mainTest() {
let cnt = 10000;
while (cnt--) crypto.randomBytes(cnt);
}
function threadTest() {
let cnt = 10000;
while (cnt--) crypto.randomBytes(cnt);
}
if (isMainThread) {
let worker = new Worker(__filename);
setInterval(mainTest, 1000);
} else {
setInterval(threadTest, 1000);
}
В результате получили два isolate-* файла, для основного потока и для worker_thread.
Далее, с помощью node --prof-process <isolate_file> можем посмотреть нужный профайл.
Кстати, с опцией --no-logfile-per-isolate вместо нескольких isolate* файлов будет один — v8.log с суммарным результатом по всем потокам, включая основной.
И еще — используя опцию node --inspect или послав сигнал SIGUSR1 работающему процессу с целью снять CPU профайл в Chrome DevTools, мы увидим данные только по основному потоку.
Использование памяти
Также как и для CPU, снимая профайл в Chrome DevTools мы получим Heap snapshot только основного потока.
К счастью, с версии node 12.17.0 появилась возможность получить heap snapshot прямо из кода worker_threads с помощью вызова worker.getHeapSnapshot(), а с версии 11.13.0 также для основного потока вызовом v8.getHeapSnapshot()
const { Worker, isMainThread } = require('worker_threads');
const v8 = require('v8');
const fs = require('fs');
if (isMainThread) {
let worker = new Worker(__filename);
let mainArray = [];
function mainTest() {
let cnt = 100;
while (cnt--) mainArray.push(`main-msg-${cnt}`);
}
process.on('SIGUSR2', () => {
v8.getHeapSnapshot().pipe(fs.createWriteStream(`process_${process.pid}.heapsnapshot`));
worker.getHeapSnapshot().then((heapsnapshot) => {
heapsnapshot.pipe(fs.createWriteStream(`process_${process.pid}_wt_${worker.threadId}.heapsnapshot`));
})
});
setInterval(mainTest, 1000);
} else {
let threadArray = [];
function threadTest() {
let cnt = 100;
while (cnt--) threadArray.push(`thread-msg-${cnt}`);
}
setInterval(threadTest, 1000);
}
Послав сигнал SIGUSR2 процессу, мы получим два heapsnapshot, которые затем можно проанализировать в Chrome DevTools:
- Основной процесс:
- worker_thread:
Какие метрики памяти интересны для анализа?
Мы используем те, что выдает process.memoryUsage() — rss, heapTotal, heapUsed, external.
И также v8.getHeapSpaceStatistics(), с его помощью можно получить данные по сегментам Heap — new_space, old_space, code_space, large_object_space.
rss всегда выдается для всего процесса, а остальные метрики — в рамках вызывающего контекста.
- Суммарный по воркерам:
- По воркеру:
- По worker_threads:
Сборка мусора
Т.к. в каждом worker_thread запускается свой инстанс Node.JS с v8/libuv, то и GC у каждого тоже свой и мониторить их надо по отдельности.
Для мониторинга GC нам нужно получать данные об общей продолжительности сборки мусора, а также количество запусков и время выполнения каждого цикла.
Уже довольно давно, с версии 8.5.0, в Node.JS появился механизм PerformanceObserver, позволяющий кроме всего прочего получить всю необходимую информацию по циклам GC.
const { PerformanceObserver, constants } = require('perf_hooks');
let stats = {};
let gcObserver = new PerformanceObserver((list) => {
list
.getEntries()
.map(({kind, duration}) => {
stats['gc.time'] += duration;
switch (kind) {
case constants.NODE_PERFORMANCE_GC_MINOR:
stats['gc.Scavenge.count']++;
stats['gc.Scavenge.time'] += duration;
break;
case constants.NODE_PERFORMANCE_GC_MAJOR:
stats['gc.MarkSweepCompact.count']++;
stats['gc.MarkSweepCompact.time'] += duration;
break;
case constants.NODE_PERFORMANCE_GC_INCREMENTAL:
stats['gc.IncrementalMarking.count']++;
stats['gc.IncrementalMarking.time'] += duration;
break;
case constants.NODE_PERFORMANCE_GC_WEAKCB:
stats['gc.ProcessWeakCallbacks.count']++;
stats['gc.ProcessWeakCallbacks.time'] += duration;
break;
}
})
});
function resetStats() {
Object.assign(stats, {
'gc.time': 0,
'gc.Scavenge.count': 0,
'gc.Scavenge.time': 0,
'gc.MarkSweepCompact.count': 0,
'gc.MarkSweepCompact.time': 0,
'gc.IncrementalMarking.count': 0,
'gc.IncrementalMarking.time': 0,
'gc.ProcessWeakCallbacks.count': 0,
'gc.ProcessWeakCallbacks.time': 0,
});
}
resetStats();
gcObserver.observe({entryTypes: ['gc'], buffered: true});
function triggerScavenge() {
let arr = [];
for (let i = 0; i < 5000; i++) {
arr.push({});
}
setTimeout(triggerScavenge, 50);
}
let ds = [];
function triggerMarkCompact() {
for (let i = 0; i < 10000; i++) {
ds.push(new ArrayBuffer(1024));
}
if (ds.length > 40000) {
ds = [];
}
setTimeout(triggerMarkCompact, 200);
}
triggerScavenge();
triggerMarkCompact();
setInterval(() => {
console.log(stats);
resetStats();
}, 5000);
Результат:
{
'gc.time': 158.716144,
'gc.Scavenge.count': 11,
'gc.Scavenge.time': 135.690545,
'gc.MarkSweepCompact.count': 2,
'gc.MarkSweepCompact.time': 22.96941,
'gc.IncrementalMarking.count': 2,
'gc.IncrementalMarking.time': 0.056189,
'gc.ProcessWeakCallbacks.count': 0,
'gc.ProcessWeakCallbacks.time': 0
}
Этот метод работает как в основном потоке так и в worker_threads, для нашего коллектора мы получаем графики с метриками за секунду:
- По воркерам
- По worker_threads
- Общее время GC в разрезе воркеров
- Общее время GC в разрезе worker_threads
Event-loop latency
Для мониторинга задержек event-loop удобно использовать появившийся в версии 11.10.0 monitorEventLoopDelay — тут можно получить не только среднее и предельные значения, но и различные перцентили.
Мы используем max, min, mean, и percentile(99):
- Суммарный по всем воркерам
- Суммарный по worker_threads
- По воркеру
- По worker_thread
Мониторинг пула worker_threads
Системные показатели работы пула уже приведены выше, а здесь поговорим о метриках производительности многопоточного приложения.
При старте каждый воркер коллектора запускает пул с одним worker_thread, который обрабатывает очередь поступающих планов запросов.
Дополнительные worker_thread запускаются при увеличении размера очереди и при нахождении задач в очереди дольше определенного времени. Также они автоматически завершаются после периода неактивности.
const SPAWN_LAG = 2000;
this._queue = [];
assignTask(msg) {
if (this.mainExplainer.ready === true) {
this.mainExplainer.ready = false;
this.mainExplainer.sendMessage(msg);
} else if (this._idleExplainers.length > 0) {
let explainer = this._idleExplainers.pop();
clearTimeout(explainer.timeoutIdle);
explainer.sendMessage(msg);
} else {
this._checkAndStartNew(msg);
}
}
_checkAndStartNew(msg) {
let ts = Date.now();
let q = this._queue;
if (msg && process[hasFreeRAM]) q.push({msg, ts});
if (this._canCreateExplainer && q.length > this._workersCnt() && q[0].ts + SPAWN_LAG < ts) {
this._createExplainer();
}
}
explainer.on('explainDone', (msg) => {
explainer.pulse();
});
explainer.pulse = () => {
if (this._queue.length > explainer.id) {
explainer.sendMessage(this._queue.shift().msg);
} else if (this._isMain(explainer)) {
explainer.ready = true;
} else {
this._idleExplainers.push(explainer);
explainer.unpool();
}
};
Важными метриками пула worker_thread являются размер очереди и количество работающих потоков:
Кроме этого мы мониторим скорость и производительность worker_thread и воркеров в целом:
Заключение
Мы рассмотрели особенности мониторинга многопоточного приложения Node.JS.
Для комплексного анализа работы приложения необходимо отслеживать массу показателей — метрики по серверу в целом, использование приложением системных ресурсов, метрики среды выполнения, а также различные показатели самого приложения. В общем всего, что включает в себя APM.
Комментариев нет:
Отправить комментарий