/*
* Copyright (C) 2012 Valery Kholodkov
* 2014 Alexander V Makkoveev
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>
#define NGX_SOCKETLOG_FACILITY_LOCAL7 23
#define NGX_SOCKETLOG_SEVERITY_INFO 6
#define NGX_REDIS_APPEND "*3" CRLF "$6" CRLF "APPEND" CRLF
#define NGX_REDIS_AUTH "*2" CRLF "$4" CRLF "AUTH" CRLF
//#define NGX_DEF_FORMAT "combined"
#define NGX_DEF_FORMAT "main"
#define IF_DEBUG 0
#define IF_DEBUG_2 0
//*****************************************************************************
typedef struct ngx_http_log_op_s ngx_http_log_op_t;
typedef u_char *(*ngx_http_log_op_run_pt) (ngx_http_request_t *r, u_char *buf,
ngx_http_log_op_t *op);
typedef size_t (*ngx_http_log_op_getlen_pt) (ngx_http_request_t *r,
uintptr_t data);
struct ngx_redislog_peer;
typedef void (*ngx_redislog_send_handler_pt)(struct ngx_redislog_peer*);
struct ngx_http_log_op_s {
size_t len;
ngx_http_log_op_getlen_pt getlen;
ngx_http_log_op_run_pt run;
uintptr_t data;
};
typedef struct {
ngx_str_t name;
#if defined nginx_version && nginx_version >= 7018
ngx_array_t *flushes;
#endif
ngx_array_t *ops; /* array of ngx_http_log_op_t */
} ngx_http_log_fmt_t;
typedef struct {
ngx_array_t formats; /* array of ngx_http_log_fmt_t */
ngx_uint_t combined_used; /* unsigned combined_used:1 */
} ngx_http_log_main_conf_t;
typedef struct {
ngx_str_t name;
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_msec_t write_timeout;
ngx_msec_t read_timeout;
ngx_msec_t connect_timeout;
ngx_msec_t reconnect_timeout;
ngx_msec_t flush_timeout;
ngx_msec_t ping_timeout;
ngx_bufs_t bufs;
size_t recv_buf_size;
ngx_str_t password;
unsigned authenticate:1;
} ngx_redislog_peer_conf_t;
typedef struct {
ngx_array_t *peers;
} ngx_redislog_conf_t;
typedef struct ngx_redislog_peer {
ngx_redislog_peer_conf_t *conf;
ngx_peer_connection_t conn;
ngx_event_t reconnect_timer;
ngx_event_t flush_timer;
ngx_event_t ping_timer;
ngx_log_t *log;
ngx_pool_t *pool;
ngx_chain_t *busy;
ngx_chain_t *free;
ngx_buf_t *recv_buf;
ngx_uint_t discarded;
ngx_uint_t reconnect_timeout;
ngx_uint_t num_queued;
ngx_uint_t state;
u_char *password_pos;
ngx_redislog_send_handler_pt send_handler;
unsigned connecting:1;
unsigned authenticated:1;
unsigned flush_timer_set:1;
} ngx_redislog_peer_t;
typedef struct {
ngx_str_t peer_name;
ngx_uint_t peer_idx;
ngx_http_log_fmt_t *format;
ngx_http_complex_value_t *key;
ngx_str_t command;
ngx_str_t arg1;
//***
//ngx_str_t arg_num;
//***
ngx_http_complex_value_t *_if;
ngx_http_complex_value_t *ifnot;
unsigned has_arg1;
} ngx_http_redislog_t;
typedef struct {
ngx_array_t *logs; /* array of ngx_http_redislog_t */
unsigned off;
} ngx_http_redislog_conf_t;
static ngx_array_t ngx_redislog_peers;
static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p);
static void ngx_http_redislog_append(ngx_redislog_peer_t *p, u_char *buf, size_t len);
static void ngx_http_redislog_send(ngx_redislog_peer_t *p);
static void ngx_redislog_flush_handler(ngx_event_t*);
static u_char *ngx_redislog_size(u_char*, u_char*, size_t);
static size_t ngx_redislog_size_len(size_t);
static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t*, ngx_buf_t*);
static void ngx_redislog_read_handler(ngx_event_t *rev);
static void ngx_redislog_idle_read_handler(ngx_event_t *rev);
static char *ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static void *ngx_http_redislog_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent,
void *child);
static ngx_int_t ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static void *ngx_redislog_create_conf(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_redislog_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_redislog_init(ngx_conf_t *cf);
static ngx_int_t ngx_redislog_init_process(ngx_cycle_t *cycle);
static ngx_command_t ngx_http_redislog_commands[] = {
{ ngx_string("access_redislog"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_SIF_CONF|NGX_HTTP_LIF_CONF
|NGX_HTTP_LMT_CONF|NGX_CONF_TAKE1234,
ngx_http_redislog_set_log,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
static ngx_http_module_t ngx_http_redislog_module_ctx = {
ngx_http_redislog_add_variables, /* preconfiguration */
ngx_http_redislog_init, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
ngx_http_redislog_create_loc_conf, /* create location configration */
ngx_http_redislog_merge_loc_conf /* merge location configration */
};
extern ngx_module_t ngx_http_log_module;
ngx_module_t ngx_http_redislog_module = {
NGX_MODULE_V1,
&ngx_http_redislog_module_ctx, /* module context */
ngx_http_redislog_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_command_t ngx_redislog_commands[] = {
{ ngx_string("redislog"),
NGX_MAIN_CONF|NGX_CONF_TAKE23,
ngx_http_redislog_command,
0,
0,
NULL },
ngx_null_command
};
static ngx_core_module_t ngx_redislog_module_ctx = {
ngx_string("redislog"),
ngx_redislog_create_conf,
NULL
};
ngx_module_t ngx_core_redislog_module = {
NGX_MODULE_V1,
&ngx_redislog_module_ctx, /* module context */
ngx_redislog_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_redislog_init_process, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_http_variable_t ngx_http_redislog_variables[] = {
{ ngx_string("redislog_yyyy"), NULL, ngx_http_redislog_yyyy_variable, 0,
0, 0 },
{ ngx_string("redislog_yyyymm"), NULL, ngx_http_redislog_yyyymm_variable, 0,
0, 0 },
{ ngx_string("redislog_yyyymmdd"), NULL, ngx_http_redislog_yyyymmdd_variable, 0,
0, 0 },
{ ngx_string("redislog_yyyymmddhh"), NULL, ngx_http_redislog_yyyymmddhh_variable, 0,
0, 0 },
{ ngx_null_string, NULL, NULL, 0, 0, 0 }
};
//-----------------------------------------------------------------------------
ngx_int_t
ngx_http_redislog_handler(ngx_http_request_t *r)
{
u_char *line, *p;
size_t len, command_size_len, arg1_size_len, record_len, key_size_len, record_size_len;
ngx_uint_t i, l;
ngx_str_t key, _if, ifnot;
ngx_http_redislog_t *log;
ngx_http_log_op_t *op;
ngx_http_redislog_conf_t *slcf;
time_t time;
ngx_tm_t tm;
ngx_redislog_peer_t **peer;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http redislog handler");
slcf = ngx_http_get_module_loc_conf(r, ngx_http_redislog_module);
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"redislog conf=%p, off=%ud, logs=%p", slcf, slcf->off, slcf->logs);
if(slcf->off || slcf->logs == NULL) {
return NGX_OK;
}
time = ngx_time();
ngx_gmtime(time, &tm);
log = slcf->logs->elts;
for (l = 0; l < slcf->logs->nelts; l++) {
#if defined nginx_version && nginx_version >= 7018
ngx_http_script_flush_no_cacheable_variables(r, log[l].format->flushes);
#endif
len = 0;
op = log[l].format->ops->elts;
for (i = 0; i < log[l].format->ops->nelts; i++) {
if (op[i].len == 0) {
len += op[i].getlen(r, op[i].data);
} else {
len += op[i].len;
}
}
if(log[l].ifnot != NULL) {
if(ngx_http_complex_value(r, log[l].ifnot, &ifnot) != NGX_OK) {
return NGX_ERROR;
}
if(ifnot.len && (ifnot.len != 1 || ifnot.data[0] != '0')) {
continue;
}
}
if(log[l]._if != NULL) {
if(ngx_http_complex_value(r, log[l]._if, &_if) != NGX_OK) {
return NGX_ERROR;
}
if(!_if.len || (_if.len == 1 && _if.data[0] == '0')) {
continue;
}
}
if(ngx_http_complex_value(r, log[l].key, &key) != NGX_OK) {
return NGX_ERROR;
}
command_size_len = ngx_redislog_size_len(log[l].command.len);
key_size_len = ngx_redislog_size_len(key.len);
if(log[l].arg1.len) {
arg1_size_len = ngx_redislog_size_len(log[l].arg1.len);
}
else {
arg1_size_len = 0;
}
len += 2 + sizeof(CRLF) - 1 + 1 + command_size_len + 1 + sizeof(CRLF) - 1
+ command_size_len + sizeof(CRLF) - 1 + log[l].command.len + sizeof(CRLF) - 1
+ key_size_len + sizeof(CRLF) - 1 + key.len + sizeof(CRLF) - 1
+ 1 + NGX_OFF_T_LEN + sizeof(CRLF) - 1 + sizeof(CRLF) - 1;
if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
len++;
}
if(log[l].has_arg1) {
len += arg1_size_len + sizeof(CRLF) - 1 + log[l].arg1.len + sizeof(CRLF) - 1;
}
#if defined nginx_version && nginx_version >= 7003
line = ngx_pnalloc(r->pool, len);
#else
line = ngx_palloc(r->pool, len);
#endif
if (line == NULL) {
return NGX_ERROR;
}
p = line;
for(i = 0; i < log[l].format->ops->nelts; i++) {
p = op[i].run(r, p, &op[i]);
}
if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
*p++ = LF;
}
record_len = p - line;
record_size_len = ngx_redislog_size_len(record_len);
p = line;
/*
* Redis append to time series command
*3
$6
APPEND
$nnn
key
$nnn
log record
*/
*p++ = '*';
//*p++ = log[l].has_arg1 ? '4' : '3';
/* SETEX */
if(ngx_strncmp(log[l].command.data, "SETEX", 5) == 0)
*p++ = '4';
else
*p++ = log[l].has_arg1 ? '5' : '3';
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
*p++ = '$';
//*p++ = '0';
p = ngx_redislog_size(p, p + command_size_len, log[l].command.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, log[l].command.data, log[l].command.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "EVALSHA", 7) == 0) {
*p++ = '$';
p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
*p++ = '$';
p = ngx_redislog_size(p, p + 1, 1);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, "1", 1);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
}
/*
*p++ = '$';
*p++ = '1';
*p++ = LF;
*p++ = '0';
*p++ = LF;
*/
*p++ = '$';
p = ngx_redislog_size(p, p + key_size_len, key.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, key.data, key.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "SETEX", 5) == 0) {
*p++ = '$';
p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
}
*p++ = '$';
p = ngx_redislog_size(p, p + record_size_len, record_len);
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
for(i = 0; i < log[l].format->ops->nelts; i++) {
p = op[i].run(r, p, &op[i]);
}
if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
*p++ = LF;
}
p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
peer = ngx_redislog_peers.elts;
peer += log[l].peer_idx;
ngx_http_redislog_append(*peer, line, p - line);
//ngx_http_redislog_append(*peer, line, p - line + 5);
}
return NGX_OK;
}
static u_char *ngx_redislog_size(u_char *p, u_char *q, size_t sz)
{
u_char *end = q;
while(p != q) {
*--q = (sz % 10 + '0');
sz /= 10;
}
return end;
}
static size_t ngx_redislog_size_len(size_t sz)
{
size_t len = 0;
while(sz != 0) {
sz /= 10;
len++;
}
return len;
}
static u_char*
ngx_redislog_buf_append(ngx_buf_t *buf, u_char *p, size_t *len)
{
size_t remaining = buf->end - buf->last;
if(remaining > *len) {
remaining = *len;
}
buf->last = ngx_copy(buf->last, p, remaining);
*len -= remaining;
return p + remaining;
}
static void
ngx_http_redislog_append(ngx_redislog_peer_t *peer, u_char *buf, size_t len)
{
u_char *p;
ngx_chain_t *last, *q;
size_t remaining;
ngx_uint_t num_busy = 0;
/*
* Find last busy buffer
*/
last = peer->busy;
while(last != NULL && last->next != NULL) {
last = last->next;
}
/*
* See if message fits into remaining space
*/
remaining = (last != NULL ? last->buf->end - last->buf->last : 0);
q = peer->free;
while(remaining <= len && q != NULL) {
remaining += (q->buf->end - q->buf->last);
q = q->next;
}
/*
* No memory for this message, discard it
*/
if(remaining < len) {
peer->discarded++;
return;
}
/*
* Append message to the buffers
*/
if(last != NULL) {
p = ngx_redislog_buf_append(last->buf, buf, &len);
}
else {
p = buf;
}
while(peer->free != NULL && len != 0) {
q = peer->free;
p = ngx_redislog_buf_append(q->buf, p, &len);
peer->free = peer->free->next;
q->next = NULL;
if(last == NULL) {
peer->busy = q;
}
else {
last->next = q;
}
last = q;
}
peer->num_queued++;
q = peer->busy;
while(q != NULL) {
num_busy++;
q = q->next;
}
if(!peer->flush_timer_set) {
peer->flush_timer.handler = ngx_redislog_flush_handler;
peer->flush_timer.data = peer;
peer->flush_timer.log = peer->conn.log;
ngx_add_timer(&peer->flush_timer, peer->conf->flush_timeout);
peer->flush_timer_set = 1;
}
if(num_busy >= 2) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, peer->conn.connection->log, 0,
"redislog num queued is now %ud, set read handler", peer->num_queued);
peer->conn.connection->read->handler = ngx_redislog_read_handler;
/*
* Send it
*/
ngx_http_redislog_send(peer);
}
}
static void
ngx_http_redislog_send(ngx_redislog_peer_t *p)
{
ngx_chain_t *written;
ngx_connection_t *c;
ngx_chain_t *dummy = NULL;
c = p->conn.connection;
if(c == NULL || c->fd == -1) {
return;
}
if(!c->write->ready) {
return;
}
if(p->flush_timer_set) {
ngx_del_timer(&p->flush_timer);
p->flush_timer_set = 0;
}
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog send handler");
if(p->busy != NULL) {
written = c->send_chain(c, p->busy, 0);
if(written == NGX_CHAIN_ERROR) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"redislog write error");
ngx_close_connection(c);
ngx_redislog_reconnect_peer(p);
return;
}
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &dummy, 0);
if(written != NULL) {
if(!c->write->ready && !c->write->timer_set) {
ngx_add_timer(c->write, p->conf->write_timeout);
}
if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_close_connection(c);
ngx_redislog_reconnect_peer(p);
}
return;
}
}
}
static void ngx_redislog_auth_send(ngx_redislog_peer_t *peer)
{
ngx_connection_t *c;
ngx_str_t *password;
ssize_t n;
c = peer->conn.connection;
if(c == NULL || c->fd == -1) {
return;
}
password = &peer->conf->password;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog auth send handler");
n = c->send(c, peer->password_pos, password->len - (peer->password_pos - password->data));
if(n > 0) {
peer->password_pos += n;
if(peer->password_pos >= (password->data + password->len)) {
peer->send_handler = ngx_http_redislog_send;
ngx_http_redislog_send(peer);
}
return;
}
if(n == NGX_ERROR) {
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
return;
}
if(!c->write->timer_set) {
ngx_add_timer(c->write, peer->conf->write_timeout);
}
if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
return;
}
}
static void ngx_redislog_flush_handler(ngx_event_t *ev)
{
ngx_redislog_peer_t *peer = ev->data;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, peer->log, 0,
"redislog flush handler, set read handler");
peer->flush_timer_set = 0;
peer->conn.connection->read->handler = ngx_redislog_read_handler;
ngx_http_redislog_send(peer);
}
static void ngx_redislog_connected_handler(ngx_redislog_peer_t *peer)
{
ngx_connection_t *c;
c = peer->conn.connection;
ngx_del_timer(c->read);
/*
* Once the connection has been established, we need to
* reset the reconnect timeout to it's initial value
*/
peer->reconnect_timeout = peer->conf->reconnect_timeout;
if(peer->discarded != 0) {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"redislog peer \"%V\" discarded %ui messages",
&peer->conf->name, peer->discarded);
peer->discarded = 0;
}
}
static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t *peer, ngx_buf_t *buf)
{
u_char *p, *q;
p = buf->pos;
q = buf->last;
while(p != q) {
if(!peer->state) {
if(*p == '+' || *p == '-' || *p == ':') {
if(peer->conf->authenticate && !peer->authenticated) {
if(*p == '-') {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"redis authentication failure");
return NGX_ERROR;
}
peer->authenticated = 1;
}
else {
if(peer->num_queued) {
peer->num_queued--;
}
else {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"too many responses from redis");
return NGX_ERROR;
}
}
}
if(*p == '-') {
ngx_log_error(NGX_LOG_ERR, peer->log, 0,
"redislog error");
}
peer->state++;
}
else {
if(*p == LF) {
peer->state = 0;
}
else if(peer->state == 1) {
if(*p == CR) {
peer->state++;
}
}
}
p++;
}
buf->pos = p;
return NGX_OK;
}
static void ngx_redislog_read_handler(ngx_event_t *rev)
{
ngx_connection_t *c;
ngx_redislog_peer_t *peer;
ngx_buf_t *buf;
ssize_t n, size;
ngx_int_t rc;
c = rev->data;
peer = c->data;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
"redislog read handler");
if(c->read->timer_set) {
ngx_del_timer(c->read);
}
if(rev->timedout || c->error || c->close) {
if(rev->timedout) {
ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
"redislog peer timed out");
}
if(rev->error) {
ngx_log_error(NGX_LOG_ERR, rev->log, 0,
"redislog peer connection error");
}
ngx_close_connection(c);
if(!c->close) {
ngx_redislog_reconnect_peer(peer);
}
return;
}
buf = peer->recv_buf;
for( ;; ) {
for( ;; ) {
if(buf->last == buf->end) {
break;
}
size = buf->end - buf->last;
n = c->recv(c, buf->last, size);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog peer recv %z", n);
if(n == NGX_AGAIN) {
break;
}
if(n == 0) {
if(peer->num_queued != 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"redis closed the connection prematurely");
}
}
if(n == 0 || n == NGX_ERROR) {
c->error = 1;
goto reconnect;
}
buf->last += n;
}
rc = ngx_redislog_process_buf(peer, buf);
if(rc != NGX_OK) {
goto reconnect;
}
buf->pos = buf->last = buf->start;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"redislog num queued is now %ud", peer->num_queued);
if(peer->num_queued == 0) {
break;
}
if (!c->read->ready) {
if(ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto reconnect;
}
if(!c->read->timer_set) {
ngx_add_timer(c->read, peer->conf->read_timeout);
}
return;
}
}
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
"redislog set idle read handler");
c->read->handler = ngx_redislog_idle_read_handler;
return;
reconnect:
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
}
static void ngx_redislog_idle_read_handler(ngx_event_t *rev)
{
ngx_connection_t *c;
ngx_redislog_peer_t *peer;
int n;
char buf[1];
ngx_err_t err;
c = rev->data;
peer = c->data;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
"redislog idle read handler");
if(rev->timedout || c->error || c->close) {
if(rev->timedout) {
ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
"redislog peer timed out");
}
if(rev->error) {
ngx_log_error(NGX_LOG_ERR, rev->log, 0,
"redislog peer connection error");
}
ngx_close_connection(c);
if(!c->close) {
ngx_redislog_reconnect_peer(peer);
}
return;
}
#if (NGX_HAVE_KQUEUE)
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
if(!rev->pending_eof) {
goto no_error;
}
rev->eof = 1;
c->error = 1;
if(rev->kq_errno) {
rev->error = 1;
}
goto reconnect;
}
#endif
n = recv(c->fd, buf, 1, MSG_PEEK);
err = ngx_socket_errno;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, rev->log, err,
"redislog recv(): %d", n);
if(n > 0) {
goto no_error;
}
if(n == -1) {
if(err == NGX_EAGAIN) {
goto no_error;
}
rev->error = 1;
}
else {
err = 0;
}
rev->eof = 1;
c->error = 1;
ngx_log_error(NGX_LOG_ERR, rev->log, err,
"redislog connection error");
#if (NGX_HAVE_KQUEUE)
reconnect:
#endif
ngx_close_connection(c);
ngx_redislog_reconnect_peer(peer);
return;
no_error:
if(peer->connecting) {
ngx_redislog_connected_handler(peer);
peer->connecting = 0;
}
}
static void ngx_redislog_write_handler(ngx_event_t *wev)
{
ngx_connection_t *c;
ngx_redislog_peer_t *peer;
c = wev->data;
peer = c->data;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, wev->log, 0,
"redislog write handler");
if(wev->timedout || c->error || c->close) {
if(wev->timedout) {
ngx_log_error(NGX_LOG_ERR, wev->log, NGX_ETIMEDOUT,
"redislog peer timed out");
}
if(wev->error) {
ngx_log_error(NGX_LOG_ERR, wev->log, 0,
"redislog peer connection error");
}
ngx_close_connection(c);
if(!c->close) {
ngx_redislog_reconnect_peer(peer);
}
return;
}
if(peer->connecting) {
ngx_redislog_connected_handler(peer);
peer->connecting = 0;
}
if(c->write->timer_set) {
ngx_del_timer(c->write);
}
peer->send_handler(peer);
}
static ngx_int_t ngx_redislog_connect_peer(ngx_redislog_peer_t *peer)
{
ngx_int_t rc;
ngx_log_error(NGX_LOG_INFO, peer->log, 0,
"redislog connect peer \"%V\"", &peer->conf->name);
peer->conn.sockaddr = peer->conf->sockaddr;
peer->conn.socklen = peer->conf->socklen;
peer->conn.name = &peer->conf->name;
peer->conn.get = ngx_event_get_peer;
peer->conn.log = peer->log;
peer->conn.log_error = NGX_ERROR_ERR;
rc = ngx_event_connect_peer(&peer->conn);
if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
if(peer->conn.connection) {
ngx_close_connection(peer->conn.connection);
}
return NGX_ERROR;
}
peer->conn.connection->data = peer;
peer->conn.connection->pool = peer->pool;
peer->password_pos = peer->conf->password.data;
peer->authenticated = 0;
peer->conn.connection->read->handler = ngx_redislog_read_handler;
peer->conn.connection->write->handler = ngx_redislog_write_handler;
peer->send_handler = peer->conf->authenticate ? ngx_redislog_auth_send
: ngx_http_redislog_send;
ngx_add_timer(peer->conn.connection->read, peer->conf->connect_timeout);
peer->connecting = 1;
return NGX_OK;
}
static void ngx_redislog_connect_handler(ngx_event_t *ev)
{
ngx_int_t rc;
ngx_redislog_peer_t *peer = ev->data;
rc = ngx_redislog_connect_peer(peer);
if(rc != NGX_OK) {
ngx_redislog_reconnect_peer(peer);
}
}
static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p)
{
p->conn.connection = NULL;
p->reconnect_timer.handler = ngx_redislog_connect_handler;
p->reconnect_timer.data = p;
p->reconnect_timer.log = p->conn.log;
ngx_add_timer(&p->reconnect_timer, p->reconnect_timeout);
p->reconnect_timeout *= 2;
if(p->discarded != 0) {
ngx_log_error(NGX_LOG_ERR, p->log, 0,
"redislog peer \"%V\" discarded %ui messages",
&p->conf->name, p->discarded);
p->discarded = 0;
}
}
static ngx_int_t
ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line;
line = ngx_palloc(r->pool, sizeof("yyyy")-1);
if(line == NULL) {
return NGX_ERROR;
}
(void) ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;
v->data = line;
v->len = sizeof("yyyy")-1;
return NGX_OK;
}
static ngx_int_t
ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line, *p;
line = ngx_palloc(r->pool, sizeof("yyyymm")-1);
if(line == NULL) {
return NGX_ERROR;
}
p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
(void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;
v->data = line;
v->len = sizeof("yyyymm")-1;
return NGX_OK;
}
static ngx_int_t
ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line, *p;
line = ngx_palloc(r->pool, sizeof("yyyymmdd")-1);
if(line == NULL) {
return NGX_ERROR;
}
p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
(void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);
v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;
v->data = line;
v->len = sizeof("yyyymmdd")-1;
return NGX_OK;
}
static ngx_int_t
ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
u_char *line, *p;
line = ngx_palloc(r->pool, sizeof("yyyymmddhh")-1);
if(line == NULL) {
return NGX_ERROR;
}
p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);
(void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 11, sizeof("hh")-1);
v->valid = 1;
v->no_cacheable = 1;
v->not_found = 0;
v->data = line;
v->len = sizeof("yyyymmddhh")-1;
return NGX_OK;
}
static void *
ngx_http_redislog_create_loc_conf(ngx_conf_t *cf)
{
ngx_http_redislog_conf_t *conf;
conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_redislog_conf_t));
if (conf == NULL) {
return NGX_CONF_ERROR;
}
return conf;
}
static char *
ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_redislog_conf_t *prev = parent;
ngx_http_redislog_conf_t *conf = child;
if(conf->logs || conf->off) {
return NGX_CONF_OK;
}
conf->logs = prev->logs;
conf->off = prev->off;
return NGX_CONF_OK;
}
static void *
ngx_redislog_create_conf(ngx_cycle_t *cycle)
{
ngx_redislog_conf_t *slcf;
slcf = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_conf_t));
if(slcf == NULL) {
return NULL;
}
return slcf;
}
static ngx_int_t
ngx_http_redislog_add_variables(ngx_conf_t *cf)
{
ngx_http_variable_t *var, *v;
for (v = ngx_http_redislog_variables; v->name.len; v++) {
var = ngx_http_add_variable(cf, &v->name, v->flags);
if (var == NULL) {
return NGX_ERROR;
}
var->get_handler = v->get_handler;
var->data = v->data;
}
return NGX_OK;
}
static ngx_int_t
ngx_http_redislog_find_peer_by_name(ngx_conf_t *cf, ngx_str_t *name)
{
ngx_redislog_conf_t *slcf;
ngx_redislog_peer_conf_t *pc;
ngx_uint_t i;
slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);
pc = slcf->peers->elts;
for(i = 0; i < slcf->peers->nelts; i++) {
if(pc[i].name.len == name->len
&& ngx_strncmp(pc[i].name.data, name->data, name->len) == 0)
{
return i;
}
}
return NGX_DECLINED;
}
static char *
ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_redislog_conf_t *slcf = conf;
ngx_uint_t i;
ngx_str_t *value, name, command, arg1, _if;
//ngx_str_t arg_num;
ngx_http_redislog_t *log;
ngx_http_log_fmt_t *fmt;
ngx_http_log_main_conf_t *lmcf;
ngx_int_t rc;
ngx_http_compile_complex_value_t ccv;
unsigned format_set;
format_set = 0;
value = cf->args->elts;
if (ngx_strcmp(value[1].data, "off") == 0) {
slcf->off = 1;
return NGX_CONF_OK;
}
slcf->off = 0;
if (slcf->logs == NULL) {
slcf->logs = ngx_array_create(cf->pool, 2, sizeof(ngx_http_redislog_t));
if (slcf->logs == NULL) {
return NGX_CONF_ERROR;
}
}
lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module);
if(lmcf == NULL) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"redislog module requires log module to be compiled in");
return NGX_CONF_ERROR;
}
log = ngx_array_push(slcf->logs);
if (log == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(log, sizeof(ngx_http_redislog_t));
log->peer_name = value[1];
rc = ngx_http_redislog_find_peer_by_name(cf, &log->peer_name);
if(rc == NGX_DECLINED) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"redislog peer %V is not defined", &log->peer_name);
return NGX_CONF_ERROR;
}
log->peer_idx = rc;
/*
* Create and compile key
*/
log->key = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
if(log->key == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
ccv.cf = cf;
ccv.value = &value[2];
ccv.complex_value = log->key;
if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
return NGX_CONF_ERROR;
}
ngx_str_set(&command, "APPEND");
//ngx_str_set(&arg1, "");
ngx_str_set(&arg1, "test1");
//ngx_str_set(&arg_num, "0");
ngx_str_set(&name, "main");
if (cf->args->nelts >= 4) {
for (i = 3; i < cf->args->nelts; i++) {
if (ngx_strncmp(value[i].data, "format=", 7) == 0) {
format_set = 1;
name = value[i];
name.len -= 7;
name.data += 7;
if (ngx_strcmp(name.data, "combined") == 0) {
lmcf->combined_used = 1;
}
continue;
}
if (ngx_strncmp(value[i].data, "command=", 8) == 0) {
command = value[i];
command.len -= 8;
command.data += 8;
continue;
}
if (ngx_strncmp(value[i].data, "arg1=", 5) == 0) {
arg1 = value[i];
arg1.len -= 5;
arg1.data += 5;
log->has_arg1 = 1;
continue;
}
if (ngx_strncmp(value[i].data, "if=", 3) == 0) {
if(log->_if != NULL) {
continue;
}
_if = value[i];
_if.len -= 3;
_if.data += 3;
/*
* Create and compile if script
*/
log->_if = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
if(log->_if == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
ccv.cf = cf;
ccv.value = &_if;
ccv.complex_value = log->_if;
if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
return NGX_CONF_ERROR;
}
continue;
}
if (ngx_strncmp(value[i].data, "ifnot=", 6) == 0) {
if(log->ifnot != NULL) {
continue;
}
_if = value[i];
_if.len -= 6;
_if.data += 6;
/*
* Create and compile if script
*/
log->ifnot = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
if(log->ifnot == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
ccv.cf = cf;
ccv.value = &_if;
ccv.complex_value = log->ifnot;
if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
return NGX_CONF_ERROR;
}
continue;
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid parameter \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}
}
if(!format_set) {
name.len = sizeof(NGX_DEF_FORMAT) - 1;
name.data = (u_char *) NGX_DEF_FORMAT;
lmcf->combined_used = 1;
}
log->command = command;
if(log->has_arg1) {
log->arg1 = arg1;
}
fmt = lmcf->formats.elts;
for (i = 0; i < lmcf->formats.nelts; i++) {
if (fmt[i].name.len == name.len
&& ngx_strcasecmp(fmt[i].name.data, name.data) == 0)
{
log->format = &fmt[i];
goto done;
}
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"unknown log format \"%V\"", &name);
return NGX_CONF_ERROR;
done:
return NGX_CONF_OK;
}
static char *
ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_str_t *value;
ngx_redislog_conf_t *slcf;
ngx_url_t u;
ngx_redislog_peer_conf_t *peer;
u_char *p;
size_t pass_size_len;
slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);
value = cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[2];
u.default_port = 6379;
u.no_resolve = 0;
if(ngx_parse_url(cf->pool, &u) != NGX_OK) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%V: %s", &u.host, u.err);
return NGX_CONF_ERROR;
}
if(slcf->peers == NULL) {
slcf->peers = ngx_array_create(cf->pool, 2, sizeof(ngx_redislog_peer_conf_t));
if (slcf->peers == NULL) {
return NGX_CONF_ERROR;
}
}
peer = ngx_array_push(slcf->peers);
if(peer == NULL) {
return NGX_CONF_ERROR;
}
peer->name = value[1];
peer->sockaddr = u.addrs[0].sockaddr;
peer->socklen = u.addrs[0].socklen;
if(cf->args->nelts >= 4) {
/*
* Alloc space for authentication packet and create it
*/
pass_size_len = ngx_redislog_size_len(value[3].len);
peer->password.len = sizeof(NGX_REDIS_AUTH)-1 + 1 + pass_size_len
+ sizeof(CRLF)-1 + value[3].len + sizeof(CRLF)-1;
peer->password.data = ngx_palloc(cf->pool, peer->password.len);
if(peer->password.data == NULL) {
return NGX_CONF_ERROR;
}
p = ngx_copy(peer->password.data, NGX_REDIS_AUTH, sizeof(NGX_REDIS_AUTH)-1);
*p++ = '$';
p = ngx_redislog_size(p, p + pass_size_len, value[3].len);
p = ngx_copy(p, CRLF, sizeof(CRLF)-1);
p = ngx_copy(p, value[3].data, value[3].len);
p = ngx_copy(p, CRLF, sizeof(CRLF)-1);
peer->authenticate = 1;
}
peer->write_timeout = 30000;
peer->read_timeout = 30000;
peer->connect_timeout = 30000;
peer->reconnect_timeout = 5000;
peer->flush_timeout = 2000;
peer->ping_timeout = 30000;
peer->bufs.num = 200;
peer->bufs.size = 2048;
peer->recv_buf_size = 1024;
return NGX_CONF_OK;
}
static ngx_int_t
ngx_http_redislog_init(ngx_conf_t *cf)
{
ngx_http_core_main_conf_t *cmcf;
ngx_http_handler_pt *h;
cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);
h = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
if (h == NULL) {
return NGX_ERROR;
}
*h = ngx_http_redislog_handler;
return NGX_OK;
}
static ngx_int_t
ngx_redislog_init_process(ngx_cycle_t *cycle)
{
ngx_int_t rc;
ngx_redislog_conf_t *slcf;
ngx_uint_t i;
ngx_redislog_peer_conf_t *pc;
ngx_redislog_peer_t *peer, **ppeer;
slcf = (ngx_redislog_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_redislog_module);
if(slcf->peers == NULL || slcf->peers->nelts == 0) {
return NGX_OK;
}
rc = ngx_array_init(&ngx_redislog_peers, cycle->pool,
slcf->peers->nelts, sizeof(ngx_redislog_peer_t*));
if(rc != NGX_OK) {
return rc;
}
pc = slcf->peers->elts;
for(i = 0; i < slcf->peers->nelts; i++) {
ppeer = ngx_array_push(&ngx_redislog_peers);
if(ppeer == NULL) {
return NGX_ERROR;
}
peer = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_peer_t));
if(peer == NULL) {
return NGX_ERROR;
}
peer->free = ngx_create_chain_of_bufs(cycle->pool, &pc[i].bufs);
if(peer->free == NULL) {
return NGX_ERROR;
}
peer->recv_buf = ngx_create_temp_buf(cycle->pool, pc[i].recv_buf_size);
if(peer->recv_buf == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
*ppeer = peer;
peer->pool = cycle->pool;
peer->conf = &pc[i];
peer->log = cycle->log;
peer->reconnect_timeout = pc[i].reconnect_timeout;
ngx_redislog_connect_peer(peer);
}
return NGX_OK;
}
This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers.
Комментариев нет:
Отправить комментарий