...

четверг, 9 января 2014 г.

«Дружим» redis с nginx


/*
* Copyright (C) 2012 Valery Kholodkov
* 2014 Alexander V Makkoveev
*/

#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>

#define NGX_SOCKETLOG_FACILITY_LOCAL7 23
#define NGX_SOCKETLOG_SEVERITY_INFO 6

#define NGX_REDIS_APPEND "*3" CRLF "$6" CRLF "APPEND" CRLF
#define NGX_REDIS_AUTH "*2" CRLF "$4" CRLF "AUTH" CRLF

//#define NGX_DEF_FORMAT "combined"
#define NGX_DEF_FORMAT "main"
#define IF_DEBUG 0
#define IF_DEBUG_2 0
//*****************************************************************************

typedef struct ngx_http_log_op_s ngx_http_log_op_t;

typedef u_char *(*ngx_http_log_op_run_pt) (ngx_http_request_t *r, u_char *buf,
ngx_http_log_op_t *op);

typedef size_t (*ngx_http_log_op_getlen_pt) (ngx_http_request_t *r,
uintptr_t data);

struct ngx_redislog_peer;

typedef void (*ngx_redislog_send_handler_pt)(struct ngx_redislog_peer*);

struct ngx_http_log_op_s {
size_t len;
ngx_http_log_op_getlen_pt getlen;
ngx_http_log_op_run_pt run;
uintptr_t data;
};

typedef struct {
ngx_str_t name;
#if defined nginx_version && nginx_version >= 7018
ngx_array_t *flushes;
#endif
ngx_array_t *ops; /* array of ngx_http_log_op_t */
} ngx_http_log_fmt_t;

typedef struct {
ngx_array_t formats; /* array of ngx_http_log_fmt_t */
ngx_uint_t combined_used; /* unsigned combined_used:1 */
} ngx_http_log_main_conf_t;

typedef struct {
ngx_str_t name;
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_msec_t write_timeout;
ngx_msec_t read_timeout;
ngx_msec_t connect_timeout;
ngx_msec_t reconnect_timeout;
ngx_msec_t flush_timeout;
ngx_msec_t ping_timeout;

ngx_bufs_t bufs;
size_t recv_buf_size;

ngx_str_t password;

unsigned authenticate:1;
} ngx_redislog_peer_conf_t;

typedef struct {
ngx_array_t *peers;
} ngx_redislog_conf_t;

typedef struct ngx_redislog_peer {
ngx_redislog_peer_conf_t *conf;
ngx_peer_connection_t conn;
ngx_event_t reconnect_timer;
ngx_event_t flush_timer;
ngx_event_t ping_timer;
ngx_log_t *log;
ngx_pool_t *pool;

ngx_chain_t *busy;
ngx_chain_t *free;

ngx_buf_t *recv_buf;

ngx_uint_t discarded;
ngx_uint_t reconnect_timeout;

ngx_uint_t num_queued;
ngx_uint_t state;
u_char *password_pos;

ngx_redislog_send_handler_pt send_handler;

unsigned connecting:1;
unsigned authenticated:1;
unsigned flush_timer_set:1;
} ngx_redislog_peer_t;

typedef struct {
ngx_str_t peer_name;
ngx_uint_t peer_idx;
ngx_http_log_fmt_t *format;
ngx_http_complex_value_t *key;
ngx_str_t command;
ngx_str_t arg1;
//***
//ngx_str_t arg_num;
//***
ngx_http_complex_value_t *_if;
ngx_http_complex_value_t *ifnot;

unsigned has_arg1;
} ngx_http_redislog_t;

typedef struct {
ngx_array_t *logs; /* array of ngx_http_redislog_t */
unsigned off;
} ngx_http_redislog_conf_t;

static ngx_array_t ngx_redislog_peers;

static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p);
static void ngx_http_redislog_append(ngx_redislog_peer_t *p, u_char *buf, size_t len);
static void ngx_http_redislog_send(ngx_redislog_peer_t *p);
static void ngx_redislog_flush_handler(ngx_event_t*);
static u_char *ngx_redislog_size(u_char*, u_char*, size_t);
static size_t ngx_redislog_size_len(size_t);

static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t*, ngx_buf_t*);

static void ngx_redislog_read_handler(ngx_event_t *rev);
static void ngx_redislog_idle_read_handler(ngx_event_t *rev);

static char *ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);

static void *ngx_http_redislog_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent,
void *child);

static ngx_int_t ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);

static void *ngx_redislog_create_conf(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_redislog_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_redislog_init(ngx_conf_t *cf);
static ngx_int_t ngx_redislog_init_process(ngx_cycle_t *cycle);

static ngx_command_t ngx_http_redislog_commands[] = {

{ ngx_string("access_redislog"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_SIF_CONF|NGX_HTTP_LIF_CONF
|NGX_HTTP_LMT_CONF|NGX_CONF_TAKE1234,
ngx_http_redislog_set_log,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },

ngx_null_command
};

static ngx_http_module_t ngx_http_redislog_module_ctx = {
ngx_http_redislog_add_variables, /* preconfiguration */
ngx_http_redislog_init, /* postconfiguration */

NULL, /* create main configuration */
NULL, /* init main configuration */

NULL, /* create server configuration */
NULL, /* merge server configuration */

ngx_http_redislog_create_loc_conf, /* create location configration */
ngx_http_redislog_merge_loc_conf /* merge location configration */
};

extern ngx_module_t ngx_http_log_module;

ngx_module_t ngx_http_redislog_module = {
NGX_MODULE_V1,
&ngx_http_redislog_module_ctx, /* module context */
ngx_http_redislog_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};

static ngx_command_t ngx_redislog_commands[] = {

{ ngx_string("redislog"),
NGX_MAIN_CONF|NGX_CONF_TAKE23,
ngx_http_redislog_command,
0,
0,
NULL },

ngx_null_command
};

static ngx_core_module_t ngx_redislog_module_ctx = {
ngx_string("redislog"),
ngx_redislog_create_conf,
NULL
};

ngx_module_t ngx_core_redislog_module = {
NGX_MODULE_V1,
&ngx_redislog_module_ctx, /* module context */
ngx_redislog_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_redislog_init_process, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};

static ngx_http_variable_t ngx_http_redislog_variables[] = {

{ ngx_string("redislog_yyyy"), NULL, ngx_http_redislog_yyyy_variable, 0,
0, 0 },

{ ngx_string("redislog_yyyymm"), NULL, ngx_http_redislog_yyyymm_variable, 0,
0, 0 },

{ ngx_string("redislog_yyyymmdd"), NULL, ngx_http_redislog_yyyymmdd_variable, 0,
0, 0 },

{ ngx_string("redislog_yyyymmddhh"), NULL, ngx_http_redislog_yyyymmddhh_variable, 0,
0, 0 },

{ ngx_null_string, NULL, NULL, 0, 0, 0 }
};
//-----------------------------------------------------------------------------
ngx_int_t
ngx_http_redislog_handler(ngx_http_request_t *r)
{

u_char *line, *p;
size_t len, command_size_len, arg1_size_len, record_len, key_size_len, record_size_len;
ngx_uint_t i, l;
ngx_str_t key, _if, ifnot;
ngx_http_redislog_t *log;
ngx_http_log_op_t *op;
ngx_http_redislog_conf_t *slcf;
time_t time;
ngx_tm_t tm;
ngx_redislog_peer_t **peer;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http redislog handler");

slcf = ngx_http_get_module_loc_conf(r, ngx_http_redislog_module);

ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"redislog conf=%p, off=%ud, logs=%p", slcf, slcf->off, slcf->logs);

if(slcf->off || slcf->logs == NULL) {
return NGX_OK;
}

time = ngx_time();
ngx_gmtime(time, &tm);

log = slcf->logs->elts;

for (l = 0; l < slcf->logs->nelts; l++) {
#if defined nginx_version && nginx_version >= 7018
ngx_http_script_flush_no_cacheable_variables(r, log[l].format->flushes);
#endif

len = 0;
op = log[l].format->ops->elts;
for (i = 0; i < log[l].format->ops->nelts; i++) {
if (op[i].len == 0) {
len += op[i].getlen(r, op[i].data);

} else {
len += op[i].len;
}
}

if(log[l].ifnot != NULL) {
if(ngx_http_complex_value(r, log[l].ifnot, &ifnot) != NGX_OK) {
return NGX_ERROR;
}

if(ifnot.len && (ifnot.len != 1 || ifnot.data[0] != '0')) {
continue;
}
}

if(log[l]._if != NULL) {
if(ngx_http_complex_value(r, log[l]._if, &_if) != NGX_OK) {
return NGX_ERROR;
}

if(!_if.len || (_if.len == 1 && _if.data[0] == '0')) {
continue;
}
}

if(ngx_http_complex_value(r, log[l].key, &key) != NGX_OK) {
return NGX_ERROR;
}

command_size_len = ngx_redislog_size_len(log[l].command.len);
key_size_len = ngx_redislog_size_len(key.len);

if(log[l].arg1.len) {
arg1_size_len = ngx_redislog_size_len(log[l].arg1.len);
}
else {
arg1_size_len = 0;
}

len += 2 + sizeof(CRLF) - 1 + 1 + command_size_len + 1 + sizeof(CRLF) - 1
+ command_size_len + sizeof(CRLF) - 1 + log[l].command.len + sizeof(CRLF) - 1
+ key_size_len + sizeof(CRLF) - 1 + key.len + sizeof(CRLF) - 1
+ 1 + NGX_OFF_T_LEN + sizeof(CRLF) - 1 + sizeof(CRLF) - 1;

if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
len++;
}

if(log[l].has_arg1) {
len += arg1_size_len + sizeof(CRLF) - 1 + log[l].arg1.len + sizeof(CRLF) - 1;
}

#if defined nginx_version && nginx_version >= 7003
line = ngx_pnalloc(r->pool, len);
#else
line = ngx_palloc(r->pool, len);
#endif
if (line == NULL) {
return NGX_ERROR;
}

p = line;

for(i = 0; i < log[l].format->ops->nelts; i++) {
p = op[i].run(r, p, &op[i]);
}

if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
*p++ = LF;
}

record_len = p - line;

record_size_len = ngx_redislog_size_len(record_len);

p = line;

/*
* Redis append to time series command
*3
$6
APPEND
$nnn
key
$nnn
log record
*/
*p++ = '*';

//*p++ = log[l].has_arg1 ? '4' : '3';
/* SETEX */
if(ngx_strncmp(log[l].command.data, "SETEX", 5) == 0)
*p++ = '4';
else
*p++ = log[l].has_arg1 ? '5' : '3';

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

*p++ = '$';
//*p++ = '0';

p = ngx_redislog_size(p, p + command_size_len, log[l].command.len);

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

p = ngx_copy(p, log[l].command.data, log[l].command.len);

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "EVALSHA", 7) == 0) {
*p++ = '$';
p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

*p++ = '$';
p = ngx_redislog_size(p, p + 1, 1);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, "1", 1);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
}
/*
*p++ = '$';
*p++ = '1';
*p++ = LF;
*p++ = '0';
*p++ = LF;
*/

*p++ = '$';

p = ngx_redislog_size(p, p + key_size_len, key.len);

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

p = ngx_copy(p, key.data, key.len);

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "SETEX", 5) == 0) {
*p++ = '$';
p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

}

*p++ = '$';

p = ngx_redislog_size(p, p + record_size_len, record_len);

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

for(i = 0; i < log[l].format->ops->nelts; i++) {
p = op[i].run(r, p, &op[i]);
}

if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
*p++ = LF;
}

p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

peer = ngx_redislog_peers.elts;

peer += log[l].peer_idx;

ngx_http_redislog_append(*peer, line, p - line);
//ngx_http_redislog_append(*peer, line, p - line + 5);
}

return NGX_OK;
}

static u_char *ngx_redislog_size(u_char *p, u_char *q, size_t sz)
{
u_char *end = q;

while(p != q) {
*--q = (sz % 10 + '0');
sz /= 10;
}

return end;
}

static size_t ngx_redislog_size_len(size_t sz)
{
size_t len = 0;

while(sz != 0) {
sz /= 10;
len++;
}

return len;
}

static u_char*
ngx_redislog_buf_append(ngx_buf_t *buf, u_char *p, size_t *len)
{
size_t remaining = buf->end - buf->last;

if(remaining > *len) {
remaining = *len;
}

buf->last = ngx_copy(buf->last, p, remaining);
*len -= remaining;

return p + remaining;
}

static void
ngx_http_redislog_append(ngx_redislog_peer_t *peer, u_char *buf, size_t len)
{
u_char *p;
ngx_chain_t *last, *q;
size_t remaining;
ngx_uint_t num_busy = 0;

/*
* Find last busy buffer
*/
last = peer->busy;

while(last != NULL && last->next != NULL) {
last = last->next;
}

/*
* See if message fits into remaining space
*/
remaining = (last != NULL ? last->buf->end - last->buf->last : 0);

q = peer->free;

while(remaining <= len && q != NULL) {
remaining += (q->buf->end - q->buf->last);
q = q->next;
}

/*
* No memory for this message, discard it
*/
if(remaining < len) {
peer->discarded++;
return;
}

/*
* Append message to the buffers
*/
if(last != NULL) {
p = ngx_redislog_buf_append(last->buf, buf, &len);
}
else {
p = buf;
}

while(peer->free != NULL && len != 0) {
q = peer->free;

p = ngx_redislog_buf_append(q->buf, p, &len);

peer->free = peer->free->next;

q->next = NULL;

if(last == NULL) {
peer->busy = q;
}
else {
last->next = q;
}

last = q;
}

peer->num_queued++;

q = peer->busy;

while(q != NULL) {
num_busy++;
q = q->next;
}

if(!peer->flush_timer_set) {
peer->flush_timer.handler = ngx_redislog_flush_handler;
peer->flush_timer.data = peer;
peer->flush_timer.log = peer->conn.log;

ngx_add_timer(&peer->flush_timer, peer->conf->flush_timeout);

peer->flush_timer_set = 1;
}

if(num_busy >= 2) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, peer->conn.connection->log, 0,
"redislog num queued is now %ud, set read handler", peer->num_queued);

peer->conn.connection->read->handler = ngx_redislog_read_handler;
/*
* Send it
*/
ngx_http_redislog_send(peer);
}
}

static void
ngx_http_redislog_send(ngx_redislog_peer_t *p)
{
ngx_chain_t *written;
ngx_connection_t *c;
ngx_chain_t *dummy = NULL;

c = p->conn.connection;

if(c == NULL || c->fd == -1) {
return;
}

if(!c->write->ready) {
return;
}

if(p->flush_timer_set) {
ngx_del_timer(&p->flush_timer);
p->flush_timer_set = 0;
}

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog send handler");

if(p->busy != NULL) {
written = c->send_chain(c, p->busy, 0);

if(written == NGX_CHAIN_ERROR) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"redislog write error");
ngx_close_connection(c);
ngx_redislog_reconnect_peer(p);
return;
}

ngx_chain_update_chains(p->pool, &p->free, &p->busy, &dummy, 0);

if(written != NULL) {

if(!c->write->ready && !c->write->timer_set) {
ngx_add_timer(c->write, p->conf->write_timeout);
}

if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_close_connection(c);
ngx_redislog_reconnect_peer(p);
}

return;
}
}
}

static void ngx_redislog_auth_send(ngx_redislog_peer_t *peer)
{
ngx_connection_t *c;
ngx_str_t *password;
ssize_t n;

c = peer->conn.connection;

if(c == NULL || c->fd == -1) {
return;
}

password = &peer->conf->password;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog auth send handler");

n = c->send(c, peer->password_pos, password->len - (peer->password_pos - password->data));

if(n > 0) {
peer->password_pos += n;

if(peer->password_pos >= (password->data + password->len)) {
peer->send_handler = ngx_http_redislog_send;
ngx_http_redislog_send(peer);
}

return;
}

if(n == NGX_ERROR) {
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
return;
}

if(!c->write->timer_set) {
ngx_add_timer(c->write, peer->conf->write_timeout);
}

if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
return;
}
}

static void ngx_redislog_flush_handler(ngx_event_t *ev)
{
ngx_redislog_peer_t *peer = ev->data;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, peer->log, 0,
"redislog flush handler, set read handler");

peer->flush_timer_set = 0;
peer->conn.connection->read->handler = ngx_redislog_read_handler;

ngx_http_redislog_send(peer);
}

static void ngx_redislog_connected_handler(ngx_redislog_peer_t *peer)
{
ngx_connection_t *c;

c = peer->conn.connection;

ngx_del_timer(c->read);

/*
* Once the connection has been established, we need to
* reset the reconnect timeout to it's initial value
*/
peer->reconnect_timeout = peer->conf->reconnect_timeout;

if(peer->discarded != 0) {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"redislog peer \"%V\" discarded %ui messages",
&peer->conf->name, peer->discarded);

peer->discarded = 0;
}
}

static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t *peer, ngx_buf_t *buf)
{
u_char *p, *q;

p = buf->pos;
q = buf->last;

while(p != q) {
if(!peer->state) {
if(*p == '+' || *p == '-' || *p == ':') {
if(peer->conf->authenticate && !peer->authenticated) {
if(*p == '-') {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"redis authentication failure");
return NGX_ERROR;
}
peer->authenticated = 1;
}
else {
if(peer->num_queued) {
peer->num_queued--;
}
else {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"too many responses from redis");
return NGX_ERROR;
}
}
}
if(*p == '-') {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"redislog error");
}
peer->state++;
}
else {
if(*p == LF) {
peer->state = 0;
}
else if(peer->state == 1) {
if(*p == CR) {
peer->state++;
}
}
}

p++;
}

buf->pos = p;

return NGX_OK;
}

static void ngx_redislog_read_handler(ngx_event_t *rev)
{
ngx_connection_t *c;
ngx_redislog_peer_t *peer;
ngx_buf_t *buf;
ssize_t n, size;
ngx_int_t rc;

c = rev->data;
peer = c->data;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
"redislog read handler");

if(c->read->timer_set) {
ngx_del_timer(c->read);
}

if(rev->timedout || c->error || c->close) {
if(rev->timedout) {
ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
"redislog peer timed out");
}

if(rev->error) {
ngx_log_error(NGX_LOG_ERR, rev->log, 0,
"redislog peer connection error");
}

ngx_close_connection(c);

if(!c->close) {
ngx_redislog_reconnect_peer(peer);
}
return;
}

buf = peer->recv_buf;

for( ;; ) {
for( ;; ) {
if(buf->last == buf->end) {
break;
}

size = buf->end - buf->last;

n = c->recv(c, buf->last, size);

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog peer recv %z", n);

if(n == NGX_AGAIN) {
break;
}

if(n == 0) {
if(peer->num_queued != 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"redis closed the connection prematurely");
}
}

if(n == 0 || n == NGX_ERROR) {
c->error = 1;
goto reconnect;
}

buf->last += n;
}

rc = ngx_redislog_process_buf(peer, buf);

if(rc != NGX_OK) {
goto reconnect;
}

buf->pos = buf->last = buf->start;

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog num queued is now %ud", peer->num_queued);

if(peer->num_queued == 0) {
break;
}

if (!c->read->ready) {
if(ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto reconnect;
}

if(!c->read->timer_set) {
ngx_add_timer(c->read, peer->conf->read_timeout);
}

return;
}
}

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
"redislog set idle read handler");

c->read->handler = ngx_redislog_idle_read_handler;

return;

reconnect:
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
}

static void ngx_redislog_idle_read_handler(ngx_event_t *rev)
{
ngx_connection_t *c;
ngx_redislog_peer_t *peer;

int n;
char buf[1];
ngx_err_t err;

c = rev->data;
peer = c->data;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
"redislog idle read handler");

if(rev->timedout || c->error || c->close) {
if(rev->timedout) {
ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
"redislog peer timed out");
}

if(rev->error) {
ngx_log_error(NGX_LOG_ERR, rev->log, 0,
"redislog peer connection error");
}

ngx_close_connection(c);

if(!c->close) {
ngx_redislog_reconnect_peer(peer);
}
return;
}

#if (NGX_HAVE_KQUEUE)
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {

if(!rev->pending_eof) {
goto no_error;
}

rev->eof = 1;
c->error = 1;

if(rev->kq_errno) {
rev->error = 1;
}

goto reconnect;
}
#endif

n = recv(c->fd, buf, 1, MSG_PEEK);

err = ngx_socket_errno;

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, rev->log, err,
"redislog recv(): %d", n);

if(n > 0) {
goto no_error;
}

if(n == -1) {
if(err == NGX_EAGAIN) {
goto no_error;
}

rev->error = 1;

}
else {
err = 0;
}

rev->eof = 1;
c->error = 1;

ngx_log_error(NGX_LOG_ERR, rev->log, err,
"redislog connection error");

#if (NGX_HAVE_KQUEUE)
reconnect:
#endif
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
return;

no_error:
if(peer->connecting) {
ngx_redislog_connected_handler(peer);
peer->connecting = 0;
}
}

static void ngx_redislog_write_handler(ngx_event_t *wev)
{
ngx_connection_t *c;
ngx_redislog_peer_t *peer;

c = wev->data;
peer = c->data;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, wev->log, 0,
"redislog write handler");

if(wev->timedout || c->error || c->close) {
if(wev->timedout) {
ngx_log_error(NGX_LOG_ERR, wev->log, NGX_ETIMEDOUT,
"redislog peer timed out");
}

if(wev->error) {
ngx_log_error(NGX_LOG_ERR, wev->log, 0,
"redislog peer connection error");
}

ngx_close_connection(c);

if(!c->close) {
ngx_redislog_reconnect_peer(peer);
}
return;
}

if(peer->connecting) {
ngx_redislog_connected_handler(peer);
peer->connecting = 0;
}

if(c->write->timer_set) {
ngx_del_timer(c->write);
}

peer->send_handler(peer);
}

static ngx_int_t ngx_redislog_connect_peer(ngx_redislog_peer_t *peer)
{
ngx_int_t rc;

ngx_log_error(NGX_LOG_INFO, peer->log, 0,
"redislog connect peer \"%V\"", &peer->conf->name);

peer->conn.sockaddr = peer->conf->sockaddr;
peer->conn.socklen = peer->conf->socklen;
peer->conn.name = &peer->conf->name;
peer->conn.get = ngx_event_get_peer;
peer->conn.log = peer->log;
peer->conn.log_error = NGX_ERROR_ERR;

rc = ngx_event_connect_peer(&peer->conn);

if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
if(peer->conn.connection) {
ngx_close_connection(peer->conn.connection);
}

return NGX_ERROR;
}

peer->conn.connection->data = peer;
peer->conn.connection->pool = peer->pool;

peer->password_pos = peer->conf->password.data;
peer->authenticated = 0;

peer->conn.connection->read->handler = ngx_redislog_read_handler;
peer->conn.connection->write->handler = ngx_redislog_write_handler;

peer->send_handler = peer->conf->authenticate ? ngx_redislog_auth_send
: ngx_http_redislog_send;

ngx_add_timer(peer->conn.connection->read, peer->conf->connect_timeout);

peer->connecting = 1;

return NGX_OK;
}

static void ngx_redislog_connect_handler(ngx_event_t *ev)
{
ngx_int_t rc;
ngx_redislog_peer_t *peer = ev->data;

rc = ngx_redislog_connect_peer(peer);

if(rc != NGX_OK) {
ngx_redislog_reconnect_peer(peer);
}
}

static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p)
{
p->conn.connection = NULL;

p->reconnect_timer.handler = ngx_redislog_connect_handler;
p->reconnect_timer.data = p;
p->reconnect_timer.log = p->conn.log;

ngx_add_timer(&p->reconnect_timer, p->reconnect_timeout);

p->reconnect_timeout *= 2;

if(p->discarded != 0) {
ngx_log_error(NGX_LOG_ERR, p->log, 0,
"redislog peer \"%V\" discarded %ui messages",
&p->conf->name, p->discarded);

p->discarded = 0;
}
}

static ngx_int_t
ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line;

line = ngx_palloc(r->pool, sizeof("yyyy")-1);

if(line == NULL) {
return NGX_ERROR;
}

(void) ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);

v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;

v->data = line;
v->len = sizeof("yyyy")-1;

return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line, *p;

line = ngx_palloc(r->pool, sizeof("yyyymm")-1);

if(line == NULL) {
return NGX_ERROR;
}

p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
(void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);

v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;

v->data = line;
v->len = sizeof("yyyymm")-1;

return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line, *p;

line = ngx_palloc(r->pool, sizeof("yyyymmdd")-1);

if(line == NULL) {
return NGX_ERROR;
}

p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
(void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);

v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;

v->data = line;
v->len = sizeof("yyyymmdd")-1;

return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line, *p;

line = ngx_palloc(r->pool, sizeof("yyyymmddhh")-1);

if(line == NULL) {
return NGX_ERROR;
}

p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);
(void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 11, sizeof("hh")-1);

v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;

v->data = line;
v->len = sizeof("yyyymmddhh")-1;

return NGX_OK;
}

static void *
ngx_http_redislog_create_loc_conf(ngx_conf_t *cf)
{
ngx_http_redislog_conf_t *conf;

conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_redislog_conf_t));
if (conf == NULL) {
return NGX_CONF_ERROR;
}

return conf;
}

static char *
ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_redislog_conf_t *prev = parent;
ngx_http_redislog_conf_t *conf = child;

if(conf->logs || conf->off) {
return NGX_CONF_OK;
}

conf->logs = prev->logs;
conf->off = prev->off;

return NGX_CONF_OK;
}

static void *
ngx_redislog_create_conf(ngx_cycle_t *cycle)
{
ngx_redislog_conf_t *slcf;

slcf = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_conf_t));
if(slcf == NULL) {
return NULL;
}

return slcf;
}

static ngx_int_t
ngx_http_redislog_add_variables(ngx_conf_t *cf)
{
ngx_http_variable_t *var, *v;

for (v = ngx_http_redislog_variables; v->name.len; v++) {

var = ngx_http_add_variable(cf, &v->name, v->flags);
if (var == NULL) {
return NGX_ERROR;
}

var->get_handler = v->get_handler;
var->data = v->data;
}

return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_find_peer_by_name(ngx_conf_t *cf, ngx_str_t *name)
{
ngx_redislog_conf_t *slcf;
ngx_redislog_peer_conf_t *pc;
ngx_uint_t i;

slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);

pc = slcf->peers->elts;

for(i = 0; i < slcf->peers->nelts; i++) {
if(pc[i].name.len == name->len
&& ngx_strncmp(pc[i].name.data, name->data, name->len) == 0)
{
return i;
}
}

return NGX_DECLINED;
}

static char *
ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_redislog_conf_t *slcf = conf;

ngx_uint_t i;
ngx_str_t *value, name, command, arg1, _if;
//ngx_str_t arg_num;

ngx_http_redislog_t *log;
ngx_http_log_fmt_t *fmt;
ngx_http_log_main_conf_t *lmcf;
ngx_int_t rc;
ngx_http_compile_complex_value_t ccv;
unsigned format_set;

format_set = 0;
value = cf->args->elts;

if (ngx_strcmp(value[1].data, "off") == 0) {
slcf->off = 1;
return NGX_CONF_OK;
}
slcf->off = 0;

if (slcf->logs == NULL) {
slcf->logs = ngx_array_create(cf->pool, 2, sizeof(ngx_http_redislog_t));
if (slcf->logs == NULL) {
return NGX_CONF_ERROR;
}
}

lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module);

if(lmcf == NULL) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"redislog module requires log module to be compiled in");
return NGX_CONF_ERROR;
}

log = ngx_array_push(slcf->logs);
if (log == NULL) {
return NGX_CONF_ERROR;
}

ngx_memzero(log, sizeof(ngx_http_redislog_t));

log->peer_name = value[1];

rc = ngx_http_redislog_find_peer_by_name(cf, &log->peer_name);

if(rc == NGX_DECLINED) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"redislog peer %V is not defined", &log->peer_name);
return NGX_CONF_ERROR;
}

log->peer_idx = rc;

/*
* Create and compile key
*/
log->key = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
if(log->key == NULL) {
return NGX_CONF_ERROR;
}

ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));

ccv.cf = cf;
ccv.value = &value[2];
ccv.complex_value = log->key;

if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
return NGX_CONF_ERROR;
}

ngx_str_set(&command, "APPEND");
//ngx_str_set(&arg1, "");
ngx_str_set(&arg1, "test1");
//ngx_str_set(&arg_num, "0");
ngx_str_set(&name, "main");


if (cf->args->nelts >= 4) {
for (i = 3; i < cf->args->nelts; i++) {

if (ngx_strncmp(value[i].data, "format=", 7) == 0) {

format_set = 1;

name = value[i];

name.len -= 7;
name.data += 7;

if (ngx_strcmp(name.data, "combined") == 0) {
lmcf->combined_used = 1;
}
continue;
}

if (ngx_strncmp(value[i].data, "command=", 8) == 0) {

command = value[i];

command.len -= 8;
command.data += 8;

continue;
}

if (ngx_strncmp(value[i].data, "arg1=", 5) == 0) {

arg1 = value[i];

arg1.len -= 5;
arg1.data += 5;

log->has_arg1 = 1;

continue;
}

if (ngx_strncmp(value[i].data, "if=", 3) == 0) {
if(log->_if != NULL) {
continue;
}

_if = value[i];

_if.len -= 3;
_if.data += 3;

/*
* Create and compile if script
*/
log->_if = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
if(log->_if == NULL) {
return NGX_CONF_ERROR;
}

ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));

ccv.cf = cf;
ccv.value = &_if;
ccv.complex_value = log->_if;

if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
return NGX_CONF_ERROR;
}

continue;
}

if (ngx_strncmp(value[i].data, "ifnot=", 6) == 0) {
if(log->ifnot != NULL) {
continue;
}

_if = value[i];

_if.len -= 6;
_if.data += 6;

/*
* Create and compile if script
*/
log->ifnot = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
if(log->ifnot == NULL) {
return NGX_CONF_ERROR;
}

ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));

ccv.cf = cf;
ccv.value = &_if;
ccv.complex_value = log->ifnot;

if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
return NGX_CONF_ERROR;
}

continue;
}

ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid parameter \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}
}

if(!format_set) {
name.len = sizeof(NGX_DEF_FORMAT) - 1;
name.data = (u_char *) NGX_DEF_FORMAT;
lmcf->combined_used = 1;

}

log->command = command;

if(log->has_arg1) {
log->arg1 = arg1;
}

fmt = lmcf->formats.elts;
for (i = 0; i < lmcf->formats.nelts; i++) {
if (fmt[i].name.len == name.len
&& ngx_strcasecmp(fmt[i].name.data, name.data) == 0)
{
log->format = &fmt[i];
goto done;
}
}

ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"unknown log format \"%V\"", &name);
return NGX_CONF_ERROR;

done:

return NGX_CONF_OK;
}

static char *
ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_str_t *value;
ngx_redislog_conf_t *slcf;
ngx_url_t u;
ngx_redislog_peer_conf_t *peer;
u_char *p;
size_t pass_size_len;

slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);

value = cf->args->elts;

ngx_memzero(&u, sizeof(ngx_url_t));

u.url = value[2];
u.default_port = 6379;
u.no_resolve = 0;

if(ngx_parse_url(cf->pool, &u) != NGX_OK) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%V: %s", &u.host, u.err);
return NGX_CONF_ERROR;
}

if(slcf->peers == NULL) {
slcf->peers = ngx_array_create(cf->pool, 2, sizeof(ngx_redislog_peer_conf_t));
if (slcf->peers == NULL) {
return NGX_CONF_ERROR;
}
}

peer = ngx_array_push(slcf->peers);
if(peer == NULL) {
return NGX_CONF_ERROR;
}

peer->name = value[1];
peer->sockaddr = u.addrs[0].sockaddr;
peer->socklen = u.addrs[0].socklen;

if(cf->args->nelts >= 4) {
/*
* Alloc space for authentication packet and create it
*/
pass_size_len = ngx_redislog_size_len(value[3].len);

peer->password.len = sizeof(NGX_REDIS_AUTH)-1 + 1 + pass_size_len
+ sizeof(CRLF)-1 + value[3].len + sizeof(CRLF)-1;

peer->password.data = ngx_palloc(cf->pool, peer->password.len);
if(peer->password.data == NULL) {
return NGX_CONF_ERROR;
}

p = ngx_copy(peer->password.data, NGX_REDIS_AUTH, sizeof(NGX_REDIS_AUTH)-1);

*p++ = '$';

p = ngx_redislog_size(p, p + pass_size_len, value[3].len);

p = ngx_copy(p, CRLF, sizeof(CRLF)-1);

p = ngx_copy(p, value[3].data, value[3].len);

p = ngx_copy(p, CRLF, sizeof(CRLF)-1);

peer->authenticate = 1;
}

peer->write_timeout = 30000;
peer->read_timeout = 30000;
peer->connect_timeout = 30000;
peer->reconnect_timeout = 5000;
peer->flush_timeout = 2000;
peer->ping_timeout = 30000;

peer->bufs.num = 200;
peer->bufs.size = 2048;
peer->recv_buf_size = 1024;

return NGX_CONF_OK;
}

static ngx_int_t
ngx_http_redislog_init(ngx_conf_t *cf)
{
ngx_http_core_main_conf_t *cmcf;
ngx_http_handler_pt *h;

cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);

h = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
if (h == NULL) {
return NGX_ERROR;
}

*h = ngx_http_redislog_handler;

return NGX_OK;
}

static ngx_int_t
ngx_redislog_init_process(ngx_cycle_t *cycle)
{
ngx_int_t rc;
ngx_redislog_conf_t *slcf;
ngx_uint_t i;
ngx_redislog_peer_conf_t *pc;
ngx_redislog_peer_t *peer, **ppeer;

slcf = (ngx_redislog_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_redislog_module);

if(slcf->peers == NULL || slcf->peers->nelts == 0) {
return NGX_OK;
}

rc = ngx_array_init(&ngx_redislog_peers, cycle->pool,
slcf->peers->nelts, sizeof(ngx_redislog_peer_t*));

if(rc != NGX_OK) {
return rc;
}

pc = slcf->peers->elts;

for(i = 0; i < slcf->peers->nelts; i++) {
ppeer = ngx_array_push(&ngx_redislog_peers);

if(ppeer == NULL) {
return NGX_ERROR;
}

peer = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_peer_t));

if(peer == NULL) {
return NGX_ERROR;
}

peer->free = ngx_create_chain_of_bufs(cycle->pool, &pc[i].bufs);

if(peer->free == NULL) {
return NGX_ERROR;
}

peer->recv_buf = ngx_create_temp_buf(cycle->pool, pc[i].recv_buf_size);

if(peer->recv_buf == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

*ppeer = peer;

peer->pool = cycle->pool;
peer->conf = &pc[i];
peer->log = cycle->log;

peer->reconnect_timeout = pc[i].reconnect_timeout;

ngx_redislog_connect_peer(peer);
}

return NGX_OK;
}





This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers.


Комментариев нет:

Отправить комментарий