1 /*
2 * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "data_log.h"
12
13 #include <assert.h>
14
15 #include <algorithm>
16 #include <list>
17
18 #include "critical_section_wrapper.h"
19 #include "event_wrapper.h"
20 #include "file_wrapper.h"
21 #include "rw_lock_wrapper.h"
22 #include "thread_wrapper.h"
23
24 namespace webrtc {
25
26 DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
27 CriticalSectionWrapper::CreateCriticalSection());
28
29 DataLogImpl* DataLogImpl::instance_ = NULL;
30
31 // A Row contains cells, which are indexed by the column names as std::string.
32 // The string index is treated in a case sensitive way.
33 class Row {
34 public:
35 Row();
36 ~Row();
37
38 // Inserts a Container into the cell of the column specified with
39 // column_name.
40 // column_name is treated in a case sensitive way.
41 int InsertCell(const std::string& column_name,
42 const Container* value_container);
43
44 // Converts the value at the column specified by column_name to a string
45 // stored in value_string.
46 // column_name is treated in a case sensitive way.
47 void ToString(const std::string& column_name, std::string* value_string);
48
49 private:
50 // Collection of containers indexed by column name as std::string
51 typedef std::map<std::string, const Container*> CellMap;
52
53 CellMap cells_;
54 CriticalSectionWrapper* cells_lock_;
55 };
56
57 // A LogTable contains multiple rows, where only the latest row is active for
58 // editing. The rows are defined by the ColumnMap, which contains the name of
59 // each column and the length of the column (1 for one-value-columns and greater
60 // than 1 for multi-value-columns).
61 class LogTable {
62 public:
63 LogTable();
64 ~LogTable();
65
66 // Adds the column with name column_name to the table. The column will be a
67 // multi-value-column if multi_value_length is greater than 1.
68 // column_name is treated in a case sensitive way.
69 int AddColumn(const std::string& column_name, int multi_value_length);
70
71 // Buffers the current row while it is waiting to be written to file,
72 // which is done by a call to Flush(). A new row is available when the
73 // function returns
74 void NextRow();
75
76 // Inserts a Container into the cell of the column specified with
77 // column_name.
78 // column_name is treated in a case sensitive way.
79 int InsertCell(const std::string& column_name,
80 const Container* value_container);
81
82 // Creates a log file, named as specified in the string file_name, to
83 // where the table will be written when calling Flush().
84 int CreateLogFile(const std::string& file_name);
85
86 // Write all complete rows to file.
87 // May not be called by two threads simultaneously (doing so may result in
88 // a race condition). Will be called by the file_writer_thread_ when that
89 // thread is running.
90 void Flush();
91
92 private:
93 // Collection of multi_value_lengths indexed by column name as std::string
94 typedef std::map<std::string, int> ColumnMap;
95 typedef std::list<Row*> RowList;
96
97 ColumnMap columns_;
98 RowList rows_[2];
99 RowList* rows_history_;
100 RowList* rows_flush_;
101 Row* current_row_;
102 FileWrapper* file_;
103 bool write_header_;
104 CriticalSectionWrapper* table_lock_;
105 };
106
Row()107 Row::Row()
108 : cells_(),
109 cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
110 }
111
~Row()112 Row::~Row() {
113 for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
114 delete it->second;
115 // For maps all iterators (except the erased) are valid after an erase
116 cells_.erase(it++);
117 }
118 delete cells_lock_;
119 }
120
InsertCell(const std::string & column_name,const Container * value_container)121 int Row::InsertCell(const std::string& column_name,
122 const Container* value_container) {
123 CriticalSectionScoped synchronize(cells_lock_);
124 assert(cells_.count(column_name) == 0);
125 if (cells_.count(column_name) > 0)
126 return -1;
127 cells_[column_name] = value_container;
128 return 0;
129 }
130
ToString(const std::string & column_name,std::string * value_string)131 void Row::ToString(const std::string& column_name,
132 std::string* value_string) {
133 CriticalSectionScoped synchronize(cells_lock_);
134 const Container* container = cells_[column_name];
135 if (container == NULL) {
136 *value_string = "NaN,";
137 return;
138 }
139 container->ToString(value_string);
140 }
141
LogTable()142 LogTable::LogTable()
143 : columns_(),
144 rows_(),
145 rows_history_(&rows_[0]),
146 rows_flush_(&rows_[1]),
147 current_row_(new Row),
148 file_(FileWrapper::Create()),
149 write_header_(true),
150 table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
151 }
152
~LogTable()153 LogTable::~LogTable() {
154 for (RowList::iterator row_it = rows_history_->begin();
155 row_it != rows_history_->end();) {
156 delete *row_it;
157 row_it = rows_history_->erase(row_it);
158 }
159 for (ColumnMap::iterator col_it = columns_.begin();
160 col_it != columns_.end();) {
161 // For maps all iterators (except the erased) are valid after an erase
162 columns_.erase(col_it++);
163 }
164 if (file_ != NULL) {
165 file_->Flush();
166 file_->CloseFile();
167 delete file_;
168 }
169 delete current_row_;
170 delete table_lock_;
171 }
172
AddColumn(const std::string & column_name,int multi_value_length)173 int LogTable::AddColumn(const std::string& column_name,
174 int multi_value_length) {
175 assert(multi_value_length > 0);
176 if (!write_header_) {
177 // It's not allowed to add new columns after the header
178 // has been written.
179 assert(false);
180 return -1;
181 } else {
182 CriticalSectionScoped synchronize(table_lock_);
183 if (write_header_)
184 columns_[column_name] = multi_value_length;
185 else
186 return -1;
187 }
188 return 0;
189 }
190
NextRow()191 void LogTable::NextRow() {
192 CriticalSectionScoped sync_rows(table_lock_);
193 rows_history_->push_back(current_row_);
194 current_row_ = new Row;
195 }
196
InsertCell(const std::string & column_name,const Container * value_container)197 int LogTable::InsertCell(const std::string& column_name,
198 const Container* value_container) {
199 CriticalSectionScoped synchronize(table_lock_);
200 assert(columns_.count(column_name) > 0);
201 if (columns_.count(column_name) == 0)
202 return -1;
203 return current_row_->InsertCell(column_name, value_container);
204 }
205
CreateLogFile(const std::string & file_name)206 int LogTable::CreateLogFile(const std::string& file_name) {
207 if (file_name.length() == 0)
208 return -1;
209 if (file_->Open())
210 return -1;
211 file_->OpenFile(file_name.c_str(),
212 false, // Open with read/write permissions
213 false, // Don't wraparound and write at the beginning when
214 // the file is full
215 true); // Open as a text file
216 if (file_ == NULL)
217 return -1;
218 return 0;
219 }
220
Flush()221 void LogTable::Flush() {
222 ColumnMap::iterator column_it;
223 bool commit_header = false;
224 if (write_header_) {
225 CriticalSectionScoped synchronize(table_lock_);
226 if (write_header_) {
227 commit_header = true;
228 write_header_ = false;
229 }
230 }
231 if (commit_header) {
232 for (column_it = columns_.begin();
233 column_it != columns_.end(); ++column_it) {
234 if (column_it->second > 1) {
235 file_->WriteText("%s[%u],", column_it->first.c_str(),
236 column_it->second);
237 for (int i = 1; i < column_it->second; ++i)
238 file_->WriteText(",");
239 } else {
240 file_->WriteText("%s,", column_it->first.c_str());
241 }
242 }
243 if (columns_.size() > 0)
244 file_->WriteText("\n");
245 }
246
247 // Swap the list used for flushing with the list containing the row history
248 // and clear the history. We also create a local pointer to the new
249 // list used for flushing to avoid race conditions if another thread
250 // calls this function while we are writing.
251 // We don't want to block the list while we're writing to file.
252 {
253 CriticalSectionScoped synchronize(table_lock_);
254 RowList* tmp = rows_flush_;
255 rows_flush_ = rows_history_;
256 rows_history_ = tmp;
257 rows_history_->clear();
258 }
259
260 // Write all complete rows to file and delete them
261 for (RowList::iterator row_it = rows_flush_->begin();
262 row_it != rows_flush_->end();) {
263 for (column_it = columns_.begin();
264 column_it != columns_.end(); ++column_it) {
265 std::string row_string;
266 (*row_it)->ToString(column_it->first, &row_string);
267 file_->WriteText("%s", row_string.c_str());
268 }
269 if (columns_.size() > 0)
270 file_->WriteText("\n");
271 delete *row_it;
272 row_it = rows_flush_->erase(row_it);
273 }
274 }
275
CreateLog()276 int DataLog::CreateLog() {
277 return DataLogImpl::CreateLog();
278 }
279
ReturnLog()280 void DataLog::ReturnLog() {
281 return DataLogImpl::ReturnLog();
282 }
283
Combine(const std::string & table_name,int table_id)284 std::string DataLog::Combine(const std::string& table_name, int table_id) {
285 std::stringstream ss;
286 std::string combined_id = table_name;
287 std::string number_suffix;
288 ss << "_" << table_id;
289 ss >> number_suffix;
290 combined_id += number_suffix;
291 std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
292 ::tolower);
293 return combined_id;
294 }
295
AddTable(const std::string & table_name)296 int DataLog::AddTable(const std::string& table_name) {
297 DataLogImpl* data_log = DataLogImpl::StaticInstance();
298 if (data_log == NULL)
299 return -1;
300 return data_log->AddTable(table_name);
301 }
302
AddColumn(const std::string & table_name,const std::string & column_name,int multi_value_length)303 int DataLog::AddColumn(const std::string& table_name,
304 const std::string& column_name,
305 int multi_value_length) {
306 DataLogImpl* data_log = DataLogImpl::StaticInstance();
307 if (data_log == NULL)
308 return -1;
309 return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
310 column_name,
311 multi_value_length);
312 }
313
NextRow(const std::string & table_name)314 int DataLog::NextRow(const std::string& table_name) {
315 DataLogImpl* data_log = DataLogImpl::StaticInstance();
316 if (data_log == NULL)
317 return -1;
318 return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
319 }
320
DataLogImpl()321 DataLogImpl::DataLogImpl()
322 : counter_(1),
323 tables_(),
324 flush_event_(EventWrapper::Create()),
325 file_writer_thread_(NULL),
326 tables_lock_(RWLockWrapper::CreateRWLock()) {
327 }
328
~DataLogImpl()329 DataLogImpl::~DataLogImpl() {
330 StopThread();
331 Flush(); // Write any remaining rows
332 delete file_writer_thread_;
333 delete flush_event_;
334 for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
335 delete static_cast<LogTable*>(it->second);
336 // For maps all iterators (except the erased) are valid after an erase
337 tables_.erase(it++);
338 }
339 delete tables_lock_;
340 }
341
CreateLog()342 int DataLogImpl::CreateLog() {
343 CriticalSectionScoped synchronize(crit_sect_.get());
344 if (instance_ == NULL) {
345 instance_ = new DataLogImpl();
346 return instance_->Init();
347 } else {
348 ++instance_->counter_;
349 }
350 return 0;
351 }
352
Init()353 int DataLogImpl::Init() {
354 file_writer_thread_ = ThreadWrapper::CreateThread(
355 DataLogImpl::Run,
356 instance_,
357 kHighestPriority,
358 "DataLog");
359 if (file_writer_thread_ == NULL)
360 return -1;
361 unsigned int thread_id = 0;
362 bool success = file_writer_thread_->Start(thread_id);
363 if (!success)
364 return -1;
365 return 0;
366 }
367
StaticInstance()368 DataLogImpl* DataLogImpl::StaticInstance() {
369 return instance_;
370 }
371
ReturnLog()372 void DataLogImpl::ReturnLog() {
373 CriticalSectionScoped synchronize(crit_sect_.get());
374 if (instance_ && instance_->counter_ > 1) {
375 --instance_->counter_;
376 return;
377 }
378 delete instance_;
379 instance_ = NULL;
380 }
381
AddTable(const std::string & table_name)382 int DataLogImpl::AddTable(const std::string& table_name) {
383 WriteLockScoped synchronize(*tables_lock_);
384 // Make sure we don't add a table which already exists
385 if (tables_.count(table_name) > 0)
386 return -1;
387 tables_[table_name] = new LogTable();
388 if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
389 return -1;
390 return 0;
391 }
392
AddColumn(const std::string & table_name,const std::string & column_name,int multi_value_length)393 int DataLogImpl::AddColumn(const std::string& table_name,
394 const std::string& column_name,
395 int multi_value_length) {
396 ReadLockScoped synchronize(*tables_lock_);
397 if (tables_.count(table_name) == 0)
398 return -1;
399 return tables_[table_name]->AddColumn(column_name, multi_value_length);
400 }
401
InsertCell(const std::string & table_name,const std::string & column_name,const Container * value_container)402 int DataLogImpl::InsertCell(const std::string& table_name,
403 const std::string& column_name,
404 const Container* value_container) {
405 ReadLockScoped synchronize(*tables_lock_);
406 assert(tables_.count(table_name) > 0);
407 if (tables_.count(table_name) == 0)
408 return -1;
409 return tables_[table_name]->InsertCell(column_name, value_container);
410 }
411
NextRow(const std::string & table_name)412 int DataLogImpl::NextRow(const std::string& table_name) {
413 ReadLockScoped synchronize(*tables_lock_);
414 if (tables_.count(table_name) == 0)
415 return -1;
416 tables_[table_name]->NextRow();
417 if (file_writer_thread_ == NULL) {
418 // Write every row to file as they get complete.
419 tables_[table_name]->Flush();
420 } else {
421 // Signal a complete row
422 flush_event_->Set();
423 }
424 return 0;
425 }
426
Flush()427 void DataLogImpl::Flush() {
428 ReadLockScoped synchronize(*tables_lock_);
429 for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
430 it->second->Flush();
431 }
432 }
433
Run(void * obj)434 bool DataLogImpl::Run(void* obj) {
435 static_cast<DataLogImpl*>(obj)->Process();
436 return true;
437 }
438
Process()439 void DataLogImpl::Process() {
440 // Wait for a row to be complete
441 flush_event_->Wait(WEBRTC_EVENT_INFINITE);
442 Flush();
443 }
444
StopThread()445 void DataLogImpl::StopThread() {
446 if (file_writer_thread_ != NULL) {
447 file_writer_thread_->SetNotAlive();
448 flush_event_->Set();
449 // Call Stop() repeatedly, waiting for the Flush() call in Process() to
450 // finish.
451 while (!file_writer_thread_->Stop()) continue;
452 }
453 }
454
455 } // namespace webrtc
456