...

четверг, 13 октября 2016 г.

[Из песочницы] Кластер Asterisk. Централизация информации о регистрации

У большинства администраторов, работающих с телефонией на базе Asterisk, в компаниях, где штат превышает 500+ сотрудников, рано или поздно встает вопрос о полноценной кластеризации Active/Active. Предпосылками к этому может быть и наличие региональных ответвлений, и желание сделать систему надежнее. Тема обширная и не является целью данной статьи в полном объеме, которая написана с целью показать один из самых быстрых и надежных способов добыть информацию о регистрации устройств на серверах в кластере, с целью последующей централизации или/и дистрибуции внутри кластера. Логично предположить, что самый производительный способ — это быть частью самого Asterisk.

Поэтому, чтобы не тянуть кота за хвост, шаблон загружаемого модуля для Asterisk:
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision: $")

#include "asterisk/module.h"
#include "asterisk/logger.h"

static int load_module(void){
        // Init code here
        return AST_MODULE_LOAD_SUCCESS;
}

static int unload_module(void){
        // Destroy code here
        return 0;
}

AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Hello World");


Это минимально необходимый код, который нужно поместить в папку с сорцами Asterisk в подкаталог res. При компиляции, будет собран новый модуль с названием «как имя файла» и дискрипцией «Hello World».

Отлично, внутрь Asterisk мы попали, что дальше? Нам нужно получить информацию о регистрации телефона, бонусом мы хотим знать IP адрес телефона, возможно его джитер и статус (USE/NOT_USE/HOLD).

В Asterisk для этого существует Stasis . К сожалению, единственный способ разобраться в работе этого механизма — это изучить исходные коды. Итак, сделаем 3 простых шага:

1. Подписываемся на получения событий от шины стазиса

stasis_subscribe(ast_endpoint_topic_all(),acl_change_stasis_dev_status,NULL);


ast_endpoint_topic_all() — возвращает название «Топика» сообщения.
acl_change_stasis_dev_status — Это функция, которая будет вызвана, когда в стазисе появиться нужное для нас сообщение из указанного топика.

2. Создаем функцию, в которой будем ловить нужные сообщения.
static void acl_change_stasis_dev_status(void *data, struct stasis_subscription *sub, struct stasis_message *msg){
        
        }

3. Собственно самое вкусное — код обработки события.
Перед тем как начеркать свое, нужно понять, а в каком виде оно к нам придет? Для этого лезем в сорцы и находим вот такой кусок:
ast_endpoint_blob_publish(peer->endpoint, ast_endpoint_state_type(), blob);


и далее:
void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type, struct ast_json *blob)
{
        RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
        if (blob) {
                message = ast_endpoint_blob_create(endpoint, type, blob);
        }
        if (message) {
                stasis_publish(ast_endpoint_topic(endpoint), message);
        }
}

struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
        struct stasis_message_type *type, struct ast_json *blob)
{
        RAII_VAR(struct ast_endpoint_blob *, obj, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);

        if (!type) {
                return NULL;
        }
        if (!blob) {
                blob = ast_json_null();
        }

        if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
                return NULL;
        }

        if (endpoint) {
                if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
                        return NULL;
                }
        }

        obj->blob = ast_json_ref(blob);

        if (!(msg = stasis_message_create(type, obj))) {
                return NULL;
        }

        ao2_ref(msg, +1);
        return msg;
}


Что дает нам это кусок кода? Понимание того, что в качестве полезной нагрузки в нашу функцию мы получим ast_endpoint_blob, внутри которого будет ast_endpoint_snapshot и JSON.

Теперь наш код:

 if(ast_endpoint_state_type() != stasis_message_type(msg))return; // Проверим, мы получили нужное нам сообщение?
        
        // Все остальное выдергиваеться из исходных кодов Asterisk при длительном и внимательном изучении
        struct ast_endpoint_blob * n=stasis_message_data(msg); // Полезная нагрузка в данном топике ходит в виде структуры ast_endpoint_blob
        
        // Сведения о регистрации конвентированы в JSON
        struct ast_json * m;
        
    struct ast_endpoint_snapshot *snap; // А вот информация о пире лежит рядом c JSON в структуре ast_endpoint_snapshot
    snap=n->snapshot; // Получаем информацию о пире
    m=n->blob; // Получаем JSON
        
        char buffer[1050];
        sprintf(buffer,"Device %s (%s): %s\n",snap->id,ast_endpoint_state_to_string(snap->state),ast_json_dump_string_format(m,AST_JSON_COMPACT));
        ast_log(LOG_NOTICE,buffer); // И выводим в лог  


Описанные выше структуры потянут за собой следующие инклуды:
 #include "asterisk/stasis_endpoints.h"
        #include "asterisk/stasis.h"
        #include "asterisk/stasis_message_router.h"
        #include "asterisk/stasis_channels.h"
        #include "asterisk/stasis_bridges.h"
        #include "asterisk/stasis_system.h"
        #include "asterisk/devicestate.h"
        #include "asterisk/json.h"


Про них тоже нужно не забыть, при создании модуля.

Итого:

#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision: $")

#include "asterisk/module.h"
#include "asterisk/logger.h"

#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_system.h"
#include "asterisk/devicestate.h"
#include "asterisk/json.h"

static void acl_change_stasis_dev_status(void *data, struct stasis_subscription *sub, struct stasis_message *msg){
        if(ast_endpoint_state_type() != stasis_message_type(msg))return;
        
        struct ast_endpoint_blob * n=stasis_message_data(msg);
        
        struct ast_json * m;
    struct ast_endpoint_snapshot *snap;
    snap=n->snapshot;
    m=n->blob;
        
        char buffer[1050];
        sprintf(buffer,"Device %s (%s): %s\n",snap->id,ast_endpoint_state_to_string(snap->state),ast_json_dump_string_format(m,AST_JSON_COMPACT));
        ast_log(LOG_NOTICE,buffer);
}

static int load_module(void){
        stasis_subscribe(ast_endpoint_topic_all(),acl_change_stasis_dev_status,NULL);
        return AST_MODULE_LOAD_SUCCESS;
}

static int unload_module(void){
        // Тут нужно отписаться от события
        return 0;
}

Для чего это нужно? Полученную информацию можно класть в единую базу кластера, которая всегда будет знать на каком конкретно сервере зарегистрировано устройство. Смаршутизировать звонок, опираясь на эту информацию уже можно средствами диалплана.

И, собственно, как это выглядит:

image

Комментарии (0)

    Let's block ads! (Why?)

    Комментариев нет:

    Отправить комментарий