1 /*
2  *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/system_wrappers/include/data_log.h"
12 
13 #include <assert.h>
14 
15 #include <algorithm>
16 #include <list>
17 
18 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
19 #include "webrtc/system_wrappers/include/event_wrapper.h"
20 #include "webrtc/system_wrappers/include/file_wrapper.h"
21 #include "webrtc/system_wrappers/include/rw_lock_wrapper.h"
22 
23 namespace webrtc {
24 
25 DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
26   CriticalSectionWrapper::CreateCriticalSection());
27 
28 DataLogImpl* DataLogImpl::instance_ = NULL;
29 
30 // A Row contains cells, which are indexed by the column names as std::string.
31 // The string index is treated in a case sensitive way.
32 class Row {
33  public:
34   Row();
35   ~Row();
36 
37   // Inserts a Container into the cell of the column specified with
38   // column_name.
39   // column_name is treated in a case sensitive way.
40   int InsertCell(const std::string& column_name,
41                  const Container* value_container);
42 
43   // Converts the value at the column specified by column_name to a string
44   // stored in value_string.
45   // column_name is treated in a case sensitive way.
46   void ToString(const std::string& column_name, std::string* value_string);
47 
48  private:
49   // Collection of containers indexed by column name as std::string
50   typedef std::map<std::string, const Container*> CellMap;
51 
52   CellMap                   cells_;
53   CriticalSectionWrapper*   cells_lock_;
54 };
55 
56 // A LogTable contains multiple rows, where only the latest row is active for
57 // editing. The rows are defined by the ColumnMap, which contains the name of
58 // each column and the length of the column (1 for one-value-columns and greater
59 // than 1 for multi-value-columns).
60 class LogTable {
61  public:
62   LogTable();
63   ~LogTable();
64 
65   // Adds the column with name column_name to the table. The column will be a
66   // multi-value-column if multi_value_length is greater than 1.
67   // column_name is treated in a case sensitive way.
68   int AddColumn(const std::string& column_name, int multi_value_length);
69 
70   // Buffers the current row while it is waiting to be written to file,
71   // which is done by a call to Flush(). A new row is available when the
72   // function returns
73   void NextRow();
74 
75   // Inserts a Container into the cell of the column specified with
76   // column_name.
77   // column_name is treated in a case sensitive way.
78   int InsertCell(const std::string& column_name,
79                  const Container* value_container);
80 
81   // Creates a log file, named as specified in the string file_name, to
82   // where the table will be written when calling Flush().
83   int CreateLogFile(const std::string& file_name);
84 
85   // Write all complete rows to file.
86   // May not be called by two threads simultaneously (doing so may result in
87   // a race condition). Will be called by the file_writer_thread_ when that
88   // thread is running.
89   void Flush();
90 
91  private:
92   // Collection of multi_value_lengths indexed by column name as std::string
93   typedef std::map<std::string, int> ColumnMap;
94   typedef std::list<Row*> RowList;
95 
96   ColumnMap               columns_;
97   RowList                 rows_[2];
98   RowList*                rows_history_;
99   RowList*                rows_flush_;
100   Row*                    current_row_;
101   FileWrapper*            file_;
102   bool                    write_header_;
103   CriticalSectionWrapper* table_lock_;
104 };
105 
Row()106 Row::Row()
107   : cells_(),
108     cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
109 }
110 
~Row()111 Row::~Row() {
112   for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
113     delete it->second;
114     // For maps all iterators (except the erased) are valid after an erase
115     cells_.erase(it++);
116   }
117   delete cells_lock_;
118 }
119 
InsertCell(const std::string & column_name,const Container * value_container)120 int Row::InsertCell(const std::string& column_name,
121                     const Container* value_container) {
122   CriticalSectionScoped synchronize(cells_lock_);
123   assert(cells_.count(column_name) == 0);
124   if (cells_.count(column_name) > 0)
125     return -1;
126   cells_[column_name] = value_container;
127   return 0;
128 }
129 
ToString(const std::string & column_name,std::string * value_string)130 void Row::ToString(const std::string& column_name,
131                    std::string* value_string) {
132   CriticalSectionScoped synchronize(cells_lock_);
133   const Container* container = cells_[column_name];
134   if (container == NULL) {
135     *value_string = "NaN,";
136     return;
137   }
138   container->ToString(value_string);
139 }
140 
LogTable()141 LogTable::LogTable()
142   : columns_(),
143     rows_(),
144     rows_history_(&rows_[0]),
145     rows_flush_(&rows_[1]),
146     current_row_(new Row),
147     file_(FileWrapper::Create()),
148     write_header_(true),
149     table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
150 }
151 
~LogTable()152 LogTable::~LogTable() {
153   for (RowList::iterator row_it = rows_history_->begin();
154        row_it != rows_history_->end();) {
155     delete *row_it;
156     row_it = rows_history_->erase(row_it);
157   }
158   for (ColumnMap::iterator col_it = columns_.begin();
159        col_it != columns_.end();) {
160     // For maps all iterators (except the erased) are valid after an erase
161     columns_.erase(col_it++);
162   }
163   if (file_ != NULL) {
164     file_->Flush();
165     file_->CloseFile();
166     delete file_;
167   }
168   delete current_row_;
169   delete table_lock_;
170 }
171 
AddColumn(const std::string & column_name,int multi_value_length)172 int LogTable::AddColumn(const std::string& column_name,
173                         int multi_value_length) {
174   assert(multi_value_length > 0);
175   if (!write_header_) {
176     // It's not allowed to add new columns after the header
177     // has been written.
178     assert(false);
179     return -1;
180   } else {
181     CriticalSectionScoped synchronize(table_lock_);
182     if (write_header_)
183       columns_[column_name] = multi_value_length;
184     else
185       return -1;
186   }
187   return 0;
188 }
189 
NextRow()190 void LogTable::NextRow() {
191   CriticalSectionScoped sync_rows(table_lock_);
192   rows_history_->push_back(current_row_);
193   current_row_ = new Row;
194 }
195 
InsertCell(const std::string & column_name,const Container * value_container)196 int LogTable::InsertCell(const std::string& column_name,
197                          const Container* value_container) {
198   CriticalSectionScoped synchronize(table_lock_);
199   assert(columns_.count(column_name) > 0);
200   if (columns_.count(column_name) == 0)
201     return -1;
202   return current_row_->InsertCell(column_name, value_container);
203 }
204 
CreateLogFile(const std::string & file_name)205 int LogTable::CreateLogFile(const std::string& file_name) {
206   if (file_name.length() == 0)
207     return -1;
208   if (file_->Open())
209     return -1;
210   file_->OpenFile(file_name.c_str(),
211                   false,  // Open with read/write permissions
212                   false,  // Don't wraparound and write at the beginning when
213                           // the file is full
214                   true);  // Open as a text file
215   if (file_ == NULL)
216     return -1;
217   return 0;
218 }
219 
Flush()220 void LogTable::Flush() {
221   ColumnMap::iterator column_it;
222   bool commit_header = false;
223   if (write_header_) {
224     CriticalSectionScoped synchronize(table_lock_);
225     if (write_header_) {
226       commit_header = true;
227       write_header_ = false;
228     }
229   }
230   if (commit_header) {
231     for (column_it = columns_.begin();
232          column_it != columns_.end(); ++column_it) {
233       if (column_it->second > 1) {
234         file_->WriteText("%s[%u],", column_it->first.c_str(),
235                          column_it->second);
236         for (int i = 1; i < column_it->second; ++i)
237           file_->WriteText(",");
238       } else {
239         file_->WriteText("%s,", column_it->first.c_str());
240       }
241     }
242     if (columns_.size() > 0)
243       file_->WriteText("\n");
244   }
245 
246   // Swap the list used for flushing with the list containing the row history
247   // and clear the history. We also create a local pointer to the new
248   // list used for flushing to avoid race conditions if another thread
249   // calls this function while we are writing.
250   // We don't want to block the list while we're writing to file.
251   {
252     CriticalSectionScoped synchronize(table_lock_);
253     RowList* tmp = rows_flush_;
254     rows_flush_ = rows_history_;
255     rows_history_ = tmp;
256     rows_history_->clear();
257   }
258 
259   // Write all complete rows to file and delete them
260   for (RowList::iterator row_it = rows_flush_->begin();
261        row_it != rows_flush_->end();) {
262     for (column_it = columns_.begin();
263          column_it != columns_.end(); ++column_it) {
264       std::string row_string;
265       (*row_it)->ToString(column_it->first, &row_string);
266       file_->WriteText("%s", row_string.c_str());
267     }
268     if (columns_.size() > 0)
269       file_->WriteText("\n");
270     delete *row_it;
271     row_it = rows_flush_->erase(row_it);
272   }
273 }
274 
CreateLog()275 int DataLog::CreateLog() {
276   return DataLogImpl::CreateLog();
277 }
278 
ReturnLog()279 void DataLog::ReturnLog() {
280   return DataLogImpl::ReturnLog();
281 }
282 
Combine(const std::string & table_name,int table_id)283 std::string DataLog::Combine(const std::string& table_name, int table_id) {
284   std::stringstream ss;
285   std::string combined_id = table_name;
286   std::string number_suffix;
287   ss << "_" << table_id;
288   ss >> number_suffix;
289   combined_id += number_suffix;
290   std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
291                  ::tolower);
292   return combined_id;
293 }
294 
AddTable(const std::string & table_name)295 int DataLog::AddTable(const std::string& table_name) {
296   DataLogImpl* data_log = DataLogImpl::StaticInstance();
297   if (data_log == NULL)
298     return -1;
299   return data_log->AddTable(table_name);
300 }
301 
AddColumn(const std::string & table_name,const std::string & column_name,int multi_value_length)302 int DataLog::AddColumn(const std::string& table_name,
303                        const std::string& column_name,
304                        int multi_value_length) {
305   DataLogImpl* data_log = DataLogImpl::StaticInstance();
306   if (data_log == NULL)
307     return -1;
308   return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
309                                                             column_name,
310                                                             multi_value_length);
311 }
312 
NextRow(const std::string & table_name)313 int DataLog::NextRow(const std::string& table_name) {
314   DataLogImpl* data_log = DataLogImpl::StaticInstance();
315   if (data_log == NULL)
316     return -1;
317   return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
318 }
319 
DataLogImpl()320 DataLogImpl::DataLogImpl()
321     : counter_(1),
322       tables_(),
323       flush_event_(EventWrapper::Create()),
324       file_writer_thread_(
325           new rtc::PlatformThread(DataLogImpl::Run, instance_, "DataLog")),
326       tables_lock_(RWLockWrapper::CreateRWLock()) {}
327 
~DataLogImpl()328 DataLogImpl::~DataLogImpl() {
329   StopThread();
330   Flush();  // Write any remaining rows
331   delete flush_event_;
332   for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
333     delete static_cast<LogTable*>(it->second);
334     // For maps all iterators (except the erased) are valid after an erase
335     tables_.erase(it++);
336   }
337   delete tables_lock_;
338 }
339 
CreateLog()340 int DataLogImpl::CreateLog() {
341   CriticalSectionScoped synchronize(crit_sect_.get());
342   if (instance_ == NULL) {
343     instance_ = new DataLogImpl();
344     return instance_->Init();
345   } else {
346     ++instance_->counter_;
347   }
348   return 0;
349 }
350 
Init()351 int DataLogImpl::Init() {
352   file_writer_thread_->Start();
353   file_writer_thread_->SetPriority(rtc::kHighestPriority);
354   return 0;
355 }
356 
StaticInstance()357 DataLogImpl* DataLogImpl::StaticInstance() {
358   return instance_;
359 }
360 
ReturnLog()361 void DataLogImpl::ReturnLog() {
362   CriticalSectionScoped synchronize(crit_sect_.get());
363   if (instance_ && instance_->counter_ > 1) {
364     --instance_->counter_;
365     return;
366   }
367   delete instance_;
368   instance_ = NULL;
369 }
370 
AddTable(const std::string & table_name)371 int DataLogImpl::AddTable(const std::string& table_name) {
372   WriteLockScoped synchronize(*tables_lock_);
373   // Make sure we don't add a table which already exists
374   if (tables_.count(table_name) > 0)
375     return -1;
376   tables_[table_name] = new LogTable();
377   if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
378     return -1;
379   return 0;
380 }
381 
AddColumn(const std::string & table_name,const std::string & column_name,int multi_value_length)382 int DataLogImpl::AddColumn(const std::string& table_name,
383                            const std::string& column_name,
384                            int multi_value_length) {
385   ReadLockScoped synchronize(*tables_lock_);
386   if (tables_.count(table_name) == 0)
387     return -1;
388   return tables_[table_name]->AddColumn(column_name, multi_value_length);
389 }
390 
InsertCell(const std::string & table_name,const std::string & column_name,const Container * value_container)391 int DataLogImpl::InsertCell(const std::string& table_name,
392                             const std::string& column_name,
393                             const Container* value_container) {
394   ReadLockScoped synchronize(*tables_lock_);
395   assert(tables_.count(table_name) > 0);
396   if (tables_.count(table_name) == 0)
397     return -1;
398   return tables_[table_name]->InsertCell(column_name, value_container);
399 }
400 
NextRow(const std::string & table_name)401 int DataLogImpl::NextRow(const std::string& table_name) {
402   ReadLockScoped synchronize(*tables_lock_);
403   if (tables_.count(table_name) == 0)
404     return -1;
405   tables_[table_name]->NextRow();
406   // Signal a complete row
407   flush_event_->Set();
408   return 0;
409 }
410 
Flush()411 void DataLogImpl::Flush() {
412   ReadLockScoped synchronize(*tables_lock_);
413   for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
414     it->second->Flush();
415   }
416 }
417 
Run(void * obj)418 bool DataLogImpl::Run(void* obj) {
419   static_cast<DataLogImpl*>(obj)->Process();
420   return true;
421 }
422 
Process()423 void DataLogImpl::Process() {
424   // Wait for a row to be complete
425   flush_event_->Wait(WEBRTC_EVENT_INFINITE);
426   Flush();
427 }
428 
StopThread()429 void DataLogImpl::StopThread() {
430   flush_event_->Set();
431   file_writer_thread_->Stop();
432 }
433 
434 }  // namespace webrtc
435