This commit is contained in:
Anthony Minessale 2010-07-21 02:46:35 -05:00
parent a7c31e6fe9
commit beec142c5c
2 changed files with 293 additions and 66 deletions

View File

@ -48,6 +48,7 @@ typedef enum {
static outbound_strategy_t default_strategy = NODE_STRATEGY_RINGALL;
static int marker = 1;
typedef struct {
int nelm;
@ -57,6 +58,24 @@ typedef struct {
switch_mutex_t *mutex;
} fifo_queue_t;
static int check_caller_outbound_call(const char *key);
static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
static void del_caller_outbound_call(const char *key);
static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause);
static int check_consumer_outbound_call(const char *key);
static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
static void del_consumer_outbound_call(const char *key);
static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause);
static int check_bridge_call(const char *key);
static void add_bridge_call(const char *key);
static void del_bridge_call(const char *key);
switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
{
fifo_queue_t *q;
@ -127,24 +146,34 @@ static int fifo_queue_size(fifo_queue_t *queue)
static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
{
int i;
int i, j;
switch_mutex_lock(queue->mutex);
if (queue->idx == 0) {
switch_mutex_unlock(queue->mutex);
*pop = NULL;
return SWITCH_STATUS_FALSE;
}
if (remove) {
*pop = queue->data[0];
} else {
switch_event_dup(pop, queue->data[0]);
for (j = 0; j < queue->idx; j++) {
const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
if (uuid && !check_caller_outbound_call(uuid)) {
if (remove) {
*pop = queue->data[j];
} else {
switch_event_dup(pop, queue->data[j]);
}
break;
}
}
if (j == queue->idx) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
if (remove) {
for (i = 1; i < queue->idx; i++) {
for (i = j+1; i < queue->idx; i++) {
queue->data[i-1] = queue->data[i];
queue->data[i] = NULL;
change_pos(queue->data[i-1], i);
@ -162,10 +191,15 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
{
int i, j;
int i, j, force = 0;
switch_mutex_lock(queue->mutex);
if (name && *name == '+') {
name++;
force = 1;
}
if (queue->idx == 0 || zstr(name) || zstr(val)) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
@ -173,7 +207,8 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n
for (j = 0; j < queue->idx; j++) {
const char *j_val = switch_event_get_header(queue->data[j], name);
if (j_val && val && !strcmp(j_val, val)) {
const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) {
if (remove) {
*pop = queue->data[j];
@ -502,8 +537,12 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi
}
static struct {
switch_hash_t *orig_hash;
switch_mutex_t *orig_mutex;
switch_hash_t *caller_orig_hash;
switch_hash_t *consumer_orig_hash;
switch_hash_t *bridge_hash;
switch_mutex_t *caller_orig_mutex;
switch_mutex_t *consumer_orig_mutex;
switch_mutex_t *bridge_mutex;
switch_hash_t *fifo_hash;
switch_mutex_t *mutex;
switch_mutex_t *sql_mutex;
@ -520,6 +559,115 @@ static struct {
} globals;
static int check_caller_outbound_call(const char *key)
{
int x = 0;
switch_mutex_lock(globals.caller_orig_mutex);
x = !!switch_core_hash_find(globals.caller_orig_hash, key);
switch_mutex_unlock(globals.caller_orig_mutex);
return x;
}
static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
{
switch_mutex_lock(globals.caller_orig_mutex);
switch_core_hash_insert(globals.caller_orig_hash, key, cancel_cause);
switch_mutex_unlock(globals.caller_orig_mutex);
}
static void del_caller_outbound_call(const char *key)
{
switch_mutex_lock(globals.caller_orig_mutex);
switch_core_hash_delete(globals.caller_orig_hash, key);
switch_mutex_unlock(globals.caller_orig_mutex);
}
static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause)
{
switch_call_cause_t *cancel_cause = NULL;
switch_mutex_lock(globals.caller_orig_mutex);
if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.caller_orig_hash, key))) {
*cancel_cause = cause;
}
switch_mutex_unlock(globals.caller_orig_mutex);
fifo_caller_del(key);
}
static int check_bridge_call(const char *key)
{
int x = 0;
switch_mutex_lock(globals.bridge_mutex);
x = !!switch_core_hash_find(globals.bridge_hash, key);
switch_mutex_unlock(globals.bridge_mutex);
return x;
}
static void add_bridge_call(const char *key)
{
switch_mutex_lock(globals.bridge_mutex);
switch_core_hash_insert(globals.bridge_hash, key, (void *)&marker);
switch_mutex_unlock(globals.bridge_mutex);
}
static void del_bridge_call(const char *key)
{
switch_mutex_lock(globals.bridge_mutex);
switch_core_hash_delete(globals.bridge_hash, key);
switch_mutex_unlock(globals.bridge_mutex);
}
static int check_consumer_outbound_call(const char *key)
{
int x = 0;
switch_mutex_lock(globals.consumer_orig_mutex);
x = !!switch_core_hash_find(globals.consumer_orig_hash, key);
switch_mutex_unlock(globals.consumer_orig_mutex);
return x;
}
static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
{
switch_mutex_lock(globals.consumer_orig_mutex);
switch_core_hash_insert(globals.consumer_orig_hash, key, cancel_cause);
switch_mutex_unlock(globals.consumer_orig_mutex);
}
static void del_consumer_outbound_call(const char *key)
{
switch_mutex_lock(globals.consumer_orig_mutex);
switch_core_hash_delete(globals.consumer_orig_hash, key);
switch_mutex_unlock(globals.consumer_orig_mutex);
}
static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause)
{
switch_call_cause_t *cancel_cause = NULL;
switch_mutex_lock(globals.consumer_orig_mutex);
if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.consumer_orig_hash, key))) {
*cancel_cause = cause;
}
switch_mutex_unlock(globals.consumer_orig_mutex);
}
switch_cache_db_handle_t *fifo_get_db_handle(void)
{
switch_cache_db_connection_options_t options = { {0} };
@ -633,7 +781,7 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", name);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
if (atoi(outbound_count) > 0) {
node->has_outbound = 1;
@ -686,6 +834,7 @@ struct callback_helper {
switch_memory_pool_t *pool;
struct call_helper *rows[MAX_ROWS];
int rowcount;
int ready;
};
@ -699,6 +848,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
consumer_session = session;
consumer_channel = switch_core_session_get_channel(consumer_session);
outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_BRIDGE:
@ -706,6 +856,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
if ((caller_session = switch_core_session_locate(msg->string_arg))) {
caller_channel = switch_core_session_get_channel(caller_session);
if (msg->message_id == SWITCH_MESSAGE_INDICATE_BRIDGE) {
cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
switch_core_session_soft_lock(caller_session, 5);
} else {
switch_core_session_soft_unlock(caller_session);
@ -723,8 +874,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
default:
goto end;
}
outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_BRIDGE:
@ -927,33 +1077,6 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
return SWITCH_STATUS_SUCCESS;
}
static void add_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
{
switch_mutex_lock(globals.orig_mutex);
switch_core_hash_insert(globals.orig_hash, key, cancel_cause);
switch_mutex_unlock(globals.orig_mutex);
}
static void del_outbound_call(const char *key)
{
switch_mutex_lock(globals.orig_mutex);
switch_core_hash_delete(globals.orig_hash, key);
switch_mutex_unlock(globals.orig_mutex);
}
static void cancel_outbound_call(const char *key, switch_call_cause_t cause)
{
switch_call_cause_t *cancel_cause = NULL;
switch_mutex_lock(globals.orig_mutex);
if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.orig_hash, key))) {
*cancel_cause = cause;
}
switch_mutex_unlock(globals.orig_mutex);
fifo_caller_del(key);
}
static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj)
{
@ -981,8 +1104,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
switch_call_cause_t cancel_cause = 0;
char *uuid_list = NULL;
int connected = 0;
int connected = 0, total = 0;
const char *codec;
struct call_helper *rows[MAX_ROWS] = { 0 };
int rowcount = 0;
switch_uuid_get(&uuid);
switch_uuid_format(uuid_str, &uuid);
@ -997,9 +1122,35 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
node = switch_core_hash_find(globals.fifo_hash, node_name);
switch_mutex_unlock(globals.mutex);
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
if (check_consumer_outbound_call(h->uuid) || check_bridge_call(h->uuid)) {
continue;
}
rows[rowcount++] = h;
add_consumer_outbound_call(h->uuid, &cancel_cause);
total++;
}
for (i = 0; i < rowcount; i++) {
struct call_helper *h = rows[i];
cbh->rows[i] = h;
}
cbh->rowcount = rowcount;
cbh->ready = 1;
if (!total) {
goto end;
}
if (node) {
switch_mutex_lock(node->mutex);
node->busy = switch_epoch_time_now(NULL) + 600;
//node->busy = switch_epoch_time_now(NULL) + 600;
node->ring_consumer_count = 1;
switch_mutex_unlock(node->mutex);
} else {
@ -1113,11 +1264,14 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
}
if (!total) goto end;
if ((codec = switch_event_get_header(pop, "variable_sip_use_codec_name"))) {
const char *rate = switch_event_get_header(pop, "variable_sip_use_codec_rate");
const char *ptime = switch_event_get_header(pop, "variable_sip_use_codec_ptime");
@ -1132,11 +1286,11 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "absolute_codec_string", nstr);
}
add_outbound_call(id, &cancel_cause);
add_caller_outbound_call(id, &cancel_cause);
status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, &cancel_cause);
del_outbound_call(id);
del_caller_outbound_call(id);
if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) {
@ -1217,7 +1371,12 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_channel_set_caller_extension(channel, extension);
switch_channel_set_state(channel, CS_EXECUTE);
switch_channel_wait_for_state(channel, NULL, CS_EXECUTE);
switch_channel_wait_for_flag(channel, CF_BRIDGED, SWITCH_TRUE, 5000, NULL);
switch_core_session_rwunlock(session);
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
@ -1228,10 +1387,19 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
end:
cbh->ready = 1;
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
del_consumer_outbound_call(h->uuid);
}
switch_safe_free(originate_string);
switch_safe_free(uuid_list);
switch_event_destroy(&ovars);
if (ovars) {
switch_event_destroy(&ovars);
}
if (pop_dup) {
switch_event_destroy(&pop_dup);
@ -1240,7 +1408,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
if (node) {
switch_mutex_lock(node->mutex);
node->ring_consumer_count = 0;
node->busy = switch_epoch_time_now(NULL) + connected;
//node->busy = switch_epoch_time_now(NULL) + connected;
switch_mutex_unlock(node->mutex);
}
@ -1273,7 +1441,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
if (node) {
switch_mutex_lock(node->mutex);
node->ring_consumer_count++;
node->busy = switch_epoch_time_now(NULL) + 600;
//node->busy = switch_epoch_time_now(NULL) + 600;
switch_mutex_unlock(node->mutex);
}
@ -1375,7 +1543,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
if (node->ring_consumer_count-- < 0) {
node->ring_consumer_count = 0;
}
node->busy = switch_epoch_time_now(NULL) + connected;
//node->busy = switch_epoch_time_now(NULL) + connected;
switch_mutex_unlock(node->mutex);
}
switch_core_destroy_memory_pool(&h->pool);
@ -1484,10 +1652,15 @@ static void find_consumers(fifo_node_t *node)
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
if (cbh->rowcount) {
int sanity = 40;
switch_threadattr_create(&thd_attr, cbh->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
while(--sanity > 0 && !cbh->ready) {
switch_yield(100000);
}
}
}
@ -1517,16 +1690,18 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) {
switch_mutex_lock(node->mutex);
if (node->has_outbound && node->ready && switch_epoch_time_now(NULL) > node->busy) {
if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) {
ppl_waiting = node_consumer_wait_count(node);
consumer_total = node->consumer_count;
idle_consumers = node_idle_consumers(node);
/* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); */
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG5,
//"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n",
//node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count);
if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
find_consumers(node);
switch_yield(1000000);
}
}
switch_mutex_unlock(node->mutex);
@ -1579,7 +1754,7 @@ static void check_ocancel(switch_core_session_t *session)
//channel = switch_core_session_get_channel(session);
cancel_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
}
@ -1693,6 +1868,13 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
}
SWITCH_STANDARD_API(fifo_check_bridge_function)
{
stream->write_function(stream, "%s", (cmd && check_bridge_call(cmd)) ? "true" : "false");
return SWITCH_STATUS_SUCCESS;
}
SWITCH_STANDARD_API(fifo_add_outbound_function)
{
char *data = NULL, *argv[4] = { 0 };
@ -1739,6 +1921,7 @@ static void dec_use_count(switch_channel_t *channel)
long now = (long) switch_epoch_time_now(NULL);
if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
del_bridge_call(outbound_id);
sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'",
now, now, outbound_id);
fifo_execute_sql(sql, globals.sql_mutex);
@ -1780,6 +1963,8 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
return;
}
add_bridge_call(data);
switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
@ -2344,15 +2529,29 @@ SWITCH_STANDARD_APP(fifo_function)
}
if (node) {
const char *varval;
const char *varval, *check = NULL;
check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required");
if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
if (check_bridge_call(varval) && switch_true(check)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
switch_channel_get_name(channel));
goto done;
}
for (x = 0; x < MAX_PRI; x++) {
if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
cancel_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "+unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
cancel_caller_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
break;
}
}
if (!pop && switch_true(check)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
switch_channel_get_name(channel));
goto done;
}
}
if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
@ -2582,12 +2781,18 @@ SWITCH_STANDARD_APP(fifo_function)
}
if (outbound_id) {
cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
add_bridge_call(outbound_id);
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'",
switch_epoch_time_now(NULL), outbound_id);
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
}
add_bridge_call(switch_core_session_get_uuid(other_session));
add_bridge_call(switch_core_session_get_uuid(session));
sql = switch_mprintf("insert into fifo_bridge "
"(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
"values ('%q','%q','%q','%q','%q','%q',%ld)",
@ -2604,6 +2809,7 @@ SWITCH_STANDARD_APP(fifo_function)
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session);
if (outbound_id) {
@ -2616,8 +2822,13 @@ SWITCH_STANDARD_APP(fifo_function)
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
del_bridge_call(outbound_id);
}
del_bridge_call(switch_core_session_get_uuid(session));
del_bridge_call(switch_core_session_get_uuid(other_session));
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
@ -3779,7 +3990,7 @@ static void fifo_member_del(char *fifo_name, char *originate_string)
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", node->name);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name);
fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
if (atoi(outbound_count) > 0) {
node->has_outbound = 1;
@ -3895,8 +4106,12 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
switch_core_new_memory_pool(&globals.pool);
switch_core_hash_init(&globals.fifo_hash, globals.pool);
switch_core_hash_init(&globals.orig_hash, globals.pool);
switch_mutex_init(&globals.orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_core_hash_init(&globals.caller_orig_hash, globals.pool);
switch_core_hash_init(&globals.consumer_orig_hash, globals.pool);
switch_core_hash_init(&globals.bridge_hash, globals.pool);
switch_mutex_init(&globals.caller_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_init(&globals.consumer_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_init(&globals.bridge_mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool);
@ -3919,11 +4134,13 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX);
SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX);
SWITCH_ADD_API(commands_api_interface, "fifo_add_outbound", "Add outbound members to a fifo", fifo_add_outbound_function, "<node> <url> [<priority>]");
SWITCH_ADD_API(commands_api_interface, "fifo_check_bridge", "check if uuid is in a bridge", fifo_check_bridge_function, "<uuid>|<outbound_id>");
switch_console_set_complete("add fifo list");
switch_console_set_complete("add fifo list_verbose");
switch_console_set_complete("add fifo count");
switch_console_set_complete("add fifo has_outbound");
switch_console_set_complete("add fifo importance");
switch_console_set_complete("add fifo_check_bridge ::console::list_uuid");
start_node_thread(globals.pool);
@ -3960,12 +4177,13 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
node = (fifo_node_t *) val;
switch_thread_rwlock_wrlock(node->rwlock);
switch_mutex_lock(node->mutex);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
switch_mutex_unlock(node->mutex);
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);

View File

@ -1627,7 +1627,8 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char *
{
switch_api_interface_t *api;
switch_status_t status;
char *myarg = NULL, *argp = NULL;
switch_assert(stream != NULL);
switch_assert(stream->data != NULL);
switch_assert(stream->write_function != NULL);
@ -1636,18 +1637,25 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char *
switch_event_create(&stream->param_event, SWITCH_EVENT_API);
}
if (arg) {
myarg = strdup(arg);
argp = myarg;
while(*argp == ' ') argp++;
while(end_of(argp) == ' ') end_of(argp) = '\0';
}
if (stream->param_event) {
if (cmd) {
switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command", cmd);
}
if (arg) {
switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", arg);
if (argp) {
switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", argp);
}
}
if (cmd && (api = switch_loadable_module_get_api_interface(cmd)) != 0) {
if ((status = api->function(arg, session, stream)) != SWITCH_STATUS_SUCCESS) {
if ((status = api->function(argp, session, stream)) != SWITCH_STATUS_SUCCESS) {
stream->write_function(stream, "COMMAND RETURNED ERROR!\n");
}
UNPROTECT_INTERFACE(api);
@ -1660,7 +1668,8 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char *
switch_event_fire(&stream->param_event);
}
switch_safe_free(myarg);
return status;
}