fix some contention in rtmp

This commit is contained in:
Anthony Minessale 2013-11-17 06:51:33 +05:00
parent 8f51875e3e
commit 49f9bd01bf
2 changed files with 111 additions and 37 deletions

View File

@ -40,7 +40,8 @@
SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown);
SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, NULL);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime);
SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, mod_rtmp_runtime);
static switch_status_t config_profile(rtmp_profile_t *profile, switch_bool_t reload);
static switch_xml_config_item_t *get_instructions(rtmp_profile_t *profile);
@ -732,7 +733,7 @@ rtmp_session_t *rtmp_session_locate(const char *uuid)
{
rtmp_session_t *rsession = switch_core_hash_find_rdlock(rtmp_globals.session_hash, uuid, rtmp_globals.session_rwlock);
if (!rsession || rsession->state == RS_DESTROY) {
if (!rsession || rsession->state >= RS_DESTROY) {
return NULL;
}
@ -812,53 +813,95 @@ switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **n
return SWITCH_STATUS_SUCCESS;
}
static void rtmp_garbage_colletor(void)
{
switch_hash_index_t *hi;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP Garbage Collection\n");
switch_thread_rwlock_wrlock(rtmp_globals.session_rwlock);
top:
for (hi = switch_hash_first(NULL, rtmp_globals.session_hash); hi; hi = switch_hash_next(hi)) {
void *val;
const void *key;
switch_ssize_t keylen;
rtmp_session_t *rsession;
switch_hash_this(hi, &key, &keylen, &val);
rsession = (rtmp_session_t *) val;
if (rsession->state == RS_DESTROY) {
if (rtmp_real_session_destroy(&rsession) == SWITCH_STATUS_SUCCESS) {
goto top;
}
}
}
switch_thread_rwlock_unlock(rtmp_globals.session_rwlock);
}
switch_status_t rtmp_session_destroy(rtmp_session_t **rsession)
{
switch_status_t status = SWITCH_STATUS_FALSE;
switch_mutex_lock(rtmp_globals.mutex);
if (rsession && *rsession) {
(*rsession)->state = RS_DESTROY;
*rsession = NULL;
status = SWITCH_STATUS_SUCCESS;
}
switch_mutex_unlock(rtmp_globals.mutex);
return status;
}
switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession)
{
switch_hash_index_t *hi;
switch_event_t *event;
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
rtmp_event_fill(*rsession, event);
switch_event_fire(&event);
}
switch_core_hash_delete_wrlock(rtmp_globals.session_hash, (*rsession)->uuid, rtmp_globals.session_rwlock);
switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
rtmp_clear_registration(*rsession, NULL, NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
(*rsession)->state = RS_DESTROY;
int sess = 0;
switch_thread_rwlock_rdlock((*rsession)->session_rwlock);
for (hi = switch_hash_first(NULL, (*rsession)->session_hash); hi; hi = switch_hash_next(hi)) {
void *val;
const void *key;
switch_ssize_t keylen;
rtmp_private_t *tech_pvt;
switch_channel_t *channel;
switch_core_session_t *session;
switch_hash_this(hi, &key, &keylen, &val);
tech_pvt = (rtmp_private_t *)val;
/* At this point we don't know if the session still exists, so request a fresh pointer to it from the core. */
if ( (session = switch_core_session_locate((char *)key)) != NULL ) {
/*
* This is here so that if the FS session still exists and has the FS session write(or read) lock, then we won't destroy the rsession
* until the FS session is finished with it. But if the rsession is able to get the FS session
* write lock, before the FS session is hungup, then once the FS session does get the write lock
* the rsession pointer will be null, and the FS session will never try and touch the already destroyed rsession.
*/
switch_hash_this(hi, &key, &keylen, &val);
/* If there are any sessions attached, abort the destroy operation */
if ((session = switch_core_session_locate((char *)key)) != NULL ) {
channel = switch_core_session_get_channel(session);
tech_pvt = switch_core_session_get_private(session);
if ( tech_pvt && tech_pvt->rtmp_session ) {
tech_pvt->rtmp_session = NULL;
}
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_core_session_rwunlock(session);
sess++;
}
}
switch_thread_rwlock_unlock((*rsession)->session_rwlock);
if (sess) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p still busy.\n", (*rsession)->uuid, (void *) *rsession);
return SWITCH_STATUS_FALSE;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p will be destroyed.\n", (*rsession)->uuid, (void *) *rsession);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
rtmp_event_fill(*rsession, event);
switch_event_fire(&event);
}
switch_core_hash_delete(rtmp_globals.session_hash, (*rsession)->uuid);
switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
rtmp_clear_registration(*rsession, NULL, NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
switch_mutex_lock((*rsession)->profile->mutex);
if ( (*rsession)->profile->calls < 1 ) {
@ -1137,7 +1180,7 @@ static void rtmp_clear_reg_auth(rtmp_session_t *rsession, const char *auth, cons
switch_thread_rwlock_wrlock(rsession->profile->reg_rwlock);
if ((reg = switch_core_hash_find(rsession->profile->reg_hash, auth))) {
for (; reg; reg = reg->next) {
if (!strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
if (!zstr(reg->uuid) && !strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
switch_event_t *event;
if (prev) {
prev->next = reg->next;
@ -1495,6 +1538,20 @@ done:
return SWITCH_STATUS_SUCCESS;
}
static const char *state2name(int state)
{
switch(state) {
case RS_HANDSHAKE:
return "HANDSHAKE";
case RS_HANDSHAKE2:
return "HANDSHAKE2";
case RS_ESTABLISHED:
return "ESTABLISHED";
default:
return "DESTROY (PENDING)";
}
}
#define RTMP_FUNCTION_SYNTAX "profile [profilename] [start | stop | rescan | restart]\nstatus profile [profilename]\nstatus profile [profilename] [reg | sessions]\nsession [session_id] [kill | login [user@domain] | logout [user@domain]]"
SWITCH_STANDARD_API(rtmp_function)
{
@ -1571,7 +1628,7 @@ SWITCH_STANDARD_API(rtmp_function)
{
switch_hash_index_t *hi;
stream->write_function(stream, "\nSessions:\n");
stream->write_function(stream, "uuid,address,user,domain,flashVer\n");
stream->write_function(stream, "uuid,address,user,domain,flashVer,state\n");
switch_thread_rwlock_rdlock(profile->session_rwlock);
for (hi = switch_hash_first(NULL, profile->session_hash); hi; hi = switch_hash_next(hi)) {
void *val;
@ -1581,11 +1638,11 @@ SWITCH_STANDARD_API(rtmp_function)
switch_hash_this(hi, &key, &keylen, &val);
item = (rtmp_session_t *)val;
stream->write_function(stream, "%s,%s:%d,%s,%s,%s\n",
item->uuid, item->remote_address, item->remote_port,
item->account ? item->account->user : NULL,
item->account ? item->account->domain : NULL,
item->flashVer);
stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n",
item->uuid, item->remote_address, item->remote_port,
item->account ? item->account->user : NULL,
item->account ? item->account->domain : NULL,
item->flashVer, state2name(item->state));
}
switch_thread_rwlock_unlock(profile->session_rwlock);
@ -1862,6 +1919,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load)
switch_xml_free(xml);
}
}
rtmp_globals.running = 1;
return SWITCH_STATUS_SUCCESS;
}
@ -1892,9 +1951,22 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown)
switch_core_hash_destroy(&rtmp_globals.session_hash);
switch_core_hash_destroy(&rtmp_globals.invoke_hash);
rtmp_globals.running = 0;
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime)
{
while(rtmp_globals.running) {
rtmp_garbage_colletor();
switch_yield(10000000);
}
return SWITCH_STATUS_TERM;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -336,6 +336,7 @@ struct mod_rtmp_globals {
switch_hash_t *session_hash;
switch_thread_rwlock_t *session_rwlock;
switch_hash_t *invoke_hash;
int running;
};
extern struct mod_rtmp_globals rtmp_globals;
@ -605,6 +606,7 @@ void rtmp_profile_release(rtmp_profile_t *profile);
switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtmp_io_t **new_io, switch_memory_pool_t *pool);
switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **newsession);
switch_status_t rtmp_session_destroy(rtmp_session_t **session);
switch_status_t rtmp_real_session_destroy(rtmp_session_t **session);
/**** Protocol ****/
void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize);