1 // sttable.h
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 // Copyright 2005-2010 Google, Inc.
16 // Author: allauzen@google.com (Cyril Allauzen)
17 //
18 // \file
19 // A generic string-to-type table file format
20 //
21 // This is not meant as a generalization of SSTable. This is more of
22 // a simple replacement for SSTable in order to provide an open-source
23 // implementation of the FAR format for the external version of the
24 // FST Library.
25
26 #ifndef FST_EXTENSIONS_FAR_STTABLE_H_
27 #define FST_EXTENSIONS_FAR_STTABLE_H_
28
29 #include <algorithm>
30 #include <iostream>
31 #include <fstream>
32 #include <sstream>
33 #include <fst/util.h>
34
35 namespace fst {
36
// Magic number identifying an STTable file. STTableWriter emits it as the
// first int32 of the file; STTableReader and ReadSTTableHeader reject any
// file that does not start with it.
static const int32 kSTTableMagicNumber = 2125656924;
// File format version, written as an int32 right after the magic number.
// Readers reject files whose stored version differs from this value.
static const int32 kSTTableFileVersion = 1;
39
40 // String-to-type table writing class for object of type 'T' using functor 'W'
41 // to write an object of type 'T' from a stream. 'W' must conform to the
42 // following interface:
43 //
44 // struct Writer {
45 // void operator()(ostream &, const T &) const;
46 // };
47 //
48 template <class T, class W>
49 class STTableWriter {
50 public:
51 typedef T EntryType;
52 typedef W EntryWriter;
53
STTableWriter(const string & filename)54 explicit STTableWriter(const string &filename)
55 : stream_(filename.c_str(), ofstream::out | ofstream::binary),
56 error_(false) {
57 WriteType(stream_, kSTTableMagicNumber);
58 WriteType(stream_, kSTTableFileVersion);
59 if (!stream_) {
60 FSTERROR() << "STTableWriter::STTableWriter: error writing to file: "
61 << filename;
62 error_=true;
63 }
64 }
65
Create(const string & filename)66 static STTableWriter<T, W> *Create(const string &filename) {
67 if (filename.empty()) {
68 LOG(ERROR) << "STTableWriter: writing to standard out unsupported.";
69 return 0;
70 }
71 return new STTableWriter<T, W>(filename);
72 }
73
Add(const string & key,const T & t)74 void Add(const string &key, const T &t) {
75 if (key == "") {
76 FSTERROR() << "STTableWriter::Add: key empty: " << key;
77 error_ = true;
78 } else if (key < last_key_) {
79 FSTERROR() << "STTableWriter::Add: key disorder: " << key;
80 error_ = true;
81 }
82 if (error_) return;
83 last_key_ = key;
84 positions_.push_back(stream_.tellp());
85 WriteType(stream_, key);
86 entry_writer_(stream_, t);
87 }
88
Error()89 bool Error() const { return error_; }
90
~STTableWriter()91 ~STTableWriter() {
92 WriteType(stream_, positions_);
93 WriteType(stream_, static_cast<int64>(positions_.size()));
94 }
95
96 private:
97 EntryWriter entry_writer_; // Write functor for 'EntryType'
98 ofstream stream_; // Output stream
99 vector<int64> positions_; // Position in file of each key-entry pair
100 string last_key_; // Last key
101 bool error_;
102
103 DISALLOW_COPY_AND_ASSIGN(STTableWriter);
104 };
105
106
107 // String-to-type table reading class for object of type 'T' using functor 'R'
108 // to read an object of type 'T' form a stream. 'R' must conform to the
109 // following interface:
110 //
111 // struct Reader {
112 // T *operator()(istream &) const;
113 // };
114 //
115 template <class T, class R>
116 class STTableReader {
117 public:
118 typedef T EntryType;
119 typedef R EntryReader;
120
STTableReader(const vector<string> & filenames)121 explicit STTableReader(const vector<string> &filenames)
122 : sources_(filenames), entry_(0), error_(false) {
123 compare_ = new Compare(&keys_);
124 keys_.resize(filenames.size());
125 streams_.resize(filenames.size(), 0);
126 positions_.resize(filenames.size());
127 for (size_t i = 0; i < filenames.size(); ++i) {
128 streams_[i] = new ifstream(
129 filenames[i].c_str(), ifstream::in | ifstream::binary);
130 int32 magic_number = 0, file_version = 0;
131 ReadType(*streams_[i], &magic_number);
132 ReadType(*streams_[i], &file_version);
133 if (magic_number != kSTTableMagicNumber) {
134 FSTERROR() << "STTableReader::STTableReader: wrong file type: "
135 << filenames[i];
136 error_ = true;
137 return;
138 }
139 if (file_version != kSTTableFileVersion) {
140 FSTERROR() << "STTableReader::STTableReader: wrong file version: "
141 << filenames[i];
142 error_ = true;
143 return;
144 }
145 int64 num_entries;
146 streams_[i]->seekg(-static_cast<int>(sizeof(int64)), ios_base::end);
147 ReadType(*streams_[i], &num_entries);
148 streams_[i]->seekg(-static_cast<int>(sizeof(int64)) *
149 (num_entries + 1), ios_base::end);
150 positions_[i].resize(num_entries);
151 for (size_t j = 0; (j < num_entries) && (*streams_[i]); ++j)
152 ReadType(*streams_[i], &(positions_[i][j]));
153 streams_[i]->seekg(positions_[i][0]);
154 if (!*streams_[i]) {
155 FSTERROR() << "STTableReader::STTableReader: error reading file: "
156 << filenames[i];
157 error_ = true;
158 return;
159 }
160
161 }
162 MakeHeap();
163 }
164
~STTableReader()165 ~STTableReader() {
166 for (size_t i = 0; i < streams_.size(); ++i)
167 delete streams_[i];
168 delete compare_;
169 if (entry_)
170 delete entry_;
171 }
172
Open(const string & filename)173 static STTableReader<T, R> *Open(const string &filename) {
174 if (filename.empty()) {
175 LOG(ERROR) << "STTableReader: reading from standard in not supported";
176 return 0;
177 }
178 vector<string> filenames;
179 filenames.push_back(filename);
180 return new STTableReader<T, R>(filenames);
181 }
182
Open(const vector<string> & filenames)183 static STTableReader<T, R> *Open(const vector<string> &filenames) {
184 return new STTableReader<T, R>(filenames);
185 }
186
Reset()187 void Reset() {
188 if (error_) return;
189 for (size_t i = 0; i < streams_.size(); ++i)
190 streams_[i]->seekg(positions_[i].front());
191 MakeHeap();
192 }
193
Find(const string & key)194 bool Find(const string &key) {
195 if (error_) return false;
196 for (size_t i = 0; i < streams_.size(); ++i)
197 LowerBound(i, key);
198 MakeHeap();
199 return keys_[current_] == key;
200 }
201
Done()202 bool Done() const { return error_ || heap_.empty(); }
203
Next()204 void Next() {
205 if (error_) return;
206 if (streams_[current_]->tellg() <= positions_[current_].back()) {
207 ReadType(*(streams_[current_]), &(keys_[current_]));
208 if (!*streams_[current_]) {
209 FSTERROR() << "STTableReader: error reading file: "
210 << sources_[current_];
211 error_ = true;
212 return;
213 }
214 push_heap(heap_.begin(), heap_.end(), *compare_);
215 } else {
216 heap_.pop_back();
217 }
218 if (!heap_.empty())
219 PopHeap();
220 }
221
GetKey()222 const string &GetKey() const {
223 return keys_[current_];
224 }
225
GetEntry()226 const EntryType &GetEntry() const {
227 return *entry_;
228 }
229
Error()230 bool Error() const { return error_; }
231
232 private:
233 // Comparison functor used to compare stream IDs in the heap
234 struct Compare {
CompareCompare235 Compare(const vector<string> *keys) : keys_(keys) {}
236
operatorCompare237 bool operator()(size_t i, size_t j) const {
238 return (*keys_)[i] > (*keys_)[j];
239 };
240
241 private:
242 const vector<string> *keys_;
243 };
244
245 // Position the stream with ID 'id' at the position corresponding
246 // to the lower bound for key 'find_key'
LowerBound(size_t id,const string & find_key)247 void LowerBound(size_t id, const string &find_key) {
248 ifstream *strm = streams_[id];
249 const vector<int64> &positions = positions_[id];
250 size_t low = 0, high = positions.size() - 1;
251
252 while (low < high) {
253 size_t mid = (low + high)/2;
254 strm->seekg(positions[mid]);
255 string key;
256 ReadType(*strm, &key);
257 if (key > find_key) {
258 high = mid;
259 } else if (key < find_key) {
260 low = mid + 1;
261 } else {
262 for (size_t i = mid; i > low; --i) {
263 strm->seekg(positions[i - 1]);
264 ReadType(*strm, &key);
265 if (key != find_key) {
266 strm->seekg(positions[i]);
267 return;
268 }
269 }
270 strm->seekg(positions[low]);
271 return;
272 }
273 }
274 strm->seekg(positions[low]);
275 }
276
277 // Add all streams to the heap
MakeHeap()278 void MakeHeap() {
279 heap_.clear();
280 for (size_t i = 0; i < streams_.size(); ++i) {
281 ReadType(*streams_[i], &(keys_[i]));
282 if (!*streams_[i]) {
283 FSTERROR() << "STTableReader: error reading file: " << sources_[i];
284 error_ = true;
285 return;
286 }
287 heap_.push_back(i);
288 }
289 make_heap(heap_.begin(), heap_.end(), *compare_);
290 PopHeap();
291 }
292
293 // Position the stream with the lowest key at the top
294 // of the heap, set 'current_' to the ID of that stream
295 // and read the current entry from that stream
PopHeap()296 void PopHeap() {
297 pop_heap(heap_.begin(), heap_.end(), *compare_);
298 current_ = heap_.back();
299 if (entry_)
300 delete entry_;
301 entry_ = entry_reader_(*streams_[current_]);
302 if (!entry_)
303 error_ = true;
304 if (!*streams_[current_]) {
305 FSTERROR() << "STTableReader: error reading entry for key: "
306 << keys_[current_] << ", file: " << sources_[current_];
307 error_ = true;
308 }
309 }
310
311
312 EntryReader entry_reader_; // Read functor for 'EntryType'
313 vector<ifstream*> streams_; // Input streams
314 vector<string> sources_; // and corresponding file names
315 vector<vector<int64> > positions_; // Index of positions for each stream
316 vector<string> keys_; // Lowest unread key for each stream
317 vector<int64> heap_; // Heap containing ID of streams with unread keys
318 int64 current_; // Id of current stream to be read
319 Compare *compare_; // Functor comparing stream IDs for the heap
320 mutable EntryType *entry_; // Pointer to the currently read entry
321 bool error_;
322
323 DISALLOW_COPY_AND_ASSIGN(STTableReader);
324 };
325
326
327 // String-to-type table header reading function template on the entry header
328 // type 'H' having a member function:
329 // Read(istream &strm, const string &filename);
330 // Checks that 'filename' is an STTable and call the H::Read() on the last
331 // entry in the STTable.
332 template <class H>
ReadSTTableHeader(const string & filename,H * header)333 bool ReadSTTableHeader(const string &filename, H *header) {
334 ifstream strm(filename.c_str(), ifstream::in | ifstream::binary);
335 int32 magic_number = 0, file_version = 0;
336 ReadType(strm, &magic_number);
337 ReadType(strm, &file_version);
338 if (magic_number != kSTTableMagicNumber) {
339 LOG(ERROR) << "ReadSTTableHeader: wrong file type: " << filename;
340 return false;
341 }
342 if (file_version != kSTTableFileVersion) {
343 LOG(ERROR) << "ReadSTTableHeader: wrong file version: " << filename;
344 return false;
345 }
346 int64 i = -1;
347 strm.seekg(-static_cast<int>(sizeof(int64)), ios_base::end);
348 ReadType(strm, &i); // Read number of entries
349 if (!strm) {
350 LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename;
351 return false;
352 }
353 if (i == 0) return true; // No entry header to read
354 strm.seekg(-2 * static_cast<int>(sizeof(int64)), ios_base::end);
355 ReadType(strm, &i); // Read position for last entry in file
356 strm.seekg(i);
357 string key;
358 ReadType(strm, &key);
359 header->Read(strm, filename + ":" + key);
360 if (!strm) {
361 LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename;
362 return false;
363 }
364 return true;
365 }
366
// Declaration only; the definition is not part of this header.
// NOTE(review): presumably returns true when 'filename' is an STTable file
// (i.e. starts with kSTTableMagicNumber) -- confirm against the definition.
bool IsSTTable(const string &filename);
368
369 } // namespace fst
370
371 #endif // FST_EXTENSIONS_FAR_STTABLE_H_
372