From ffa09cd1e7b1177de7a22a0bac54122c36bff125 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 20 Jun 2014 19:21:56 +0500 Subject: [PATCH] various verto cleanups and fix a race in double event delivery thread creation --- src/mod/endpoints/mod_verto/mod_verto.c | 26 +++++++++++++------------ src/switch_event.c | 14 +++++++++++-- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/mod/endpoints/mod_verto/mod_verto.c b/src/mod/endpoints/mod_verto/mod_verto.c index 543a138fa4..b63a9243c3 100644 --- a/src/mod/endpoints/mod_verto/mod_verto.c +++ b/src/mod/endpoints/mod_verto/mod_verto.c @@ -174,6 +174,7 @@ struct jsock_sub_node_head_s; typedef struct jsock_sub_node_s { jsock_t *jsock; + uint32_t serno; struct jsock_sub_node_head_s *head; struct jsock_sub_node_s *next; } jsock_sub_node_t; @@ -529,7 +530,9 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t if ((json_text = cJSON_PrintUnformatted(*json))) { if (jsock->profile->debug || globals.debug) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "WRITE %s [%s]\n", jsock->name, json_text); + char *log_text = cJSON_Print(*json); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "WRITE %s [%s]\n", jsock->name, log_text); + free(log_text); } switch_mutex_lock(jsock->write_mutex); ws_write_frame(&jsock->ws, WSOC_TEXT, json_text, strlen(json_text)); @@ -558,6 +561,7 @@ static void write_event(const char *event_channel, jsock_t *use_jsock, cJSON *ev if (!use_jsock || use_jsock == np->jsock) { params = cJSON_Duplicate(event, 1); + cJSON_AddItemToObject(params, "eventSerno", cJSON_CreateNumber(np->serno++)); msg = jrpc_new_req("verto.event", NULL, ¶ms); ws_write_json(np->jsock, &msg, SWITCH_TRUE); } @@ -1134,13 +1138,17 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize switch_status_t status = SWITCH_STATUS_SUCCESS; if (ascii) { - if (jsock->profile->debug || globals.debug) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "READ %s [%s]\n", jsock->name, ascii); - } json = cJSON_Parse(ascii); } if (json) { + + if (jsock->profile->debug || globals.debug) { + char *log_text = cJSON_Print(json); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "READ %s [%s]\n", jsock->name, log_text); + free(log_text); + } + if (json->type == cJSON_Array) { /* batch mode */ int i, len = cJSON_GetArraySize(json); @@ -3860,20 +3868,14 @@ static switch_call_cause_t verto_outgoing_channel(switch_core_session_t *session void verto_broadcast(const char *event_channel, cJSON *json, const char *key, switch_event_channel_id_t id) { - - { + if (globals.debug > 10) { char *json_text; if ((json_text = cJSON_Print(json))) { - if (globals.debug) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "EVENT BROADCAST %s %s\n", event_channel, json_text); - } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "EVENT BROADCAST %s %s\n", event_channel, json_text); free(json_text); } } - - - jsock_send_event(json); } diff --git a/src/switch_event.c b/src/switch_event.c index f55936fc46..a2a68fdb68 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -97,6 +97,7 @@ static switch_hash_t *CUSTOM_HASH = NULL; static int THREAD_COUNT = 0; static int DISPATCH_THREAD_COUNT = 0; static int EVENT_CHANNEL_DISPATCH_THREAD_COUNT = 0; +static int EVENT_CHANNEL_DISPATCH_THREAD_STARTING = 0; static int SYSTEM_RUNNING = 0; static uint64_t EVENT_SEQUENCE_NR = 0; #ifdef SWITCH_EVENT_RECYCLE @@ -2876,6 +2877,7 @@ static void *SWITCH_THREAD_FUNC switch_event_channel_deliver_thread(switch_threa switch_mutex_lock(EVENT_QUEUE_MUTEX); THREAD_COUNT++; EVENT_CHANNEL_DISPATCH_THREAD_COUNT++; + EVENT_CHANNEL_DISPATCH_THREAD_STARTING = 0; switch_mutex_unlock(EVENT_QUEUE_MUTEX); while(SYSTEM_RUNNING) { @@ -2911,6 +2913,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_channel_broadcast(const char *event { event_channel_data_t *ecd = NULL; switch_status_t status = SWITCH_STATUS_SUCCESS; + int launch = 0; if (!SYSTEM_RUNNING) { cJSON_Delete(*json); @@ -2927,7 +2930,14 @@ SWITCH_DECLARE(switch_status_t) switch_event_channel_broadcast(const char *event *json = NULL; - if (!EVENT_CHANNEL_DISPATCH_THREAD_COUNT && SYSTEM_RUNNING) { + switch_mutex_lock(EVENT_QUEUE_MUTEX); + if (!EVENT_CHANNEL_DISPATCH_THREAD_COUNT && !EVENT_CHANNEL_DISPATCH_THREAD_STARTING && SYSTEM_RUNNING) { + EVENT_CHANNEL_DISPATCH_THREAD_STARTING = 1; + launch = 1; + } + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + + if (launch) { switch_thread_data_t *td; if (!EVENT_CHANNEL_DISPATCH_QUEUE) { @@ -3189,7 +3199,7 @@ SWITCH_DECLARE(switch_status_t) switch_live_array_bootstrap(switch_live_array_t cJSON_AddItemToObject(msg, "eventChannel", cJSON_CreateString(la->event_channel)); cJSON_AddItemToObject(data, "action", cJSON_CreateString("bootObj")); cJSON_AddItemToObject(data, "name", cJSON_CreateString(la->name)); - cJSON_AddItemToObject(data, "wireSerno", cJSON_CreateNumber(la->serno++)); + cJSON_AddItemToObject(data, "wireSerno", cJSON_CreateNumber(-1)); if (sessid) { cJSON_AddItemToObject(msg, "sessid", cJSON_CreateString(sessid));