1 // sttable.h
2 
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 // Copyright 2005-2010 Google, Inc.
16 // Author: allauzen@google.com (Cyril Allauzen)
17 //
18 // \file
19 // A generic string-to-type table file format
20 //
21 // This is not meant as a generalization of SSTable. This is more of
22 // a simple replacement for SSTable in order to provide an open-source
23 // implementation of the FAR format for the external version of the
24 // FST Library.
25 
26 #ifndef FST_EXTENSIONS_FAR_STTABLE_H_
27 #define FST_EXTENSIONS_FAR_STTABLE_H_
28 
29 #include <algorithm>
30 #include <iostream>
31 #include <fstream>
32 #include <sstream>
33 #include <fst/util.h>
34 
35 namespace fst {
36 
37 static const int32 kSTTableMagicNumber = 2125656924;
38 static const int32 kSTTableFileVersion = 1;
39 
40 // String-to-type table writing class for object of type 'T' using functor 'W'
41 // to write an object of type 'T' from a stream. 'W' must conform to the
42 // following interface:
43 //
44 //   struct Writer {
45 //     void operator()(ostream &, const T &) const;
46 //   };
47 //
48 template <class T, class W>
49 class STTableWriter {
50  public:
51   typedef T EntryType;
52   typedef W EntryWriter;
53 
STTableWriter(const string & filename)54   explicit STTableWriter(const string &filename)
55       : stream_(filename.c_str(), ofstream::out | ofstream::binary),
56         error_(false) {
57     WriteType(stream_, kSTTableMagicNumber);
58     WriteType(stream_, kSTTableFileVersion);
59     if (!stream_) {
60       FSTERROR() << "STTableWriter::STTableWriter: error writing to file: "
61                  << filename;
62       error_=true;
63     }
64   }
65 
Create(const string & filename)66   static STTableWriter<T, W> *Create(const string &filename) {
67     if (filename.empty()) {
68       LOG(ERROR) << "STTableWriter: writing to standard out unsupported.";
69       return 0;
70     }
71     return new STTableWriter<T, W>(filename);
72   }
73 
Add(const string & key,const T & t)74   void Add(const string &key, const T &t) {
75     if (key == "") {
76       FSTERROR() << "STTableWriter::Add: key empty: " << key;
77       error_ = true;
78     } else if (key < last_key_) {
79       FSTERROR() << "STTableWriter::Add: key disorder: " << key;
80       error_ = true;
81     }
82     if (error_) return;
83     last_key_ = key;
84     positions_.push_back(stream_.tellp());
85     WriteType(stream_, key);
86     entry_writer_(stream_, t);
87   }
88 
Error()89   bool Error() const { return error_; }
90 
~STTableWriter()91   ~STTableWriter() {
92     WriteType(stream_, positions_);
93     WriteType(stream_, static_cast<int64>(positions_.size()));
94   }
95 
96  private:
97   EntryWriter entry_writer_;  // Write functor for 'EntryType'
98   ofstream stream_;           // Output stream
99   vector<int64> positions_;   // Position in file of each key-entry pair
100   string last_key_;           // Last key
101   bool error_;
102 
103   DISALLOW_COPY_AND_ASSIGN(STTableWriter);
104 };
105 
106 
107 // String-to-type table reading class for object of type 'T' using functor 'R'
108 // to read an object of type 'T' form a stream. 'R' must conform to the
109 // following interface:
110 //
111 //   struct Reader {
112 //     T *operator()(istream &) const;
113 //   };
114 //
115 template <class T, class R>
116 class STTableReader {
117  public:
118   typedef T EntryType;
119   typedef R EntryReader;
120 
STTableReader(const vector<string> & filenames)121   explicit STTableReader(const vector<string> &filenames)
122       : sources_(filenames), entry_(0), error_(false) {
123     compare_ = new Compare(&keys_);
124     keys_.resize(filenames.size());
125     streams_.resize(filenames.size(), 0);
126     positions_.resize(filenames.size());
127     for (size_t i = 0; i < filenames.size(); ++i) {
128       streams_[i] = new ifstream(
129           filenames[i].c_str(), ifstream::in | ifstream::binary);
130       int32 magic_number = 0, file_version = 0;
131       ReadType(*streams_[i], &magic_number);
132       ReadType(*streams_[i], &file_version);
133       if (magic_number != kSTTableMagicNumber) {
134         FSTERROR() << "STTableReader::STTableReader: wrong file type: "
135                    << filenames[i];
136         error_ = true;
137         return;
138       }
139       if (file_version != kSTTableFileVersion) {
140         FSTERROR() << "STTableReader::STTableReader: wrong file version: "
141                    << filenames[i];
142         error_ = true;
143         return;
144       }
145       int64 num_entries;
146       streams_[i]->seekg(-static_cast<int>(sizeof(int64)), ios_base::end);
147       ReadType(*streams_[i], &num_entries);
148       streams_[i]->seekg(-static_cast<int>(sizeof(int64)) *
149                          (num_entries + 1), ios_base::end);
150       positions_[i].resize(num_entries);
151       for (size_t j = 0; (j < num_entries) && (*streams_[i]); ++j)
152         ReadType(*streams_[i], &(positions_[i][j]));
153       streams_[i]->seekg(positions_[i][0]);
154       if (!*streams_[i]) {
155         FSTERROR() << "STTableReader::STTableReader: error reading file: "
156                    << filenames[i];
157         error_ = true;
158         return;
159       }
160 
161     }
162     MakeHeap();
163   }
164 
~STTableReader()165   ~STTableReader() {
166     for (size_t i = 0; i < streams_.size(); ++i)
167       delete streams_[i];
168     delete compare_;
169     if (entry_)
170       delete entry_;
171   }
172 
Open(const string & filename)173   static STTableReader<T, R> *Open(const string &filename) {
174     if (filename.empty()) {
175       LOG(ERROR) << "STTableReader: reading from standard in not supported";
176       return 0;
177     }
178     vector<string> filenames;
179     filenames.push_back(filename);
180     return new STTableReader<T, R>(filenames);
181   }
182 
Open(const vector<string> & filenames)183   static STTableReader<T, R> *Open(const vector<string> &filenames) {
184     return new STTableReader<T, R>(filenames);
185   }
186 
Reset()187   void Reset() {
188     if (error_) return;
189     for (size_t i = 0; i < streams_.size(); ++i)
190       streams_[i]->seekg(positions_[i].front());
191     MakeHeap();
192   }
193 
Find(const string & key)194   bool Find(const string &key) {
195     if (error_) return false;
196     for (size_t i = 0; i < streams_.size(); ++i)
197       LowerBound(i, key);
198     MakeHeap();
199     return keys_[current_] == key;
200   }
201 
Done()202   bool Done() const { return error_ || heap_.empty(); }
203 
Next()204   void Next() {
205     if (error_) return;
206     if (streams_[current_]->tellg() <= positions_[current_].back()) {
207       ReadType(*(streams_[current_]), &(keys_[current_]));
208       if (!*streams_[current_]) {
209         FSTERROR() << "STTableReader: error reading file: "
210                    << sources_[current_];
211         error_ = true;
212         return;
213       }
214       push_heap(heap_.begin(), heap_.end(), *compare_);
215     } else {
216       heap_.pop_back();
217     }
218     if (!heap_.empty())
219       PopHeap();
220   }
221 
GetKey()222   const string &GetKey() const {
223     return keys_[current_];
224   }
225 
GetEntry()226   const EntryType &GetEntry() const {
227     return *entry_;
228   }
229 
Error()230   bool Error() const { return error_; }
231 
232  private:
233   // Comparison functor used to compare stream IDs in the heap
234   struct Compare {
CompareCompare235     Compare(const vector<string> *keys) : keys_(keys) {}
236 
operatorCompare237     bool operator()(size_t i, size_t j) const {
238       return (*keys_)[i] > (*keys_)[j];
239     };
240 
241    private:
242     const vector<string> *keys_;
243   };
244 
245   // Position the stream with ID 'id' at the position corresponding
246   // to the lower bound for key 'find_key'
LowerBound(size_t id,const string & find_key)247   void LowerBound(size_t id, const string &find_key) {
248     ifstream *strm = streams_[id];
249     const vector<int64> &positions = positions_[id];
250     size_t low = 0, high = positions.size() - 1;
251 
252     while (low < high) {
253       size_t mid = (low + high)/2;
254       strm->seekg(positions[mid]);
255       string key;
256       ReadType(*strm, &key);
257       if (key > find_key) {
258         high = mid;
259       } else if (key < find_key) {
260         low = mid + 1;
261       } else {
262         for (size_t i = mid; i > low; --i) {
263           strm->seekg(positions[i - 1]);
264           ReadType(*strm, &key);
265           if (key != find_key) {
266             strm->seekg(positions[i]);
267             return;
268           }
269         }
270         strm->seekg(positions[low]);
271         return;
272       }
273     }
274     strm->seekg(positions[low]);
275   }
276 
277   // Add all streams to the heap
MakeHeap()278   void MakeHeap() {
279     heap_.clear();
280     for (size_t i = 0; i < streams_.size(); ++i) {
281       ReadType(*streams_[i], &(keys_[i]));
282       if (!*streams_[i]) {
283         FSTERROR() << "STTableReader: error reading file: " << sources_[i];
284         error_ = true;
285         return;
286       }
287       heap_.push_back(i);
288     }
289     make_heap(heap_.begin(), heap_.end(), *compare_);
290     PopHeap();
291   }
292 
293   // Position the stream with the lowest key at the top
294   // of the heap, set 'current_' to the ID of that stream
295   // and read the current entry from that stream
PopHeap()296   void PopHeap() {
297     pop_heap(heap_.begin(), heap_.end(), *compare_);
298     current_ = heap_.back();
299     if (entry_)
300       delete entry_;
301     entry_ = entry_reader_(*streams_[current_]);
302     if (!entry_)
303       error_ = true;
304     if (!*streams_[current_]) {
305       FSTERROR() << "STTableReader: error reading entry for key: "
306                  << keys_[current_] << ", file: " << sources_[current_];
307       error_ = true;
308     }
309   }
310 
311 
312   EntryReader entry_reader_;   // Read functor for 'EntryType'
313   vector<ifstream*> streams_;  // Input streams
314   vector<string> sources_;     // and corresponding file names
315   vector<vector<int64> > positions_;  // Index of positions for each stream
316   vector<string> keys_;  // Lowest unread key for each stream
317   vector<int64> heap_;   // Heap containing ID of streams with unread keys
318   int64 current_;        // Id of current stream to be read
319   Compare *compare_;     // Functor comparing stream IDs for the heap
320   mutable EntryType *entry_;  // Pointer to the currently read entry
321   bool error_;
322 
323   DISALLOW_COPY_AND_ASSIGN(STTableReader);
324 };
325 
326 
327 // String-to-type table header reading function template on the entry header
328 // type 'H' having a member function:
329 //   Read(istream &strm, const string &filename);
330 // Checks that 'filename' is an STTable and call the H::Read() on the last
331 // entry in the STTable.
332 template <class H>
ReadSTTableHeader(const string & filename,H * header)333 bool ReadSTTableHeader(const string &filename, H *header) {
334   ifstream strm(filename.c_str(), ifstream::in | ifstream::binary);
335   int32 magic_number = 0, file_version = 0;
336   ReadType(strm, &magic_number);
337   ReadType(strm, &file_version);
338   if (magic_number != kSTTableMagicNumber) {
339     LOG(ERROR) << "ReadSTTableHeader: wrong file type: " << filename;
340     return false;
341   }
342   if (file_version != kSTTableFileVersion) {
343     LOG(ERROR) << "ReadSTTableHeader: wrong file version: " << filename;
344     return false;
345   }
346   int64 i = -1;
347   strm.seekg(-static_cast<int>(sizeof(int64)), ios_base::end);
348   ReadType(strm, &i);  // Read number of entries
349   if (!strm) {
350     LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename;
351     return false;
352   }
353   if (i == 0) return true;  // No entry header to read
354   strm.seekg(-2 * static_cast<int>(sizeof(int64)), ios_base::end);
355   ReadType(strm, &i);  // Read position for last entry in file
356   strm.seekg(i);
357   string key;
358   ReadType(strm, &key);
359   header->Read(strm, filename + ":" + key);
360   if (!strm) {
361     LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename;
362     return false;
363   }
364   return true;
365 }
366 
367 bool IsSTTable(const string &filename);
368 
369 }  // namespace fst
370 
371 #endif  // FST_EXTENSIONS_FAR_STTABLE_H_
372