deliver events with the core thread pool set events-use-dispatch=true in switch.conf.xml to use the old way

This commit is contained in:
Anthony Minessale 2013-09-05 03:42:35 +05:00
parent 44b01bee6b
commit a5f2176ea4
5 changed files with 79 additions and 24 deletions

View File

@ -275,6 +275,7 @@ struct switch_runtime {
char *core_db_post_trans_execute;
char *core_db_inner_pre_trans_execute;
char *core_db_inner_post_trans_execute;
int events_use_dispatch;
};
extern struct switch_runtime runtime;

View File

@ -65,6 +65,7 @@ typedef struct switch_thread_data_s {
switch_thread_start_t func;
void *obj;
int alloc;
switch_memory_pool_t *pool;
} switch_thread_data_t;
typedef struct switch_hold_record_s {

View File

@ -1963,9 +1963,18 @@ static void switch_load_core_config(const char *file)
switch_core_min_idle_cpu(atof(val));
} else if (!strcasecmp(var, "tipping-point") && !zstr(val)) {
runtime.tipping_point = atoi(val);
} else if (!strcasecmp(var, "events-use-dispatch") && !zstr(val)) {
runtime.events_use_dispatch = 1;
} else if (!strcasecmp(var, "initial-event-threads") && !zstr(val)) {
int tmp = atoi(val);
int tmp;
if (!runtime.events_use_dispatch) {
runtime.events_use_dispatch = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
"Implicitly setting events-use-dispatch based on usage of this initial-event-threads parameter.\n");
}
tmp = atoi(val);
if (tmp > runtime.cpu_count / 2) {
tmp = runtime.cpu_count / 2;

View File

@ -1622,10 +1622,14 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
td->func(thread, td->obj);
if (td->alloc) {
if (td->pool) {
switch_memory_pool_t *pool = td->pool;
td = NULL;
switch_core_destroy_memory_pool(&pool);
} else if (td->alloc) {
free(td);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread);
switch_mutex_lock(session_manager.mutex);

View File

@ -36,6 +36,7 @@
#include <switch.h>
#include <switch_event.h>
#include "tpl.h"
#include "private/switch_core_pvt.h"
//#define SWITCH_EVENT_RECYCLE
#define DISPATCH_QUEUE_LEN 10000
@ -244,6 +245,34 @@ static int switch_events_match(switch_event_t *event, switch_event_node_t *node)
return match;
}
static void *SWITCH_THREAD_FUNC switch_event_deliver_thread(switch_thread_t *thread, void *obj)
{
switch_event_t *event = (switch_event_t *) obj;
switch_event_deliver(&event);
return NULL;
}
static void switch_event_deliver_thread_pool(switch_event_t **event)
{
switch_thread_data_t *td;
td = malloc(sizeof(*td));
switch_assert(td);
td->alloc = 1;
td->func = switch_event_deliver_thread;
td->obj = *event;
td->pool = NULL;
*event = NULL;
switch_thread_pool_launch_thread(&td);
}
static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj)
{
switch_queue_t *queue = (switch_queue_t *) obj;
@ -489,19 +518,22 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
SYSTEM_RUNNING = 0;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
if (runtime.events_use_dispatch) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
}
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
}
switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_status_t st;
switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_status_t st;
switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
}
}
x = 0;
@ -513,7 +545,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
last = THREAD_COUNT;
}
{
if (runtime.events_use_dispatch) {
void *pop = NULL;
switch_event_t *event = NULL;
@ -622,19 +654,21 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
//switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
switch_event_launch_dispatch_threads(1);
if (runtime.events_use_dispatch) {
switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
switch_event_launch_dispatch_threads(1);
}
//switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
//switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
//switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
while (!THREAD_COUNT) {
switch_cond_next();
if (runtime.events_use_dispatch) {
while (!THREAD_COUNT) {
switch_cond_next();
}
}
switch_mutex_lock(EVENT_QUEUE_MUTEX);
SYSTEM_RUNNING = 1;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
@ -1884,9 +1918,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con
(*event)->event_user_data = user_data;
}
if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
switch_event_destroy(event);
return SWITCH_STATUS_FALSE;
if (runtime.events_use_dispatch) {
if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
switch_event_destroy(event);
return SWITCH_STATUS_FALSE;
}
} else {
switch_event_deliver_thread_pool(event);
}
return SWITCH_STATUS_SUCCESS;