#!/usr/bin/env python
"""Lightweight record format.

This format implements the log file format from leveldb:
http://leveldb.googlecode.com/svn/trunk/doc/log_format.txt

The main advantages of this format are:
1. Corruption detection: every record has a crc32c checksum.
2. Fast recovery: a reader can skip from a corrupted record to the next
   valid record.

The full specification of the format follows, in case leveldb decides to
change it.


The log file contents are a sequence of 32KB blocks. The only
exception is that the tail of the file may contain a partial block.

Each block consists of a sequence of records:
   block := record* trailer?
   record :=
      checksum: uint32  // masked crc32c of type and data[]
      length: uint16
      type: uint8       // One of FULL, FIRST, MIDDLE, LAST
      data: uint8[length]

A record never starts within the last six bytes of a block (since it
won't fit). Any leftover bytes here form the trailer, which must
consist entirely of zero bytes and must be skipped by readers.

Aside: if exactly seven bytes are left in the current block, and a new
non-zero length record is added, the writer must emit a FIRST record
(which contains zero bytes of user data) to fill up the trailing seven
bytes of the block and then emit all of the user data in subsequent
blocks.

More types may be added in the future. Some readers may skip record
types they do not understand; others may report that some data was
skipped.

FULL == 1
FIRST == 2
MIDDLE == 3
LAST == 4

The FULL record contains the contents of an entire user record.

FIRST, MIDDLE, LAST are types used for user records that have been
split into multiple fragments (typically because of block boundaries).
FIRST is the type of the first fragment of a user record, LAST is the
type of the last fragment of a user record, and MIDDLE is the type of
all interior fragments of a user record.

Example: consider a sequence of user records:
   A: length 1000
   B: length 97270
   C: length 8000
A will be stored as a FULL record in the first block.

B will be split into three fragments: the first fragment occupies the
rest of the first block, the second fragment occupies the entirety of
the second block, and the third fragment occupies a prefix of the third
block. This will leave six bytes free in the third block, which will
be left empty as the trailer.

C will be stored as a FULL record in the fourth block.

"""

__all__ = ['RecordsWriter',
           'RecordsReader']

import logging
import struct

# Note: this import will be scrubbed to
# 'from google.appengine.api.files import crc32c' when mapreduce is pushed
# to the runtime.
from mapreduce.third_party import crc32c
from mapreduce import errors


# pylint: disable=g-bad-name

# Size of a block.
_BLOCK_SIZE = 32 * 1024

# Header format: masked crc32c (uint32), length (uint16), record type (uint8),
# all little-endian.
_HEADER_FORMAT = '<IHB'

# Header length in bytes.
_HEADER_LENGTH = struct.calcsize(_HEADER_FORMAT)

# Not a record but padding bytes.
_RECORD_TYPE_NONE = 0

# Full record.
_RECORD_TYPE_FULL = 1

# First data chunk record.
_RECORD_TYPE_FIRST = 2

# Middle data chunk record.
_RECORD_TYPE_MIDDLE = 3

# Last data chunk record.
_RECORD_TYPE_LAST = 4


# CRC mask. Comes from http://leveldb.googlecode.com/svn/trunk/util/crc32c.h
_CRC_MASK_DELTA = 0xa282ead8


def _mask_crc(crc):
  """Mask crc.

  Args:
    crc: integer crc.
  Returns:
    masked integer crc.
  """
  return (((crc >> 15) | (crc << 17)) + _CRC_MASK_DELTA) & 0xFFFFFFFFL


def _unmask_crc(masked_crc):
  """Unmask crc.

  Args:
    masked_crc: masked integer crc.
  Returns:
    original crc.
  """
  rot = (masked_crc - _CRC_MASK_DELTA) & 0xFFFFFFFFL
  return ((rot >> 17) | (rot << 15)) & 0xFFFFFFFFL
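

# A minimal illustrative sketch, not used by the library code: the helper name
# '_demo_header_roundtrip' and its default payload are arbitrary. It packs a
# single FULL record the same way RecordsWriter does below, then unpacks and
# verifies the header the way RecordsReader does, to make the 7-byte header
# layout and the crc masking round trip concrete.
def _demo_header_roundtrip(data='hello world'):
  """Illustrative only: packs one FULL record and verifies its header."""
  # The checksum covers the type byte followed by the payload, then is masked.
  crc = crc32c.crc_update(crc32c.CRC_INIT, [_RECORD_TYPE_FULL])
  crc = crc32c.crc_update(crc, data)
  crc = crc32c.crc_finalize(crc)
  record = struct.pack(
      _HEADER_FORMAT, _mask_crc(crc), len(data), _RECORD_TYPE_FULL) + data

  # A reader unpacks the header, unmasks the crc and recomputes it over the
  # type byte and the payload it just read.
  (masked_crc, length, record_type) = struct.unpack(
      _HEADER_FORMAT, record[:_HEADER_LENGTH])
  assert record_type == _RECORD_TYPE_FULL
  assert record[_HEADER_LENGTH:_HEADER_LENGTH + length] == data
  assert _unmask_crc(masked_crc) == crc
  return record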
118 """ 119 return (((crc >> 15) | (crc << 17)) + _CRC_MASK_DELTA) & 0xFFFFFFFFL 120 121 122def _unmask_crc(masked_crc): 123 """Unmask crc. 124 125 Args: 126 masked_crc: masked integer crc. 127 Retruns: 128 orignal crc. 129 """ 130 rot = (masked_crc - _CRC_MASK_DELTA) & 0xFFFFFFFFL 131 return ((rot >> 17) | (rot << 15)) & 0xFFFFFFFFL 132 133 134class RecordsWriter(object): 135 """A writer for records format.""" 136 137 def __init__(self, writer): 138 """Constructor. 139 140 Args: 141 writer: a writer conforming to Python io.RawIOBase interface that 142 implements 'write'. 143 """ 144 self.__writer = writer 145 self.__position = 0 146 147 def __write_record(self, record_type, data): 148 """Write single physical record.""" 149 length = len(data) 150 151 crc = crc32c.crc_update(crc32c.CRC_INIT, [record_type]) 152 crc = crc32c.crc_update(crc, data) 153 crc = crc32c.crc_finalize(crc) 154 155 self.__writer.write( 156 struct.pack(_HEADER_FORMAT, _mask_crc(crc), length, record_type)) 157 self.__writer.write(data) 158 self.__position += _HEADER_LENGTH + length 159 160 def write(self, data): 161 """Write single record. 162 163 Args: 164 data: record data to write as string, byte array or byte sequence. 165 """ 166 block_remaining = _BLOCK_SIZE - self.__position % _BLOCK_SIZE 167 168 if block_remaining < _HEADER_LENGTH: 169 # Header won't fit into remainder 170 self.__writer.write('\x00' * block_remaining) 171 self.__position += block_remaining 172 block_remaining = _BLOCK_SIZE 173 174 if block_remaining < len(data) + _HEADER_LENGTH: 175 first_chunk = data[:block_remaining - _HEADER_LENGTH] 176 self.__write_record(_RECORD_TYPE_FIRST, first_chunk) 177 data = data[len(first_chunk):] 178 179 while True: 180 block_remaining = _BLOCK_SIZE - self.__position % _BLOCK_SIZE 181 if block_remaining >= len(data) + _HEADER_LENGTH: 182 self.__write_record(_RECORD_TYPE_LAST, data) 183 break 184 else: 185 chunk = data[:block_remaining - _HEADER_LENGTH] 186 self.__write_record(_RECORD_TYPE_MIDDLE, chunk) 187 data = data[len(chunk):] 188 else: 189 self.__write_record(_RECORD_TYPE_FULL, data) 190 191 def __enter__(self): 192 return self 193 194 def __exit__(self, atype, value, traceback): 195 self.close() 196 197 def close(self): 198 pass 199 200 def _pad_block(self): 201 """Pad block with 0. 202 203 Pad current block with 0. Reader will simply treat these as corrupted 204 record and skip the block. 205 206 This method is idempotent. 207 """ 208 pad_length = _BLOCK_SIZE - self.__position % _BLOCK_SIZE 209 if pad_length and pad_length != _BLOCK_SIZE: 210 self.__writer.write('\x00' * pad_length) 211 self.__position += pad_length 212 213 214class RecordsReader(object): 215 """A reader for records format.""" 216 217 def __init__(self, reader): 218 """Init. 219 220 Args: 221 reader: a reader conforming to Python io.RawIOBase interface that 222 implements 'read', 'seek', and 'tell'. 223 """ 224 self.__reader = reader 225 226 def __try_read_record(self): 227 """Try reading a record. 228 229 Returns: 230 (data, record_type) tuple. 231 Raises: 232 EOFError: when end of file was reached. 233 InvalidRecordError: when valid record could not be read. 
234 """ 235 block_remaining = _BLOCK_SIZE - self.__reader.tell() % _BLOCK_SIZE 236 if block_remaining < _HEADER_LENGTH: 237 return ('', _RECORD_TYPE_NONE) 238 239 header = self.__reader.read(_HEADER_LENGTH) 240 if len(header) != _HEADER_LENGTH: 241 raise EOFError('Read %s bytes instead of %s' % 242 (len(header), _HEADER_LENGTH)) 243 244 (masked_crc, length, record_type) = struct.unpack(_HEADER_FORMAT, header) 245 crc = _unmask_crc(masked_crc) 246 247 if length + _HEADER_LENGTH > block_remaining: 248 # A record can't be bigger than one block. 249 raise errors.InvalidRecordError('Length is too big') 250 251 data = self.__reader.read(length) 252 if len(data) != length: 253 raise EOFError('Not enough data read. Expected: %s but got %s' % 254 (length, len(data))) 255 256 if record_type == _RECORD_TYPE_NONE: 257 return ('', record_type) 258 259 actual_crc = crc32c.crc_update(crc32c.CRC_INIT, [record_type]) 260 actual_crc = crc32c.crc_update(actual_crc, data) 261 actual_crc = crc32c.crc_finalize(actual_crc) 262 263 if actual_crc != crc: 264 raise errors.InvalidRecordError('Data crc does not match') 265 return (data, record_type) 266 267 def __sync(self): 268 """Skip reader to the block boundary.""" 269 pad_length = _BLOCK_SIZE - self.__reader.tell() % _BLOCK_SIZE 270 if pad_length and pad_length != _BLOCK_SIZE: 271 data = self.__reader.read(pad_length) 272 if len(data) != pad_length: 273 raise EOFError('Read %d bytes instead of %d' % 274 (len(data), pad_length)) 275 276 def read(self): 277 """Reads record from current position in reader. 278 279 Returns: 280 original bytes stored in a single record. 281 """ 282 data = None 283 while True: 284 last_offset = self.tell() 285 try: 286 (chunk, record_type) = self.__try_read_record() 287 if record_type == _RECORD_TYPE_NONE: 288 self.__sync() 289 elif record_type == _RECORD_TYPE_FULL: 290 if data is not None: 291 logging.warning( 292 "Ordering corruption: Got FULL record while already " 293 "in a chunk at offset %d", last_offset) 294 return chunk 295 elif record_type == _RECORD_TYPE_FIRST: 296 if data is not None: 297 logging.warning( 298 "Ordering corruption: Got FIRST record while already " 299 "in a chunk at offset %d", last_offset) 300 data = chunk 301 elif record_type == _RECORD_TYPE_MIDDLE: 302 if data is None: 303 logging.warning( 304 "Ordering corruption: Got MIDDLE record before FIRST " 305 "record at offset %d", last_offset) 306 else: 307 data += chunk 308 elif record_type == _RECORD_TYPE_LAST: 309 if data is None: 310 logging.warning( 311 "Ordering corruption: Got LAST record but no chunk is in " 312 "progress at offset %d", last_offset) 313 else: 314 result = data + chunk 315 data = None 316 return result 317 else: 318 raise errors.InvalidRecordError( 319 "Unsupported record type: %s" % record_type) 320 321 except errors.InvalidRecordError, e: 322 logging.warning("Invalid record encountered at %s (%s). Syncing to " 323 "the next block", last_offset, e) 324 data = None 325 self.__sync() 326 327 def __iter__(self): 328 try: 329 while True: 330 yield self.read() 331 except EOFError: 332 pass 333 334 def tell(self): 335 """Return file's current position.""" 336 return self.__reader.tell() 337 338 def seek(self, *args, **kwargs): 339 """Set the file's current position. 340 341 Arguments are passed directly to the underlying reader. 342 """ 343 return self.__reader.seek(*args, **kwargs) 344