From 49f9bd01bfb0614787f76fed29ea8a06e2c13a12 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Sun, 17 Nov 2013 06:51:33 +0500 Subject: [PATCH] fix some contention in rtmp --- src/mod/endpoints/mod_rtmp/mod_rtmp.c | 146 +++++++++++++++++++------- src/mod/endpoints/mod_rtmp/mod_rtmp.h | 2 + 2 files changed, 111 insertions(+), 37 deletions(-) diff --git a/src/mod/endpoints/mod_rtmp/mod_rtmp.c b/src/mod/endpoints/mod_rtmp/mod_rtmp.c index e25f38b861..4e6e10db8f 100644 --- a/src/mod/endpoints/mod_rtmp/mod_rtmp.c +++ b/src/mod/endpoints/mod_rtmp/mod_rtmp.c @@ -40,7 +40,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown); -SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, NULL); +SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime); +SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, mod_rtmp_runtime); static switch_status_t config_profile(rtmp_profile_t *profile, switch_bool_t reload); static switch_xml_config_item_t *get_instructions(rtmp_profile_t *profile); @@ -732,7 +733,7 @@ rtmp_session_t *rtmp_session_locate(const char *uuid) { rtmp_session_t *rsession = switch_core_hash_find_rdlock(rtmp_globals.session_hash, uuid, rtmp_globals.session_rwlock); - if (!rsession || rsession->state == RS_DESTROY) { + if (!rsession || rsession->state >= RS_DESTROY) { return NULL; } @@ -812,53 +813,95 @@ switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **n return SWITCH_STATUS_SUCCESS; } +static void rtmp_garbage_colletor(void) +{ + switch_hash_index_t *hi; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP Garbage Collection\n"); + + + switch_thread_rwlock_wrlock(rtmp_globals.session_rwlock); + + top: + + for (hi = switch_hash_first(NULL, rtmp_globals.session_hash); hi; hi = switch_hash_next(hi)) { + void *val; + const void *key; + switch_ssize_t keylen; + rtmp_session_t *rsession; + + switch_hash_this(hi, &key, &keylen, &val); + rsession = (rtmp_session_t *) val; + + if (rsession->state == RS_DESTROY) { + if (rtmp_real_session_destroy(&rsession) == SWITCH_STATUS_SUCCESS) { + goto top; + } + } + } + + switch_thread_rwlock_unlock(rtmp_globals.session_rwlock); +} + switch_status_t rtmp_session_destroy(rtmp_session_t **rsession) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + + switch_mutex_lock(rtmp_globals.mutex); + if (rsession && *rsession) { + (*rsession)->state = RS_DESTROY; + *rsession = NULL; + status = SWITCH_STATUS_SUCCESS; + } + switch_mutex_unlock(rtmp_globals.mutex); + + return status; +} + +switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession) { switch_hash_index_t *hi; switch_event_t *event; - - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) { - rtmp_event_fill(*rsession, event); - switch_event_fire(&event); - } - - switch_core_hash_delete_wrlock(rtmp_globals.session_hash, (*rsession)->uuid, rtmp_globals.session_rwlock); - switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock); - rtmp_clear_registration(*rsession, NULL, NULL); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid); - - (*rsession)->state = RS_DESTROY; + int sess = 0; switch_thread_rwlock_rdlock((*rsession)->session_rwlock); for (hi = switch_hash_first(NULL, (*rsession)->session_hash); hi; hi = switch_hash_next(hi)) { void *val; const void *key; switch_ssize_t keylen; - rtmp_private_t *tech_pvt; switch_channel_t *channel; switch_core_session_t *session; - switch_hash_this(hi, &key, &keylen, &val); - tech_pvt = (rtmp_private_t *)val; - - /* At this point we don't know if the session still exists, so request a fresh pointer to it from the core. */ - if ( (session = switch_core_session_locate((char *)key)) != NULL ) { - /* - * This is here so that if the FS session still exists and has the FS session write(or read) lock, then we won't destroy the rsession - * until the FS session is finished with it. But if the rsession is able to get the FS session - * write lock, before the FS session is hungup, then once the FS session does get the write lock - * the rsession pointer will be null, and the FS session will never try and touch the already destroyed rsession. - */ + switch_hash_this(hi, &key, &keylen, &val); + + /* If there are any sessions attached, abort the destroy operation */ + if ((session = switch_core_session_locate((char *)key)) != NULL ) { channel = switch_core_session_get_channel(session); - tech_pvt = switch_core_session_get_private(session); - if ( tech_pvt && tech_pvt->rtmp_session ) { - tech_pvt->rtmp_session = NULL; - } switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); switch_core_session_rwunlock(session); + sess++; } } switch_thread_rwlock_unlock((*rsession)->session_rwlock); + + if (sess) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p still busy.\n", (*rsession)->uuid, (void *) *rsession); + return SWITCH_STATUS_FALSE; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p will be destroyed.\n", (*rsession)->uuid, (void *) *rsession); + + + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) { + rtmp_event_fill(*rsession, event); + switch_event_fire(&event); + } + + switch_core_hash_delete(rtmp_globals.session_hash, (*rsession)->uuid); + switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock); + rtmp_clear_registration(*rsession, NULL, NULL); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid); + switch_mutex_lock((*rsession)->profile->mutex); if ( (*rsession)->profile->calls < 1 ) { @@ -1137,7 +1180,7 @@ static void rtmp_clear_reg_auth(rtmp_session_t *rsession, const char *auth, cons switch_thread_rwlock_wrlock(rsession->profile->reg_rwlock); if ((reg = switch_core_hash_find(rsession->profile->reg_hash, auth))) { for (; reg; reg = reg->next) { - if (!strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) { + if (!zstr(reg->uuid) && !strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) { switch_event_t *event; if (prev) { prev->next = reg->next; @@ -1495,6 +1538,20 @@ done: return SWITCH_STATUS_SUCCESS; } +static const char *state2name(int state) +{ + switch(state) { + case RS_HANDSHAKE: + return "HANDSHAKE"; + case RS_HANDSHAKE2: + return "HANDSHAKE2"; + case RS_ESTABLISHED: + return "ESTABLISHED"; + default: + return "DESTROY (PENDING)"; + } +} + #define RTMP_FUNCTION_SYNTAX "profile [profilename] [start | stop | rescan | restart]\nstatus profile [profilename]\nstatus profile [profilename] [reg | sessions]\nsession [session_id] [kill | login [user@domain] | logout [user@domain]]" SWITCH_STANDARD_API(rtmp_function) { @@ -1571,7 +1628,7 @@ SWITCH_STANDARD_API(rtmp_function) { switch_hash_index_t *hi; stream->write_function(stream, "\nSessions:\n"); - stream->write_function(stream, "uuid,address,user,domain,flashVer\n"); + stream->write_function(stream, "uuid,address,user,domain,flashVer,state\n"); switch_thread_rwlock_rdlock(profile->session_rwlock); for (hi = switch_hash_first(NULL, profile->session_hash); hi; hi = switch_hash_next(hi)) { void *val; @@ -1581,11 +1638,11 @@ SWITCH_STANDARD_API(rtmp_function) switch_hash_this(hi, &key, &keylen, &val); item = (rtmp_session_t *)val; - stream->write_function(stream, "%s,%s:%d,%s,%s,%s\n", - item->uuid, item->remote_address, item->remote_port, - item->account ? item->account->user : NULL, - item->account ? item->account->domain : NULL, - item->flashVer); + stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n", + item->uuid, item->remote_address, item->remote_port, + item->account ? item->account->user : NULL, + item->account ? item->account->domain : NULL, + item->flashVer, state2name(item->state)); } switch_thread_rwlock_unlock(profile->session_rwlock); @@ -1862,6 +1919,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load) switch_xml_free(xml); } } + + rtmp_globals.running = 1; return SWITCH_STATUS_SUCCESS; } @@ -1892,9 +1951,22 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown) switch_core_hash_destroy(&rtmp_globals.session_hash); switch_core_hash_destroy(&rtmp_globals.invoke_hash); + rtmp_globals.running = 0; + return SWITCH_STATUS_SUCCESS; } +SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime) +{ + + while(rtmp_globals.running) { + rtmp_garbage_colletor(); + switch_yield(10000000); + } + + return SWITCH_STATUS_TERM; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/src/mod/endpoints/mod_rtmp/mod_rtmp.h b/src/mod/endpoints/mod_rtmp/mod_rtmp.h index 8a92c5b427..95eb17f017 100644 --- a/src/mod/endpoints/mod_rtmp/mod_rtmp.h +++ b/src/mod/endpoints/mod_rtmp/mod_rtmp.h @@ -336,6 +336,7 @@ struct mod_rtmp_globals { switch_hash_t *session_hash; switch_thread_rwlock_t *session_rwlock; switch_hash_t *invoke_hash; + int running; }; extern struct mod_rtmp_globals rtmp_globals; @@ -605,6 +606,7 @@ void rtmp_profile_release(rtmp_profile_t *profile); switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtmp_io_t **new_io, switch_memory_pool_t *pool); switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **newsession); switch_status_t rtmp_session_destroy(rtmp_session_t **session); +switch_status_t rtmp_real_session_destroy(rtmp_session_t **session); /**** Protocol ****/ void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize);