various verto cleanups and fix a race in double event delivery thread creation

This commit is contained in:
Anthony Minessale 2014-06-20 19:21:56 +05:00
parent faeb00368e
commit ffa09cd1e7
2 changed files with 26 additions and 14 deletions

View File

@ -174,6 +174,7 @@ struct jsock_sub_node_head_s;
typedef struct jsock_sub_node_s {
jsock_t *jsock;
uint32_t serno;
struct jsock_sub_node_head_s *head;
struct jsock_sub_node_s *next;
} jsock_sub_node_t;
@ -529,7 +530,9 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t
if ((json_text = cJSON_PrintUnformatted(*json))) {
if (jsock->profile->debug || globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "WRITE %s [%s]\n", jsock->name, json_text);
char *log_text = cJSON_Print(*json);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "WRITE %s [%s]\n", jsock->name, log_text);
free(log_text);
}
switch_mutex_lock(jsock->write_mutex);
ws_write_frame(&jsock->ws, WSOC_TEXT, json_text, strlen(json_text));
@ -558,6 +561,7 @@ static void write_event(const char *event_channel, jsock_t *use_jsock, cJSON *ev
if (!use_jsock || use_jsock == np->jsock) {
params = cJSON_Duplicate(event, 1);
cJSON_AddItemToObject(params, "eventSerno", cJSON_CreateNumber(np->serno++));
msg = jrpc_new_req("verto.event", NULL, &params);
ws_write_json(np->jsock, &msg, SWITCH_TRUE);
}
@ -1134,13 +1138,17 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize
switch_status_t status = SWITCH_STATUS_SUCCESS;
if (ascii) {
if (jsock->profile->debug || globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "READ %s [%s]\n", jsock->name, ascii);
}
json = cJSON_Parse(ascii);
}
if (json) {
if (jsock->profile->debug || globals.debug) {
char *log_text = cJSON_Print(json);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "READ %s [%s]\n", jsock->name, log_text);
free(log_text);
}
if (json->type == cJSON_Array) { /* batch mode */
int i, len = cJSON_GetArraySize(json);
@ -3860,20 +3868,14 @@ static switch_call_cause_t verto_outgoing_channel(switch_core_session_t *session
void verto_broadcast(const char *event_channel, cJSON *json, const char *key, switch_event_channel_id_t id)
{
{
if (globals.debug > 10) {
char *json_text;
if ((json_text = cJSON_Print(json))) {
if (globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "EVENT BROADCAST %s %s\n", event_channel, json_text);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ALERT, "EVENT BROADCAST %s %s\n", event_channel, json_text);
free(json_text);
}
}
jsock_send_event(json);
}

View File

@ -97,6 +97,7 @@ static switch_hash_t *CUSTOM_HASH = NULL;
static int THREAD_COUNT = 0;
static int DISPATCH_THREAD_COUNT = 0;
static int EVENT_CHANNEL_DISPATCH_THREAD_COUNT = 0;
static int EVENT_CHANNEL_DISPATCH_THREAD_STARTING = 0;
static int SYSTEM_RUNNING = 0;
static uint64_t EVENT_SEQUENCE_NR = 0;
#ifdef SWITCH_EVENT_RECYCLE
@ -2876,6 +2877,7 @@ static void *SWITCH_THREAD_FUNC switch_event_channel_deliver_thread(switch_threa
switch_mutex_lock(EVENT_QUEUE_MUTEX);
THREAD_COUNT++;
EVENT_CHANNEL_DISPATCH_THREAD_COUNT++;
EVENT_CHANNEL_DISPATCH_THREAD_STARTING = 0;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
while(SYSTEM_RUNNING) {
@ -2911,6 +2913,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_channel_broadcast(const char *event
{
event_channel_data_t *ecd = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
int launch = 0;
if (!SYSTEM_RUNNING) {
cJSON_Delete(*json);
@ -2927,7 +2930,14 @@ SWITCH_DECLARE(switch_status_t) switch_event_channel_broadcast(const char *event
*json = NULL;
if (!EVENT_CHANNEL_DISPATCH_THREAD_COUNT && SYSTEM_RUNNING) {
switch_mutex_lock(EVENT_QUEUE_MUTEX);
if (!EVENT_CHANNEL_DISPATCH_THREAD_COUNT && !EVENT_CHANNEL_DISPATCH_THREAD_STARTING && SYSTEM_RUNNING) {
EVENT_CHANNEL_DISPATCH_THREAD_STARTING = 1;
launch = 1;
}
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
if (launch) {
switch_thread_data_t *td;
if (!EVENT_CHANNEL_DISPATCH_QUEUE) {
@ -3189,7 +3199,7 @@ SWITCH_DECLARE(switch_status_t) switch_live_array_bootstrap(switch_live_array_t
cJSON_AddItemToObject(msg, "eventChannel", cJSON_CreateString(la->event_channel));
cJSON_AddItemToObject(data, "action", cJSON_CreateString("bootObj"));
cJSON_AddItemToObject(data, "name", cJSON_CreateString(la->name));
cJSON_AddItemToObject(data, "wireSerno", cJSON_CreateNumber(la->serno++));
cJSON_AddItemToObject(data, "wireSerno", cJSON_CreateNumber(-1));
if (sessid) {
cJSON_AddItemToObject(msg, "sessid", cJSON_CreateString(sessid));