1 /*
2  *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "data_log.h"
12 
13 #include <assert.h>
14 
15 #include <algorithm>
16 #include <list>
17 
18 #include "critical_section_wrapper.h"
19 #include "event_wrapper.h"
20 #include "file_wrapper.h"
21 #include "rw_lock_wrapper.h"
22 #include "thread_wrapper.h"
23 
24 namespace webrtc {
25 
26 DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
27   CriticalSectionWrapper::CreateCriticalSection());
28 
29 DataLogImpl* DataLogImpl::instance_ = NULL;
30 
31 // A Row contains cells, which are indexed by the column names as std::string.
32 // The string index is treated in a case sensitive way.
33 class Row {
34  public:
35   Row();
36   ~Row();
37 
38   // Inserts a Container into the cell of the column specified with
39   // column_name.
40   // column_name is treated in a case sensitive way.
41   int InsertCell(const std::string& column_name,
42                  const Container* value_container);
43 
44   // Converts the value at the column specified by column_name to a string
45   // stored in value_string.
46   // column_name is treated in a case sensitive way.
47   void ToString(const std::string& column_name, std::string* value_string);
48 
49  private:
50   // Collection of containers indexed by column name as std::string
51   typedef std::map<std::string, const Container*> CellMap;
52 
53   CellMap                   cells_;
54   CriticalSectionWrapper*   cells_lock_;
55 };
56 
57 // A LogTable contains multiple rows, where only the latest row is active for
58 // editing. The rows are defined by the ColumnMap, which contains the name of
59 // each column and the length of the column (1 for one-value-columns and greater
60 // than 1 for multi-value-columns).
61 class LogTable {
62  public:
63   LogTable();
64   ~LogTable();
65 
66   // Adds the column with name column_name to the table. The column will be a
67   // multi-value-column if multi_value_length is greater than 1.
68   // column_name is treated in a case sensitive way.
69   int AddColumn(const std::string& column_name, int multi_value_length);
70 
71   // Buffers the current row while it is waiting to be written to file,
72   // which is done by a call to Flush(). A new row is available when the
73   // function returns
74   void NextRow();
75 
76   // Inserts a Container into the cell of the column specified with
77   // column_name.
78   // column_name is treated in a case sensitive way.
79   int InsertCell(const std::string& column_name,
80                  const Container* value_container);
81 
82   // Creates a log file, named as specified in the string file_name, to
83   // where the table will be written when calling Flush().
84   int CreateLogFile(const std::string& file_name);
85 
86   // Write all complete rows to file.
87   // May not be called by two threads simultaneously (doing so may result in
88   // a race condition). Will be called by the file_writer_thread_ when that
89   // thread is running.
90   void Flush();
91 
92  private:
93   // Collection of multi_value_lengths indexed by column name as std::string
94   typedef std::map<std::string, int> ColumnMap;
95   typedef std::list<Row*> RowList;
96 
97   ColumnMap               columns_;
98   RowList                 rows_[2];
99   RowList*                rows_history_;
100   RowList*                rows_flush_;
101   Row*                    current_row_;
102   FileWrapper*            file_;
103   bool                    write_header_;
104   CriticalSectionWrapper* table_lock_;
105 };
106 
Row()107 Row::Row()
108   : cells_(),
109     cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
110 }
111 
~Row()112 Row::~Row() {
113   for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
114     delete it->second;
115     // For maps all iterators (except the erased) are valid after an erase
116     cells_.erase(it++);
117   }
118   delete cells_lock_;
119 }
120 
InsertCell(const std::string & column_name,const Container * value_container)121 int Row::InsertCell(const std::string& column_name,
122                     const Container* value_container) {
123   CriticalSectionScoped synchronize(cells_lock_);
124   assert(cells_.count(column_name) == 0);
125   if (cells_.count(column_name) > 0)
126     return -1;
127   cells_[column_name] = value_container;
128   return 0;
129 }
130 
ToString(const std::string & column_name,std::string * value_string)131 void Row::ToString(const std::string& column_name,
132                    std::string* value_string) {
133   CriticalSectionScoped synchronize(cells_lock_);
134   const Container* container = cells_[column_name];
135   if (container == NULL) {
136     *value_string = "NaN,";
137     return;
138   }
139   container->ToString(value_string);
140 }
141 
LogTable()142 LogTable::LogTable()
143   : columns_(),
144     rows_(),
145     rows_history_(&rows_[0]),
146     rows_flush_(&rows_[1]),
147     current_row_(new Row),
148     file_(FileWrapper::Create()),
149     write_header_(true),
150     table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
151 }
152 
~LogTable()153 LogTable::~LogTable() {
154   for (RowList::iterator row_it = rows_history_->begin();
155        row_it != rows_history_->end();) {
156     delete *row_it;
157     row_it = rows_history_->erase(row_it);
158   }
159   for (ColumnMap::iterator col_it = columns_.begin();
160        col_it != columns_.end();) {
161     // For maps all iterators (except the erased) are valid after an erase
162     columns_.erase(col_it++);
163   }
164   if (file_ != NULL) {
165     file_->Flush();
166     file_->CloseFile();
167     delete file_;
168   }
169   delete current_row_;
170   delete table_lock_;
171 }
172 
AddColumn(const std::string & column_name,int multi_value_length)173 int LogTable::AddColumn(const std::string& column_name,
174                         int multi_value_length) {
175   assert(multi_value_length > 0);
176   if (!write_header_) {
177     // It's not allowed to add new columns after the header
178     // has been written.
179     assert(false);
180     return -1;
181   } else {
182     CriticalSectionScoped synchronize(table_lock_);
183     if (write_header_)
184       columns_[column_name] = multi_value_length;
185     else
186       return -1;
187   }
188   return 0;
189 }
190 
NextRow()191 void LogTable::NextRow() {
192   CriticalSectionScoped sync_rows(table_lock_);
193   rows_history_->push_back(current_row_);
194   current_row_ = new Row;
195 }
196 
InsertCell(const std::string & column_name,const Container * value_container)197 int LogTable::InsertCell(const std::string& column_name,
198                          const Container* value_container) {
199   CriticalSectionScoped synchronize(table_lock_);
200   assert(columns_.count(column_name) > 0);
201   if (columns_.count(column_name) == 0)
202     return -1;
203   return current_row_->InsertCell(column_name, value_container);
204 }
205 
CreateLogFile(const std::string & file_name)206 int LogTable::CreateLogFile(const std::string& file_name) {
207   if (file_name.length() == 0)
208     return -1;
209   if (file_->Open())
210     return -1;
211   file_->OpenFile(file_name.c_str(),
212                   false,  // Open with read/write permissions
213                   false,  // Don't wraparound and write at the beginning when
214                           // the file is full
215                   true);  // Open as a text file
216   if (file_ == NULL)
217     return -1;
218   return 0;
219 }
220 
Flush()221 void LogTable::Flush() {
222   ColumnMap::iterator column_it;
223   bool commit_header = false;
224   if (write_header_) {
225     CriticalSectionScoped synchronize(table_lock_);
226     if (write_header_) {
227       commit_header = true;
228       write_header_ = false;
229     }
230   }
231   if (commit_header) {
232     for (column_it = columns_.begin();
233          column_it != columns_.end(); ++column_it) {
234       if (column_it->second > 1) {
235         file_->WriteText("%s[%u],", column_it->first.c_str(),
236                          column_it->second);
237         for (int i = 1; i < column_it->second; ++i)
238           file_->WriteText(",");
239       } else {
240         file_->WriteText("%s,", column_it->first.c_str());
241       }
242     }
243     if (columns_.size() > 0)
244       file_->WriteText("\n");
245   }
246 
247   // Swap the list used for flushing with the list containing the row history
248   // and clear the history. We also create a local pointer to the new
249   // list used for flushing to avoid race conditions if another thread
250   // calls this function while we are writing.
251   // We don't want to block the list while we're writing to file.
252   {
253     CriticalSectionScoped synchronize(table_lock_);
254     RowList* tmp = rows_flush_;
255     rows_flush_ = rows_history_;
256     rows_history_ = tmp;
257     rows_history_->clear();
258   }
259 
260   // Write all complete rows to file and delete them
261   for (RowList::iterator row_it = rows_flush_->begin();
262        row_it != rows_flush_->end();) {
263     for (column_it = columns_.begin();
264          column_it != columns_.end(); ++column_it) {
265       std::string row_string;
266       (*row_it)->ToString(column_it->first, &row_string);
267       file_->WriteText("%s", row_string.c_str());
268     }
269     if (columns_.size() > 0)
270       file_->WriteText("\n");
271     delete *row_it;
272     row_it = rows_flush_->erase(row_it);
273   }
274 }
275 
CreateLog()276 int DataLog::CreateLog() {
277   return DataLogImpl::CreateLog();
278 }
279 
ReturnLog()280 void DataLog::ReturnLog() {
281   return DataLogImpl::ReturnLog();
282 }
283 
Combine(const std::string & table_name,int table_id)284 std::string DataLog::Combine(const std::string& table_name, int table_id) {
285   std::stringstream ss;
286   std::string combined_id = table_name;
287   std::string number_suffix;
288   ss << "_" << table_id;
289   ss >> number_suffix;
290   combined_id += number_suffix;
291   std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
292                  ::tolower);
293   return combined_id;
294 }
295 
AddTable(const std::string & table_name)296 int DataLog::AddTable(const std::string& table_name) {
297   DataLogImpl* data_log = DataLogImpl::StaticInstance();
298   if (data_log == NULL)
299     return -1;
300   return data_log->AddTable(table_name);
301 }
302 
AddColumn(const std::string & table_name,const std::string & column_name,int multi_value_length)303 int DataLog::AddColumn(const std::string& table_name,
304                        const std::string& column_name,
305                        int multi_value_length) {
306   DataLogImpl* data_log = DataLogImpl::StaticInstance();
307   if (data_log == NULL)
308     return -1;
309   return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
310                                                             column_name,
311                                                             multi_value_length);
312 }
313 
NextRow(const std::string & table_name)314 int DataLog::NextRow(const std::string& table_name) {
315   DataLogImpl* data_log = DataLogImpl::StaticInstance();
316   if (data_log == NULL)
317     return -1;
318   return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
319 }
320 
DataLogImpl()321 DataLogImpl::DataLogImpl()
322   : counter_(1),
323     tables_(),
324     flush_event_(EventWrapper::Create()),
325     file_writer_thread_(NULL),
326     tables_lock_(RWLockWrapper::CreateRWLock()) {
327 }
328 
~DataLogImpl()329 DataLogImpl::~DataLogImpl() {
330   StopThread();
331   Flush();  // Write any remaining rows
332   delete file_writer_thread_;
333   delete flush_event_;
334   for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
335     delete static_cast<LogTable*>(it->second);
336     // For maps all iterators (except the erased) are valid after an erase
337     tables_.erase(it++);
338   }
339   delete tables_lock_;
340 }
341 
CreateLog()342 int DataLogImpl::CreateLog() {
343   CriticalSectionScoped synchronize(crit_sect_.get());
344   if (instance_ == NULL) {
345     instance_ = new DataLogImpl();
346     return instance_->Init();
347   } else {
348     ++instance_->counter_;
349   }
350   return 0;
351 }
352 
Init()353 int DataLogImpl::Init() {
354   file_writer_thread_ = ThreadWrapper::CreateThread(
355                           DataLogImpl::Run,
356                           instance_,
357                           kHighestPriority,
358                           "DataLog");
359   if (file_writer_thread_ == NULL)
360     return -1;
361   unsigned int thread_id = 0;
362   bool success = file_writer_thread_->Start(thread_id);
363   if (!success)
364     return -1;
365   return 0;
366 }
367 
StaticInstance()368 DataLogImpl* DataLogImpl::StaticInstance() {
369   return instance_;
370 }
371 
ReturnLog()372 void DataLogImpl::ReturnLog() {
373   CriticalSectionScoped synchronize(crit_sect_.get());
374   if (instance_ && instance_->counter_ > 1) {
375     --instance_->counter_;
376     return;
377   }
378   delete instance_;
379   instance_ = NULL;
380 }
381 
AddTable(const std::string & table_name)382 int DataLogImpl::AddTable(const std::string& table_name) {
383   WriteLockScoped synchronize(*tables_lock_);
384   // Make sure we don't add a table which already exists
385   if (tables_.count(table_name) > 0)
386     return -1;
387   tables_[table_name] = new LogTable();
388   if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
389     return -1;
390   return 0;
391 }
392 
AddColumn(const std::string & table_name,const std::string & column_name,int multi_value_length)393 int DataLogImpl::AddColumn(const std::string& table_name,
394                            const std::string& column_name,
395                            int multi_value_length) {
396   ReadLockScoped synchronize(*tables_lock_);
397   if (tables_.count(table_name) == 0)
398     return -1;
399   return tables_[table_name]->AddColumn(column_name, multi_value_length);
400 }
401 
InsertCell(const std::string & table_name,const std::string & column_name,const Container * value_container)402 int DataLogImpl::InsertCell(const std::string& table_name,
403                             const std::string& column_name,
404                             const Container* value_container) {
405   ReadLockScoped synchronize(*tables_lock_);
406   assert(tables_.count(table_name) > 0);
407   if (tables_.count(table_name) == 0)
408     return -1;
409   return tables_[table_name]->InsertCell(column_name, value_container);
410 }
411 
NextRow(const std::string & table_name)412 int DataLogImpl::NextRow(const std::string& table_name) {
413   ReadLockScoped synchronize(*tables_lock_);
414   if (tables_.count(table_name) == 0)
415     return -1;
416   tables_[table_name]->NextRow();
417   if (file_writer_thread_ == NULL) {
418     // Write every row to file as they get complete.
419     tables_[table_name]->Flush();
420   } else {
421     // Signal a complete row
422     flush_event_->Set();
423   }
424   return 0;
425 }
426 
Flush()427 void DataLogImpl::Flush() {
428   ReadLockScoped synchronize(*tables_lock_);
429   for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
430     it->second->Flush();
431   }
432 }
433 
Run(void * obj)434 bool DataLogImpl::Run(void* obj) {
435   static_cast<DataLogImpl*>(obj)->Process();
436   return true;
437 }
438 
Process()439 void DataLogImpl::Process() {
440   // Wait for a row to be complete
441   flush_event_->Wait(WEBRTC_EVENT_INFINITE);
442   Flush();
443 }
444 
StopThread()445 void DataLogImpl::StopThread() {
446   if (file_writer_thread_ != NULL) {
447     file_writer_thread_->SetNotAlive();
448     flush_event_->Set();
449     // Call Stop() repeatedly, waiting for the Flush() call in Process() to
450     // finish.
451     while (!file_writer_thread_->Stop()) continue;
452   }
453 }
454 
455 }  // namespace webrtc
456