1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h"
12 
13 #include <math.h>
14 
15 #include "webrtc/base/scoped_ptr.h"
16 #include "webrtc/modules/rtp_rtcp/source/bitrate.h"
17 #include "webrtc/modules/rtp_rtcp/source/time_util.h"
18 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
19 
20 namespace webrtc {
21 
// Limit, in milliseconds, used by GetActiveStatisticians(): a stream is listed
// only while the time since its last in-order packet is below this value.
const int64_t kStatisticsTimeoutMs = 8 * 1000;
// Period, in milliseconds, that TimeUntilNextProcess() counts down from.
const int64_t kStatisticsProcessIntervalMs = 1 * 1000;
24 
~StreamStatistician()25 StreamStatistician::~StreamStatistician() {}
26 
StreamStatisticianImpl(Clock * clock,RtcpStatisticsCallback * rtcp_callback,StreamDataCountersCallback * rtp_callback)27 StreamStatisticianImpl::StreamStatisticianImpl(
28     Clock* clock,
29     RtcpStatisticsCallback* rtcp_callback,
30     StreamDataCountersCallback* rtp_callback)
31     : clock_(clock),
32       stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
33       incoming_bitrate_(clock, NULL),
34       ssrc_(0),
35       max_reordering_threshold_(kDefaultMaxReorderingThreshold),
36       jitter_q4_(0),
37       cumulative_loss_(0),
38       jitter_q4_transmission_time_offset_(0),
39       last_receive_time_ms_(0),
40       last_received_timestamp_(0),
41       last_received_transmission_time_offset_(0),
42       received_seq_first_(0),
43       received_seq_max_(0),
44       received_seq_wraps_(0),
45       received_packet_overhead_(12),
46       last_report_inorder_packets_(0),
47       last_report_old_packets_(0),
48       last_report_seq_max_(0),
49       rtcp_callback_(rtcp_callback),
50       rtp_callback_(rtp_callback) {}
51 
IncomingPacket(const RTPHeader & header,size_t packet_length,bool retransmitted)52 void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
53                                             size_t packet_length,
54                                             bool retransmitted) {
55   UpdateCounters(header, packet_length, retransmitted);
56   NotifyRtpCallback();
57 }
58 
UpdateCounters(const RTPHeader & header,size_t packet_length,bool retransmitted)59 void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
60                                             size_t packet_length,
61                                             bool retransmitted) {
62   CriticalSectionScoped cs(stream_lock_.get());
63   bool in_order = InOrderPacketInternal(header.sequenceNumber);
64   ssrc_ = header.ssrc;
65   incoming_bitrate_.Update(packet_length);
66   receive_counters_.transmitted.AddPacket(packet_length, header);
67   if (!in_order && retransmitted) {
68     receive_counters_.retransmitted.AddPacket(packet_length, header);
69   }
70 
71   if (receive_counters_.transmitted.packets == 1) {
72     received_seq_first_ = header.sequenceNumber;
73     receive_counters_.first_packet_time_ms = clock_->TimeInMilliseconds();
74   }
75 
76   // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
77   // are received, 4 will be ignored.
78   if (in_order) {
79     // Current time in samples.
80     NtpTime receive_time(*clock_);
81 
82     // Wrong if we use RetransmitOfOldPacket.
83     if (receive_counters_.transmitted.packets > 1 &&
84         received_seq_max_ > header.sequenceNumber) {
85       // Wrap around detected.
86       received_seq_wraps_++;
87     }
88     // New max.
89     received_seq_max_ = header.sequenceNumber;
90 
91     // If new time stamp and more than one in-order packet received, calculate
92     // new jitter statistics.
93     if (header.timestamp != last_received_timestamp_ &&
94         (receive_counters_.transmitted.packets -
95          receive_counters_.retransmitted.packets) > 1) {
96       UpdateJitter(header, receive_time);
97     }
98     last_received_timestamp_ = header.timestamp;
99     last_receive_time_ntp_ = receive_time;
100     last_receive_time_ms_ = clock_->TimeInMilliseconds();
101   }
102 
103   size_t packet_oh = header.headerLength + header.paddingLength;
104 
105   // Our measured overhead. Filter from RFC 5104 4.2.1.2:
106   // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
107   received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
108 }
109 
UpdateJitter(const RTPHeader & header,NtpTime receive_time)110 void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
111                                           NtpTime receive_time) {
112   uint32_t receive_time_rtp =
113       NtpToRtp(receive_time, header.payload_type_frequency);
114   uint32_t last_receive_time_rtp =
115       NtpToRtp(last_receive_time_ntp_, header.payload_type_frequency);
116   int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
117       (header.timestamp - last_received_timestamp_);
118 
119   time_diff_samples = abs(time_diff_samples);
120 
121   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
122   // If this happens, don't update jitter value. Use 5 secs video frequency
123   // as the threshold.
124   if (time_diff_samples < 450000) {
125     // Note we calculate in Q4 to avoid using float.
126     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
127     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
128   }
129 
130   // Extended jitter report, RFC 5450.
131   // Actual network jitter, excluding the source-introduced jitter.
132   int32_t time_diff_samples_ext =
133     (receive_time_rtp - last_receive_time_rtp) -
134     ((header.timestamp +
135       header.extension.transmissionTimeOffset) -
136      (last_received_timestamp_ +
137       last_received_transmission_time_offset_));
138 
139   time_diff_samples_ext = abs(time_diff_samples_ext);
140 
141   if (time_diff_samples_ext < 450000) {
142     int32_t jitter_diffQ4TransmissionTimeOffset =
143       (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
144     jitter_q4_transmission_time_offset_ +=
145       ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
146   }
147 }
148 
NotifyRtpCallback()149 void StreamStatisticianImpl::NotifyRtpCallback() {
150   StreamDataCounters data;
151   uint32_t ssrc;
152   {
153     CriticalSectionScoped cs(stream_lock_.get());
154     data = receive_counters_;
155     ssrc = ssrc_;
156   }
157   rtp_callback_->DataCountersUpdated(data, ssrc);
158 }
159 
NotifyRtcpCallback()160 void StreamStatisticianImpl::NotifyRtcpCallback() {
161   RtcpStatistics data;
162   uint32_t ssrc;
163   {
164     CriticalSectionScoped cs(stream_lock_.get());
165     data = last_reported_statistics_;
166     ssrc = ssrc_;
167   }
168   rtcp_callback_->StatisticsUpdated(data, ssrc);
169 }
170 
FecPacketReceived(const RTPHeader & header,size_t packet_length)171 void StreamStatisticianImpl::FecPacketReceived(const RTPHeader& header,
172                                                size_t packet_length) {
173   {
174     CriticalSectionScoped cs(stream_lock_.get());
175     receive_counters_.fec.AddPacket(packet_length, header);
176   }
177   NotifyRtpCallback();
178 }
179 
SetMaxReorderingThreshold(int max_reordering_threshold)180 void StreamStatisticianImpl::SetMaxReorderingThreshold(
181     int max_reordering_threshold) {
182   CriticalSectionScoped cs(stream_lock_.get());
183   max_reordering_threshold_ = max_reordering_threshold;
184 }
185 
GetStatistics(RtcpStatistics * statistics,bool reset)186 bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
187                                            bool reset) {
188   {
189     CriticalSectionScoped cs(stream_lock_.get());
190     if (received_seq_first_ == 0 &&
191         receive_counters_.transmitted.payload_bytes == 0) {
192       // We have not received anything.
193       return false;
194     }
195 
196     if (!reset) {
197       if (last_report_inorder_packets_ == 0) {
198         // No report.
199         return false;
200       }
201       // Just get last report.
202       *statistics = last_reported_statistics_;
203       return true;
204     }
205 
206     *statistics = CalculateRtcpStatistics();
207   }
208 
209   NotifyRtcpCallback();
210 
211   return true;
212 }
213 
CalculateRtcpStatistics()214 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
215   RtcpStatistics stats;
216 
217   if (last_report_inorder_packets_ == 0) {
218     // First time we send a report.
219     last_report_seq_max_ = received_seq_first_ - 1;
220   }
221 
222   // Calculate fraction lost.
223   uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_);
224 
225   if (last_report_seq_max_ > received_seq_max_) {
226     // Can we assume that the seq_num can't go decrease over a full RTCP period?
227     exp_since_last = 0;
228   }
229 
230   // Number of received RTP packets since last report, counts all packets but
231   // not re-transmissions.
232   uint32_t rec_since_last =
233       (receive_counters_.transmitted.packets -
234        receive_counters_.retransmitted.packets) - last_report_inorder_packets_;
235 
236   // With NACK we don't know the expected retransmissions during the last
237   // second. We know how many "old" packets we have received. We just count
238   // the number of old received to estimate the loss, but it still does not
239   // guarantee an exact number since we run this based on time triggered by
240   // sending of an RTP packet. This should have a minimum effect.
241 
242   // With NACK we don't count old packets as received since they are
243   // re-transmitted. We use RTT to decide if a packet is re-ordered or
244   // re-transmitted.
245   uint32_t retransmitted_packets =
246       receive_counters_.retransmitted.packets - last_report_old_packets_;
247   rec_since_last += retransmitted_packets;
248 
249   int32_t missing = 0;
250   if (exp_since_last > rec_since_last) {
251     missing = (exp_since_last - rec_since_last);
252   }
253   uint8_t local_fraction_lost = 0;
254   if (exp_since_last) {
255     // Scale 0 to 255, where 255 is 100% loss.
256     local_fraction_lost =
257         static_cast<uint8_t>(255 * missing / exp_since_last);
258   }
259   stats.fraction_lost = local_fraction_lost;
260 
261   // We need a counter for cumulative loss too.
262   // TODO(danilchap): Ensure cumulative loss is below maximum value of 2^24.
263   cumulative_loss_ += missing;
264   stats.cumulative_lost = cumulative_loss_;
265   stats.extended_max_sequence_number =
266       (received_seq_wraps_ << 16) + received_seq_max_;
267   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
268   stats.jitter = jitter_q4_ >> 4;
269 
270   // Store this report.
271   last_reported_statistics_ = stats;
272 
273   // Only for report blocks in RTCP SR and RR.
274   last_report_inorder_packets_ =
275       receive_counters_.transmitted.packets -
276       receive_counters_.retransmitted.packets;
277   last_report_old_packets_ = receive_counters_.retransmitted.packets;
278   last_report_seq_max_ = received_seq_max_;
279 
280   return stats;
281 }
282 
GetDataCounters(size_t * bytes_received,uint32_t * packets_received) const283 void StreamStatisticianImpl::GetDataCounters(
284     size_t* bytes_received, uint32_t* packets_received) const {
285   CriticalSectionScoped cs(stream_lock_.get());
286   if (bytes_received) {
287     *bytes_received = receive_counters_.transmitted.payload_bytes +
288                       receive_counters_.transmitted.header_bytes +
289                       receive_counters_.transmitted.padding_bytes;
290   }
291   if (packets_received) {
292     *packets_received = receive_counters_.transmitted.packets;
293   }
294 }
295 
GetReceiveStreamDataCounters(StreamDataCounters * data_counters) const296 void StreamStatisticianImpl::GetReceiveStreamDataCounters(
297     StreamDataCounters* data_counters) const {
298   CriticalSectionScoped cs(stream_lock_.get());
299   *data_counters = receive_counters_;
300 }
301 
BitrateReceived() const302 uint32_t StreamStatisticianImpl::BitrateReceived() const {
303   CriticalSectionScoped cs(stream_lock_.get());
304   return incoming_bitrate_.BitrateNow();
305 }
306 
ProcessBitrate()307 void StreamStatisticianImpl::ProcessBitrate() {
308   CriticalSectionScoped cs(stream_lock_.get());
309   incoming_bitrate_.Process();
310 }
311 
LastReceiveTimeNtp(uint32_t * secs,uint32_t * frac) const312 void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
313                                                 uint32_t* frac) const {
314   CriticalSectionScoped cs(stream_lock_.get());
315   *secs = last_receive_time_ntp_.seconds();
316   *frac = last_receive_time_ntp_.fractions();
317 }
318 
IsRetransmitOfOldPacket(const RTPHeader & header,int64_t min_rtt) const319 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
320     const RTPHeader& header, int64_t min_rtt) const {
321   CriticalSectionScoped cs(stream_lock_.get());
322   if (InOrderPacketInternal(header.sequenceNumber)) {
323     return false;
324   }
325   uint32_t frequency_khz = header.payload_type_frequency / 1000;
326   assert(frequency_khz > 0);
327 
328   int64_t time_diff_ms = clock_->TimeInMilliseconds() -
329       last_receive_time_ms_;
330 
331   // Diff in time stamp since last received in order.
332   uint32_t timestamp_diff = header.timestamp - last_received_timestamp_;
333   uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
334 
335   int64_t max_delay_ms = 0;
336   if (min_rtt == 0) {
337     // Jitter standard deviation in samples.
338     float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4));
339 
340     // 2 times the standard deviation => 95% confidence.
341     // And transform to milliseconds by dividing by the frequency in kHz.
342     max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
343 
344     // Min max_delay_ms is 1.
345     if (max_delay_ms == 0) {
346       max_delay_ms = 1;
347     }
348   } else {
349     max_delay_ms = (min_rtt / 3) + 1;
350   }
351   return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
352 }
353 
IsPacketInOrder(uint16_t sequence_number) const354 bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
355   CriticalSectionScoped cs(stream_lock_.get());
356   return InOrderPacketInternal(sequence_number);
357 }
358 
InOrderPacketInternal(uint16_t sequence_number) const359 bool StreamStatisticianImpl::InOrderPacketInternal(
360     uint16_t sequence_number) const {
361   // First packet is always in order.
362   if (last_receive_time_ms_ == 0)
363     return true;
364 
365   if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) {
366     return true;
367   } else {
368     // If we have a restart of the remote side this packet is still in order.
369     return !IsNewerSequenceNumber(sequence_number, received_seq_max_ -
370                                   max_reordering_threshold_);
371   }
372 }
373 
Create(Clock * clock)374 ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
375   return new ReceiveStatisticsImpl(clock);
376 }
377 
ReceiveStatisticsImpl(Clock * clock)378 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
379     : clock_(clock),
380       receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
381       last_rate_update_ms_(0),
382       rtcp_stats_callback_(NULL),
383       rtp_stats_callback_(NULL) {}
384 
~ReceiveStatisticsImpl()385 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
386   while (!statisticians_.empty()) {
387     delete statisticians_.begin()->second;
388     statisticians_.erase(statisticians_.begin());
389   }
390 }
391 
IncomingPacket(const RTPHeader & header,size_t packet_length,bool retransmitted)392 void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
393                                            size_t packet_length,
394                                            bool retransmitted) {
395   StreamStatisticianImpl* impl;
396   {
397     CriticalSectionScoped cs(receive_statistics_lock_.get());
398     StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
399     if (it != statisticians_.end()) {
400       impl = it->second;
401     } else {
402       impl = new StreamStatisticianImpl(clock_, this, this);
403       statisticians_[header.ssrc] = impl;
404     }
405   }
406   // StreamStatisticianImpl instance is created once and only destroyed when
407   // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
408   // it's own locking so don't hold receive_statistics_lock_ (potential
409   // deadlock).
410   impl->IncomingPacket(header, packet_length, retransmitted);
411 }
412 
FecPacketReceived(const RTPHeader & header,size_t packet_length)413 void ReceiveStatisticsImpl::FecPacketReceived(const RTPHeader& header,
414                                               size_t packet_length) {
415   CriticalSectionScoped cs(receive_statistics_lock_.get());
416   StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
417   // Ignore FEC if it is the first packet.
418   if (it != statisticians_.end()) {
419     it->second->FecPacketReceived(header, packet_length);
420   }
421 }
422 
GetActiveStatisticians() const423 StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
424   CriticalSectionScoped cs(receive_statistics_lock_.get());
425   StatisticianMap active_statisticians;
426   for (StatisticianImplMap::const_iterator it = statisticians_.begin();
427        it != statisticians_.end(); ++it) {
428     uint32_t secs;
429     uint32_t frac;
430     it->second->LastReceiveTimeNtp(&secs, &frac);
431     if (clock_->CurrentNtpInMilliseconds() -
432         Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) {
433       active_statisticians[it->first] = it->second;
434     }
435   }
436   return active_statisticians;
437 }
438 
GetStatistician(uint32_t ssrc) const439 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
440     uint32_t ssrc) const {
441   CriticalSectionScoped cs(receive_statistics_lock_.get());
442   StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
443   if (it == statisticians_.end())
444     return NULL;
445   return it->second;
446 }
447 
SetMaxReorderingThreshold(int max_reordering_threshold)448 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
449     int max_reordering_threshold) {
450   CriticalSectionScoped cs(receive_statistics_lock_.get());
451   for (StatisticianImplMap::iterator it = statisticians_.begin();
452        it != statisticians_.end(); ++it) {
453     it->second->SetMaxReorderingThreshold(max_reordering_threshold);
454   }
455 }
456 
Process()457 int32_t ReceiveStatisticsImpl::Process() {
458   CriticalSectionScoped cs(receive_statistics_lock_.get());
459   for (StatisticianImplMap::iterator it = statisticians_.begin();
460        it != statisticians_.end(); ++it) {
461     it->second->ProcessBitrate();
462   }
463   last_rate_update_ms_ = clock_->TimeInMilliseconds();
464   return 0;
465 }
466 
TimeUntilNextProcess()467 int64_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
468   CriticalSectionScoped cs(receive_statistics_lock_.get());
469   int64_t time_since_last_update = clock_->TimeInMilliseconds() -
470       last_rate_update_ms_;
471   return std::max<int64_t>(
472       kStatisticsProcessIntervalMs - time_since_last_update, 0);
473 }
474 
RegisterRtcpStatisticsCallback(RtcpStatisticsCallback * callback)475 void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
476     RtcpStatisticsCallback* callback) {
477   CriticalSectionScoped cs(receive_statistics_lock_.get());
478   if (callback != NULL)
479     assert(rtcp_stats_callback_ == NULL);
480   rtcp_stats_callback_ = callback;
481 }
482 
StatisticsUpdated(const RtcpStatistics & statistics,uint32_t ssrc)483 void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
484                                               uint32_t ssrc) {
485   CriticalSectionScoped cs(receive_statistics_lock_.get());
486   if (rtcp_stats_callback_)
487     rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
488 }
489 
CNameChanged(const char * cname,uint32_t ssrc)490 void ReceiveStatisticsImpl::CNameChanged(const char* cname, uint32_t ssrc) {
491   CriticalSectionScoped cs(receive_statistics_lock_.get());
492   if (rtcp_stats_callback_)
493     rtcp_stats_callback_->CNameChanged(cname, ssrc);
494 }
495 
RegisterRtpStatisticsCallback(StreamDataCountersCallback * callback)496 void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
497     StreamDataCountersCallback* callback) {
498   CriticalSectionScoped cs(receive_statistics_lock_.get());
499   if (callback != NULL)
500     assert(rtp_stats_callback_ == NULL);
501   rtp_stats_callback_ = callback;
502 }
503 
DataCountersUpdated(const StreamDataCounters & stats,uint32_t ssrc)504 void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
505                                                 uint32_t ssrc) {
506   CriticalSectionScoped cs(receive_statistics_lock_.get());
507   if (rtp_stats_callback_) {
508     rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
509   }
510 }
511 
IncomingPacket(const RTPHeader & rtp_header,size_t packet_length,bool retransmitted)512 void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
513                                            size_t packet_length,
514                                            bool retransmitted) {}
515 
FecPacketReceived(const RTPHeader & header,size_t packet_length)516 void NullReceiveStatistics::FecPacketReceived(const RTPHeader& header,
517                                               size_t packet_length) {}
518 
GetActiveStatisticians() const519 StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
520   return StatisticianMap();
521 }
522 
GetStatistician(uint32_t ssrc) const523 StreamStatistician* NullReceiveStatistics::GetStatistician(
524     uint32_t ssrc) const {
525   return NULL;
526 }
527 
SetMaxReorderingThreshold(int max_reordering_threshold)528 void NullReceiveStatistics::SetMaxReorderingThreshold(
529     int max_reordering_threshold) {}
530 
TimeUntilNextProcess()531 int64_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; }
532 
Process()533 int32_t NullReceiveStatistics::Process() { return 0; }
534 
RegisterRtcpStatisticsCallback(RtcpStatisticsCallback * callback)535 void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
536     RtcpStatisticsCallback* callback) {}
537 
RegisterRtpStatisticsCallback(StreamDataCountersCallback * callback)538 void NullReceiveStatistics::RegisterRtpStatisticsCallback(
539     StreamDataCountersCallback* callback) {}
540 
541 }  // namespace webrtc
542