From 57aba47f6e51e067f593fb32ccc43c015ae63d8d Mon Sep 17 00:00:00 2001 From: Sergey Safarov Date: Mon, 9 Mar 2015 10:55:28 +0300 Subject: [PATCH] FS-7354: Filter feature ported from mod_event_socket to mod_erlang_event --- .../mod_erlang_event/handle_msg.c | 83 +++++++++++++++++++ .../mod_erlang_event/mod_erlang_event.c | 65 +++++++++++++++ .../mod_erlang_event/mod_erlang_event.h | 2 + 3 files changed, 150 insertions(+) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 0302a320f4..8461579a60 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -313,6 +313,87 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu return SWITCH_STATUS_SUCCESS; } +static switch_status_t handle_msg_filter(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf) +{ + char atom[MAXATOMLEN]; + char reply[MAXATOMLEN]= ""; + char *header_name = NULL; + char *header_val = NULL; + + if (arity == 1) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } else { + int i = 0; + + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "filter_command_processing_log"); + ei_x_encode_list_header(rbuf, arity - 1); + + switch_thread_rwlock_wrlock(listener->event_rwlock); + + switch_mutex_lock(listener->filter_mutex); + if (!listener->filters) { + switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE); + switch_clear_flag(listener->filters, EF_UNIQ_HEADERS); + } + + for (i = 1; i < arity; i++) { + if (!ei_decode_atom(buf->buff, &buf->index, atom)) { + header_name=atom; + + while (header_name && *header_name && *header_name == ' ') + header_name++; + + if ((header_val = strchr(atom, ' '))) { + *header_val++ = '\0'; + } + + if (!strcasecmp(header_name, "delete") && header_val) { + header_name = header_val; + if ((header_val = strchr(header_name, ' '))) { + *header_val++ = '\0'; + } + if (!strcasecmp(header_name, "all")) { + switch_event_destroy(&listener->filters); + switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE); + } else { + switch_event_del_header_val(listener->filters, header_name, header_val); + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter deleted. [%s]=[%s]", header_name, switch_str_nil(header_val)); + ei_x_encode_tuple_header(rbuf, 3); + _ei_x_encode_string(rbuf, "deleted"); + _ei_x_encode_string(rbuf, header_name); + _ei_x_encode_string(rbuf, switch_str_nil(header_val)); + } else if (header_val) { + if (!strcasecmp(header_name, "add")) { + header_name = header_val; + if ((header_val = strchr(header_name, ' '))) { + *header_val++ = '\0'; + } + } + switch_event_add_header_string(listener->filters, SWITCH_STACK_BOTTOM, header_name, header_val); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter added. [%s]=[%s]", header_name, header_val); + ei_x_encode_tuple_header(rbuf, 3); + _ei_x_encode_string(rbuf, "added"); + _ei_x_encode_string(rbuf, header_name); + _ei_x_encode_string(rbuf, header_val); + } else { + switch_snprintf(reply, MAXATOMLEN, "-ERR invalid syntax"); + ei_x_encode_atom(rbuf, "-ERR invalid syntax"); + } + } + } + + switch_mutex_unlock(listener->filter_mutex); + switch_thread_rwlock_unlock(listener->event_rwlock); + + ei_x_encode_empty_list(rbuf); + } + return SWITCH_STATUS_SUCCESS; +} + static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf) { char atom[MAXATOMLEN]; @@ -975,6 +1056,8 @@ static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg * msg, ret = handle_msg_set_log_level(listener, arity, buf, rbuf); } else if (!strncmp(tupletag, "event", MAXATOMLEN)) { ret = handle_msg_event(listener, arity, buf, rbuf); + } else if (!strncmp(tupletag, "filter", MAXATOMLEN)) { + ret = handle_msg_filter(listener, arity, buf, rbuf); } else if (!strncmp(tupletag, "session_event", MAXATOMLEN)) { ret = handle_msg_session_event(listener, msg, arity, buf, rbuf); } else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) { diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 0220964607..28575f0b8e 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -215,6 +215,70 @@ static void event_handler(switch_event_t *event) } } + if (send) { + switch_mutex_lock(l->filter_mutex); + + if (l->filters && l->filters->headers) { + switch_event_header_t *hp; + const char *hval; + + for (hp = l->filters->headers; hp; hp = hp->next) { + if ((hval = switch_event_get_header(event, hp->name))) { + const char *comp_to = hp->value; + int pos = 1, cmp = 0; + + while (comp_to && *comp_to) { + if (*comp_to == '+') { + pos = 1; + } else if (*comp_to == '-') { + pos = 0; + } else if (*comp_to != ' ') { + break; + } + comp_to++; + } + + if (!(comp_to && *comp_to)) { + if (pos) { + send = 1; + continue; + } else { + send = 0; + break; + } + } + + if (*hp->value == '/') { + switch_regex_t *re = NULL; + int ovector[30]; + cmp = !!switch_regex_perform(hval, comp_to, &re, ovector, sizeof(ovector) / sizeof(ovector[0])); + switch_regex_safe_free(re); + } else { + cmp = !strcasecmp(hval, comp_to); + } + + if (cmp) { + if (pos) { + send = 1; + } else { + send = 0; + break; + } + } else { + if (pos) { + send = 0; + break; + } else { + send = 1; + } + } + } + } + } + + switch_mutex_unlock(l->filter_mutex); + } + switch_thread_rwlock_unlock(l->event_rwlock); if (send) { @@ -1273,6 +1337,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) listener->level = SWITCH_LOG_DEBUG; switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_thread_rwlock_create(&listener->rwlock, pool); switch_thread_rwlock_create(&listener->event_rwlock, pool); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 719210dca8..967318b1d6 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -126,6 +126,8 @@ struct listener { switch_memory_pool_t *pool; switch_mutex_t *flag_mutex; switch_mutex_t *sock_mutex; + switch_mutex_t *filter_mutex; + switch_event_t *filters; char *ebuf; uint32_t flags; switch_log_level_t level;