вторник, 11 апреля 2017 г.

[recovery mode] Cluster + EXPRESS + socket.io без REDIS

Мне очень понравилась библиотека soket.io. С помощью нее можно реализовать реально realtime приложения. Она сама выбирает протокол в зависимости от браузера, если надо то создает WebSoket.

Это конечно все хорошо, но не стоит забывать о том, что Node.js работает в одном потоке. Для этого у нас есть отличный инструмент Cluset.

Передо мной встал вопрос, а можно ли сделать приложения с кластеризацией, но не использовать для этого REDIS? Также для удобства используем EXPRESS.

Создадим простой пример (пока без кластера): Пример из официальной документации.

var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);

server.listen(3038);

app.get('/', function (req, res) {
  res.sendfile(__dirname + '/index.html');
});

io.on('connection', function (socket) {
  socket.emit('news', { hello: 'world' });
  socket.on('my other event', function (data) {
    console.log(data);
  });
});


Пока ничего сложного, все понятно, вешаем сервер на порт 3038 с express и soket.io. Клиентский код будет неизменным:
Немного пояснений по клиентскому коду. Создаем объект soket с настройками (порт 3038, таймаут подключения 10мс). В случае ошибок подключения более 5 раз, отключаем работу soket, если менее, то обнуляем счетчик. А также подключаемся к комнате news.

Следующим этапом будет создание кластера. Т.к. сокет нельзя создать на одном порту, то каждый новый worker будет создавать soket.io на новом порту:

var cluster = require('cluster');//Включаем cluster
var cpuCount = require('os').cpus().length;//Количество ядер процессора
var io = [];
//В мастере создаем worker'ов равное количеству ядер процессоров
if (cluster.isMaster) {
    for (var i = 0; i < cpuCount; i += 1) {
        var worker = cluster.fork();
    }
} 
//В воркере
if (cluster.isWorker) {
                var worker_id = cluster.worker.id;
                var express  = require('express');
var app = express();
                var server = require('http').Server(app);
                io[worker_id] = require('socket.io')(server);
                server.listen(3030+worker_id);


                io[worker_id].on('connection', function (socket) {
                        console.log( socket.id );
                        console.log( "WORKER ID :"+worker_id );
                        socket.emit('news', { hello: 'world' });
                        socket.on('my other event', function (data) {
                        console.log(data);
                        });
                });

var app_express = express();
                app_express.listen(8081);
                app_express.use(express.static('public'));//отдаем статичные данные (см. код клиента)
                app_express.get('/', function (request, response) {
                        response.send('Hello from Worker '+worker_id);   
                        console.log( '------' );
                        
                });
                
                
                app_express.get('/get_port', function (request, response) {
                        response.send(3030+worker_id);   
                        console.log( 'get_port' );
                });
}

                app_express.get('/api', function (request, response) {
                        response.setHeader('Content-Type', 'application/json');
                        var id = request.param('id');
                        var msg = request.param('msg');
                        var port = request.param('port');
                        var JSON_DATA = {
                                        "worker_id":worker_id
                                        ,"id":id
                                        ,"msg":msg
                                        ,"port":port
                        };
                });


Схематично покажем как все это выглядит:

image

Когда мы открываем в браузере localhost:8081 cluster сервирует нас на worker по своему правилу. У нас есть вызов:

app_express.get('/api', function (request, response){...}

В этом вызове при открытии localhost:8081/api?msg=test&port=3032&id=100500 мы хотим передать клиенту подключенному по порту 3032 id=100500 сообщение с msg=teest.

Самый простой способ, создать клиента и открыть soket.io по определенному порту

var http = require('http');
var client = http.createClient(3032 , "localhost");
request = client.request();
request.on('response', function( res ) {
    res.on('data', function( data ) {
        console.log( data );
    } );
} );
request.end();


Но к примеру нам нужно отправить broadcast сообщение, и нам неизвестен порт на котором сервируется клиента. Проблема в следующем, мы достоверно не знаем что при открытии localhost:8081/api?msg=test&port=3032&id=100500 мы будем сервироваться на worker 2.

image

На рисунке показано что при отправки broadcast сообщения с worker1 мы не имеем возможности отправить на прямую клиентам soket.io 2, soket.io 3, soket.io 4, soket.io…

Для решения этой задачи мы можем воспользоваться методами в cluster. В cluster'e мы можем передавать сообщения из master → worker и из worker → master. Для наглядности изобразим это схематично.

image

Пример работы с отправкой master → worker и worker → master представлен ниже:

// Кластерное приложение
var cluster = require('cluster');
var io = [];
var cpuCount = require('os').cpus().length;
var workers = [];

if (cluster.isMaster) {
    // Count the machine's CPUs
    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
        var worker = cluster.fork();
        
        //Слушаем сообщение от workera
        worker.on('message', function(data) {
                //отправляем всем worker'ам сообщение
                for (var j in workers) {workers[j].send(data);}
        });
        //Добавляем объект worker в массив
        workers.push(worker);
    }
    
    
} 


if (cluster.isWorker) {
                //------------------------------------------------------------------------------------//
                var worker_id = cluster.worker.id;
                var express  = require('express');
                var app = express();
                var server = require('http').Server(app);
                io[worker_id] = require('socket.io')(server);
                server.listen(3030+worker_id);
                
                io[worker_id].on('connection', function (socket) {
                        console.log( socket.id );
                        console.log( "WORKER ID :"+worker_id );
                        socket.emit('news', { hello: 'world' });
                        socket.on('my other event', function (data) {
                        console.log(data);
                        });
                });
                //------------------------------------------------------------------------------------//
                var app_express = express();
                app_express.listen(8081);
                app_express.use(express.static('public'));//отдаем статичные данные
                app_express.get('/', function (request, response) {
                        response.send('Hello from Worker '+worker_id);   
                        console.log( '------' );
                        
                });
                
                
                app_express.get('/get_port', function (request, response) {
                        response.send(3030+worker_id);   
                        console.log( 'get_port' );
                });
                
                
                app_express.get('/api', function (request, response) {
                        //process.send("----------------------");
                        response.setHeader('Content-Type', 'application/json');
                        var id = request.param('id');
                        var msg = request.param('msg');
                        var JSON_DATA = {
                                        "worker_id":worker_id
                                        ,"id":id
                                        ,"msg":msg
                        };
                        
                        io[port-3030].to(msg.id).emit('news', msg.msg);

                        
                        response.send(  JSON.stringify(JSON_DATA) );  
                        
                        //Отправляем всем процессам данные
                        process.send(JSON_DATA);
                        
                });
                

                //Обработка сообщений от worker//
                process.on('message', function(msg){
                        console.log(worker_id);
                    console.log(msg.id);
                    console.log(msg.msg);
                    io[worker_id].to(msg.id).emit('news', msg.msg);
                    
                });
                
}


Надеюсь статья будет полезна для новичков, чтобы познакомиться с cluster и общением между master → worker и worker → master. Кратко рассмотрен soket.io. И самое главное не используется redis.

Комментарии (0)

    Let's block ads! (Why?)

    Комментариев нет:

    Отправить комментарий