diff --git a/libs/stfu/stfu.c b/libs/stfu/stfu.c index d2c443a9e7..f00dfafa39 100644 --- a/libs/stfu/stfu.c +++ b/libs/stfu/stfu.c @@ -102,7 +102,10 @@ struct stfu_instance { int32_t ts_offset; int32_t ts_drift; - + int32_t max_drift; + uint32_t drift_dropped_packets; + uint32_t drift_max_dropped; + int32_t ts_diff; int32_t last_ts_diff; int32_t same_ts; @@ -258,7 +261,7 @@ stfu_status_t stfu_n_resize(stfu_instance_t *i, uint32_t qlen) return s; } -stfu_instance_t *stfu_n_init(uint32_t qlen, uint32_t max_qlen, uint32_t samples_per_packet, uint32_t samples_per_second) +stfu_instance_t *stfu_n_init(uint32_t qlen, uint32_t max_qlen, uint32_t samples_per_packet, uint32_t samples_per_second, uint32_t max_drift_ms) { struct stfu_instance *i; @@ -277,6 +280,12 @@ stfu_instance_t *stfu_n_init(uint32_t qlen, uint32_t max_qlen, uint32_t samples_ stfu_n_init_aqueue(&i->b_queue, qlen); stfu_n_init_aqueue(&i->c_queue, qlen); + i->max_drift = (int32_t)(max_drift_ms * (samples_per_second / 1000) * -1); + + if (max_drift_ms && samples_per_packet) { + i->drift_max_dropped = (samples_per_second * 2) / samples_per_packet; + } + i->in_queue = &i->a_queue; i->out_queue = &i->b_queue; i->old_queue = &i->c_queue; @@ -396,6 +405,9 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint32_t pt, void if (i->last_ts_diff == i->ts_diff) { if (++i->same_ts == 5) { i->samples_per_packet = i->ts_diff; + if (i->max_drift && i->samples_per_packet) { + i->drift_max_dropped = (i->samples_per_second * 2) / i->samples_per_packet; + } } } else { i->same_ts = 0; @@ -409,11 +421,24 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint32_t pt, void } } - if (timer_ts && ts && !i->ts_offset) { - i->ts_offset = timer_ts - ts; - } + if (timer_ts) { + if (ts && !i->ts_offset) { + i->ts_offset = timer_ts - ts; + } - i->ts_drift = ts + (i->ts_offset - timer_ts); + i->ts_drift = ts + (i->ts_offset - timer_ts); + + if (i->max_drift) { + if (i->ts_drift < i->max_drift) { + if (++i->drift_dropped_packets < i->drift_max_dropped) { + stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts); + return STFU_ITS_TOO_LATE; + } + } else { + i->drift_dropped_packets = 0; + } + } + } if (i->sync_in) { good_ts = 1; @@ -493,12 +518,12 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint32_t pt, void if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "I: %s %u i=%u/%u - g:%u/%u c:%u/%u b:%u - %u:%u - %u %d %u %u %d %d %d\n", i->name, + stfu_log(STFU_LOG_EMERG, "I: %s %u i=%u/%u - g:%u/%u c:%u/%u b:%u - %u:%u - %u %d %u %u %d %d %d/%d\n", i->name, i->qlen, i->period_packet_in_count, i->period_time, i->consecutive_good_count, i->decrement_time, i->period_clean_count, i->decrement_time, i->consecutive_bad_count, ts, ts / i->samples_per_packet, i->period_missing_count, i->period_need_range_avg, - i->last_wr_ts, ts, i->diff, i->diff_total / least1(i->period_packet_in_count), i->ts_drift); + i->last_wr_ts, ts, i->diff, i->diff_total / least1(i->period_packet_in_count), i->ts_drift, i->max_drift); } if (last || i->in_queue->array_len == i->in_queue->array_size) { @@ -597,7 +622,7 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) if (!i->ready) { if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "%s XXXSKIP\n", i->name); + stfu_log(STFU_LOG_EMERG, "%s JITTERBUFFER NOT READY: IGNORING FRAME\n", i->name); } return NULL; } @@ -612,7 +637,7 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) } if (i->cur_ts == 0) { if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "%s XXXPUNT\n", i->name); + stfu_log(STFU_LOG_EMERG, "%s JITTERBUFFER ERROR: PUNTING\n", i->name); return NULL; } } diff --git a/libs/stfu/stfu.h b/libs/stfu/stfu.h index b92bb3f839..d8fbf447af 100644 --- a/libs/stfu/stfu.h +++ b/libs/stfu/stfu.h @@ -179,7 +179,7 @@ typedef void (*stfu_n_call_me_t)(stfu_instance_t *i, void *); void stfu_n_report(stfu_instance_t *i, stfu_report_t *r); void stfu_n_destroy(stfu_instance_t **i); -stfu_instance_t *stfu_n_init(uint32_t qlen, uint32_t max_qlen, uint32_t samples_per_packet, uint32_t samples_per_second); +stfu_instance_t *stfu_n_init(uint32_t qlen, uint32_t max_qlen, uint32_t samples_per_packet, uint32_t samples_per_second, uint32_t max_drift_ms); stfu_status_t stfu_n_resize(stfu_instance_t *i, uint32_t qlen); stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint32_t pt, void *data, size_t datalen, uint32_t timer_ts, int last); stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i);