diff --git a/src/include/switch_rtp.h b/src/include/switch_rtp.h index 53767cda89..9f4cc51045 100644 --- a/src/include/switch_rtp.h +++ b/src/include/switch_rtp.h @@ -139,6 +139,13 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s */ SWITCH_DECLARE(void) switch_rtp_kill_socket(switch_rtp_t *rtp_session); +/*! + \brief Test if an RTP session is ready + \param rtp_session an RTP session to test + \return a true value if it's ready +*/ +SWITCH_DECLARE(uint8_t) switch_rtp_ready(switch_rtp_t *rtp_session); + /*! \brief Destroy an RTP session \param rtp_session an RTP session to destroy diff --git a/src/mod/endpoints/mod_dingaling/mod_dingaling.c b/src/mod/endpoints/mod_dingaling/mod_dingaling.c index 6a46314664..e551d6f9b3 100644 --- a/src/mod/endpoints/mod_dingaling/mod_dingaling.c +++ b/src/mod/endpoints/mod_dingaling/mod_dingaling.c @@ -286,7 +286,7 @@ static int activate_rtp(struct private_object *tech_pvt) int ms = 20; switch_rtp_flag_t flags; - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { return 1; } @@ -695,7 +695,7 @@ static switch_status_t channel_on_hangup(switch_core_session_t *session) ldl_session_destroy(&tech_pvt->dlsession); } - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { switch_rtp_destroy(&tech_pvt->rtp_session); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NUKE RTP\n"); tech_pvt->rtp_session = NULL; @@ -734,7 +734,7 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int } - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { switch_rtp_kill_socket(tech_pvt->rtp_session); } @@ -1629,7 +1629,7 @@ static ldl_status handle_signalling(ldl_handle_t *handle, ldl_session_t *dlsessi if (*msg == '+') { switch_channel_queue_dtmf(channel, msg + 1); switch_set_flag_locked(tech_pvt, TFLAG_DTMF); - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { switch_rtp_set_flag(tech_pvt->rtp_session, SWITCH_RTP_FLAG_BREAK); } } diff --git a/src/mod/endpoints/mod_exosip/mod_exosip.c b/src/mod/endpoints/mod_exosip/mod_exosip.c index 86f5492229..eb721660d9 100644 --- a/src/mod/endpoints/mod_exosip/mod_exosip.c +++ b/src/mod/endpoints/mod_exosip/mod_exosip.c @@ -490,7 +490,7 @@ static void deactivate_rtp(struct private_object *tech_pvt) { int loops = 0;//, sock = -1; - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { while (loops < 10 && (switch_test_flag(tech_pvt, TFLAG_READING) || switch_test_flag(tech_pvt, TFLAG_WRITING))) { switch_yield(10000); loops++; @@ -587,7 +587,7 @@ static switch_status_t activate_rtp(struct private_object *tech_pvt) key, &err, switch_core_session_get_pool(tech_pvt->session)); - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { uint8_t vad_in = switch_test_flag(tech_pvt, TFLAG_VAD_IN) ? 1 : 0; uint8_t vad_out = switch_test_flag(tech_pvt, TFLAG_VAD_OUT) ? 1 : 0; uint8_t inb = switch_test_flag(tech_pvt, TFLAG_OUTBOUND) ? 0 : 1; @@ -840,7 +840,7 @@ static switch_status_t exosip_kill_channel(switch_core_session_t *session, int s switch_clear_flag_locked(tech_pvt, TFLAG_IO); switch_set_flag_locked(tech_pvt, TFLAG_BYE); - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { switch_rtp_kill_socket(tech_pvt->rtp_session); } diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index b2dedc40ae..50516b78ad 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -579,7 +579,7 @@ static void deactivate_rtp(private_object_t *tech_pvt) { int loops = 0;//, sock = -1; - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { while (loops < 10 && (switch_test_flag(tech_pvt, TFLAG_READING) || switch_test_flag(tech_pvt, TFLAG_WRITING))) { switch_yield(10000); loops++; @@ -606,7 +606,7 @@ static switch_status_t activate_rtp(private_object_t *tech_pvt) assert(tech_pvt->codecs[tech_pvt->codec_index] != NULL); - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { return SWITCH_STATUS_SUCCESS; } @@ -680,7 +680,7 @@ static switch_status_t activate_rtp(private_object_t *tech_pvt) &err, switch_core_session_get_pool(tech_pvt->session)); - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { uint8_t vad_in = switch_test_flag(tech_pvt, TFLAG_VAD_IN) ? 1 : 0; uint8_t vad_out = switch_test_flag(tech_pvt, TFLAG_VAD_OUT) ? 1 : 0; uint8_t inb = switch_test_flag(tech_pvt, TFLAG_OUTBOUND) ? 0 : 1; @@ -906,7 +906,7 @@ static switch_status_t sofia_kill_channel(switch_core_session_t *session, int si switch_clear_flag_locked(tech_pvt, TFLAG_IO); switch_set_flag_locked(tech_pvt, TFLAG_HUP); - if (tech_pvt->rtp_session) { + if (switch_rtp_ready(tech_pvt->rtp_session)) { switch_rtp_kill_socket(tech_pvt->rtp_session); } diff --git a/src/switch_core.c b/src/switch_core.c index d9d87a26d2..9d87821545 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1242,7 +1242,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi *frame = NULL; while (switch_channel_test_flag(session->channel, CF_HOLD)) { - return SWITCH_STATUS_BREAK; + status = SWITCH_STATUS_BREAK; + goto done; } if (session->endpoint_interface->io_routines->read_frame) { @@ -1261,15 +1262,20 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi } } - if (status != SWITCH_STATUS_SUCCESS || !(*frame)) { - return status; + if (status != SWITCH_STATUS_SUCCESS) { + goto done; + } + + if (!(*frame)) { + goto done; } assert(session != NULL); assert(*frame != NULL); if (switch_test_flag(*frame, SFF_CNG)) { - return SWITCH_STATUS_SUCCESS; + status = SWITCH_STATUS_SUCCESS; + goto done; } if ((session->read_codec && (*frame)->codec && session->read_codec->implementation != (*frame)->codec->implementation)) { @@ -1318,7 +1324,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi default: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Codec %s decoder error!\n", session->read_codec->codec_interface->interface_name); - return status; + goto done; } } if (session->read_resampler) { @@ -1346,7 +1352,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi switch_buffer_create(session->pool, &session->raw_read_buffer, bytes); } if (!switch_buffer_write(session->raw_read_buffer, read_frame->data, read_frame->datalen)) { - return SWITCH_STATUS_MEMERR; + status = SWITCH_STATUS_MEMERR; + goto done; } } @@ -1403,6 +1410,11 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi } } + done: + if (!(*frame)) { + status = SWITCH_STATUS_FALSE; + } + return status; } diff --git a/src/switch_rtp.c b/src/switch_rtp.c index e7fd95f361..68f810ac29 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -170,6 +170,7 @@ struct switch_rtp { struct switch_rtp_rfc2833_data dtmf_data; uint8_t mini; switch_payload_t te; + switch_mutex_t *flag_mutex; }; static int global_init = 0; @@ -332,9 +333,9 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) { switch_socket_opt_set(rtp_session->sock, APR_SO_NONBLOCK, TRUE); - switch_set_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK); + switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK); } - switch_set_flag(rtp_session, SWITCH_RTP_FLAG_IO); + switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_IO); return SWITCH_STATUS_SUCCESS; } @@ -378,9 +379,13 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session return SWITCH_STATUS_MEMERR; } + + rtp_session->pool = pool; rtp_session->flags = flags; rtp_session->te = 101; + + switch_mutex_init(&rtp_session->flag_mutex, SWITCH_MUTEX_NESTED, rtp_session->pool); switch_mutex_init(&rtp_session->dtmf_data.dtmf_mutex, SWITCH_MUTEX_NESTED, rtp_session->pool); switch_buffer_create(rtp_session->pool, &rtp_session->dtmf_data.dtmf_buffer, 128); /* for from address on recvfrom calls */ @@ -390,7 +395,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session if (crypto_key) { int len; - switch_set_flag(rtp_session, SWITCH_RTP_FLAG_SECURE); + switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_SECURE); crypto_policy_set_rtp_default(&policy.rtp); crypto_policy_set_rtcp_default(&policy.rtcp); policy.ssrc.type = ssrc_any_inbound; @@ -502,7 +507,7 @@ SWITCH_DECLARE(switch_rtp_t *)switch_rtp_new(char *rx_host, if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_MINI)) { switch_rtp_miniframe_probe(rtp_session); - switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_MINI); + switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_MINI); } return rtp_session; @@ -537,24 +542,33 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_ice(switch_rtp_t *rtp_sessio SWITCH_DECLARE(void) switch_rtp_kill_socket(switch_rtp_t *rtp_session) { apr_socket_shutdown(rtp_session->sock, APR_SHUTDOWN_READWRITE); - switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_IO); + switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_IO); } +SWITCH_DECLARE(uint8_t) switch_rtp_ready(switch_rtp_t *rtp_session) +{ + return switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) ? 1 : 0; +} SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session) { + + if (!switch_test_flag((*rtp_session), SWITCH_RTP_FLAG_IO)) { + return; + } + switch_rtp_kill_socket(*rtp_session); switch_socket_close((*rtp_session)->sock); if (switch_test_flag((*rtp_session), SWITCH_RTP_FLAG_VAD)) { switch_rtp_disable_vad(*rtp_session); } + if (switch_test_flag((*rtp_session), SWITCH_RTP_FLAG_SECURE)) { srtp_dealloc((*rtp_session)->recv_ctx); srtp_dealloc((*rtp_session)->send_ctx); } - *rtp_session = NULL; return; } @@ -591,7 +605,7 @@ SWITCH_DECLARE(void) switch_rtp_set_invald_handler(switch_rtp_t *rtp_session, sw SWITCH_DECLARE(void) switch_rtp_set_flag(switch_rtp_t *rtp_session, switch_rtp_flag_t flags) { - switch_set_flag(rtp_session, flags); + switch_set_flag_locked(rtp_session, flags); } @@ -605,7 +619,7 @@ SWITCH_DECLARE(uint8_t) switch_rtp_test_flag(switch_rtp_t *rtp_session, switch_r SWITCH_DECLARE(void) switch_rtp_clear_flag(switch_rtp_t *rtp_session, switch_rtp_flag_t flags) { - switch_clear_flag(rtp_session, flags); + switch_clear_flag_locked(rtp_session, flags); } @@ -698,7 +712,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock, 0, (void *)&rtp_session->recv_msg, &bytes); if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_BREAK)) { - switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_BREAK); + switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_BREAK); return 0; } @@ -727,7 +741,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ uint32_t effective_size = (uint32_t)(bytes - sizeof(srtp_mini_hdr_t)); if (rtp_session->recv_msg.header.pt == RTP_MAGIC_NUMBER) { if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_MINI)) { - switch_set_flag(rtp_session, SWITCH_RTP_FLAG_MINI); + switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_MINI); rtp_session->rpacket_size = ntohl(rtp_session->recv_msg.header.ts); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "YAY MINI-RTP! %d\n", rtp_session->rpacket_size); switch_rtp_miniframe_probe(rtp_session); @@ -802,7 +816,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ switch_rtp_set_remote_address(rtp_session, tx_host, rtp_session->from_addr->port, &err); } } - switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_AUTOADJ); + switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_AUTOADJ); } } @@ -1231,7 +1245,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_disable_vad(switch_rtp_t *rtp_session return SWITCH_STATUS_GENERR; } switch_core_codec_destroy(&rtp_session->vad_data.vad_codec); - switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_VAD); + switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_VAD); return SWITCH_STATUS_SUCCESS; } @@ -1268,7 +1282,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_enable_vad(switch_rtp_t *rtp_session, rtp_session->vad_data.start = 0; rtp_session->vad_data.next_scan = time(NULL); rtp_session->vad_data.scan_freq = 0; - switch_set_flag(rtp_session, SWITCH_RTP_FLAG_VAD); + switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_VAD); switch_set_flag(&rtp_session->vad_data, SWITCH_VAD_FLAG_CNG); return SWITCH_STATUS_SUCCESS; }