1#!/usr/bin/env python
2"""Lightweight record format.
3
4This format implements log file format from leveldb:
5http://leveldb.googlecode.com/svn/trunk/doc/log_format.txt
6
7The main advantages of this format are
81. to detect corruption. Every record has a crc32c checksum.
92. to quickly skip corrupted record to the next valid record.
10
11Full specification of format follows in case leveldb decides to change it.
12
13
14The log file contents are a sequence of 32KB blocks.  The only
15exception is that the tail of the file may contain a partial block.
16
17Each block consists of a sequence of records:
18   block := record* trailer?
19   record :=
20      checksum: uint32  // masked crc32c of type and data[]
21      length: uint16
22      type: uint8       // One of FULL, FIRST, MIDDLE, LAST
23      data: uint8[length]
24
25A record never starts within the last six bytes of a block (since it
26won't fit).  Any leftover bytes here form the trailer, which must
27consist entirely of zero bytes and must be skipped by readers.
28
29Aside: if exactly seven bytes are left in the current block, and a new
30non-zero length record is added, the writer must emit a FIRST record
31(which contains zero bytes of user data) to fill up the trailing seven
32bytes of the block and then emit all of the user data in subsequent
33blocks.
34
35More types may be added in the future.  Some Readers may skip record
36types they do not understand, others may report that some data was
37skipped.
38
39FULL == 1
40FIRST == 2
41MIDDLE == 3
42LAST == 4
43
44The FULL record contains the contents of an entire user record.
45
46FIRST, MIDDLE, LAST are types used for user records that have been
47split into multiple fragments (typically because of block boundaries).
48FIRST is the type of the first fragment of a user record, LAST is the
49type of the last fragment of a user record, and MID is the type of all
50interior fragments of a user record.
51
52Example: consider a sequence of user records:
53   A: length 1000
54   B: length 97270
55   C: length 8000
56A will be stored as a FULL record in the first block.
57
58B will be split into three fragments: first fragment occupies the rest
59of the first block, second fragment occupies the entirety of the
60second block, and the third fragment occupies a prefix of the third
61block.  This will leave six bytes free in the third block, which will
62be left empty as the trailer.
63
64C will be stored as a FULL record in the fourth block.
65
66"""
67
68__all__ = ['RecordsWriter',
69           'RecordsReader']
70
71import logging
72import struct
73
74# Note: this will be scrubbed to google.appengine.api.files import crc32c
75# when mapreduce is pushed to runtime.
76from mapreduce.third_party import crc32c
77from mapreduce import errors
78
79
80# pylint: disable=g-bad-name
81
82# Size of a block.
83_BLOCK_SIZE = 32 * 1024
84
85# Header format.
86_HEADER_FORMAT = '<IHB'
87
88# Header length in bytes.
89_HEADER_LENGTH = struct.calcsize(_HEADER_FORMAT)
90
91# Not a record but padding bytes.
92_RECORD_TYPE_NONE = 0
93
94# Full record.
95_RECORD_TYPE_FULL = 1
96
97# First data chunk record.
98_RECORD_TYPE_FIRST = 2
99
100# Middle data chunk record.
101_RECORD_TYPE_MIDDLE = 3
102
103# Last data chunk record.
104_RECORD_TYPE_LAST = 4
105
106
107# CRC Mask. Comes from http://leveldb.googlecode.com/svn/trunk/util/crc32c.h
108_CRC_MASK_DELTA = 0xa282ead8
109
110
111def _mask_crc(crc):
112  """Mask crc.
113
114  Args:
115    crc: integer crc.
116  Returns:
117    masked integer crc.
118  """
119  return (((crc >> 15) | (crc << 17)) + _CRC_MASK_DELTA) & 0xFFFFFFFFL
120
121
122def _unmask_crc(masked_crc):
123  """Unmask crc.
124
125  Args:
126    masked_crc: masked integer crc.
127  Retruns:
128    orignal crc.
129  """
130  rot = (masked_crc - _CRC_MASK_DELTA) & 0xFFFFFFFFL
131  return ((rot >> 17) | (rot << 15)) & 0xFFFFFFFFL
132
133
134class RecordsWriter(object):
135  """A writer for records format."""
136
137  def __init__(self, writer):
138    """Constructor.
139
140    Args:
141      writer: a writer conforming to Python io.RawIOBase interface that
142        implements 'write'.
143    """
144    self.__writer = writer
145    self.__position = 0
146
147  def __write_record(self, record_type, data):
148    """Write single physical record."""
149    length = len(data)
150
151    crc = crc32c.crc_update(crc32c.CRC_INIT, [record_type])
152    crc = crc32c.crc_update(crc, data)
153    crc = crc32c.crc_finalize(crc)
154
155    self.__writer.write(
156        struct.pack(_HEADER_FORMAT, _mask_crc(crc), length, record_type))
157    self.__writer.write(data)
158    self.__position += _HEADER_LENGTH + length
159
160  def write(self, data):
161    """Write single record.
162
163    Args:
164      data: record data to write as string, byte array or byte sequence.
165    """
166    block_remaining = _BLOCK_SIZE - self.__position % _BLOCK_SIZE
167
168    if block_remaining < _HEADER_LENGTH:
169      # Header won't fit into remainder
170      self.__writer.write('\x00' * block_remaining)
171      self.__position += block_remaining
172      block_remaining = _BLOCK_SIZE
173
174    if block_remaining < len(data) + _HEADER_LENGTH:
175      first_chunk = data[:block_remaining - _HEADER_LENGTH]
176      self.__write_record(_RECORD_TYPE_FIRST, first_chunk)
177      data = data[len(first_chunk):]
178
179      while True:
180        block_remaining = _BLOCK_SIZE - self.__position % _BLOCK_SIZE
181        if block_remaining >= len(data) + _HEADER_LENGTH:
182          self.__write_record(_RECORD_TYPE_LAST, data)
183          break
184        else:
185          chunk = data[:block_remaining - _HEADER_LENGTH]
186          self.__write_record(_RECORD_TYPE_MIDDLE, chunk)
187          data = data[len(chunk):]
188    else:
189      self.__write_record(_RECORD_TYPE_FULL, data)
190
191  def __enter__(self):
192    return self
193
194  def __exit__(self, atype, value, traceback):
195    self.close()
196
197  def close(self):
198    pass
199
200  def _pad_block(self):
201    """Pad block with 0.
202
203    Pad current block with 0. Reader will simply treat these as corrupted
204    record and skip the block.
205
206    This method is idempotent.
207    """
208    pad_length = _BLOCK_SIZE - self.__position % _BLOCK_SIZE
209    if pad_length and pad_length != _BLOCK_SIZE:
210      self.__writer.write('\x00' * pad_length)
211      self.__position += pad_length
212
213
214class RecordsReader(object):
215  """A reader for records format."""
216
217  def __init__(self, reader):
218    """Init.
219
220    Args:
221      reader: a reader conforming to Python io.RawIOBase interface that
222        implements 'read', 'seek', and 'tell'.
223    """
224    self.__reader = reader
225
226  def __try_read_record(self):
227    """Try reading a record.
228
229    Returns:
230      (data, record_type) tuple.
231    Raises:
232      EOFError: when end of file was reached.
233      InvalidRecordError: when valid record could not be read.
234    """
235    block_remaining = _BLOCK_SIZE - self.__reader.tell() % _BLOCK_SIZE
236    if block_remaining < _HEADER_LENGTH:
237      return ('', _RECORD_TYPE_NONE)
238
239    header = self.__reader.read(_HEADER_LENGTH)
240    if len(header) != _HEADER_LENGTH:
241      raise EOFError('Read %s bytes instead of %s' %
242                     (len(header), _HEADER_LENGTH))
243
244    (masked_crc, length, record_type) = struct.unpack(_HEADER_FORMAT, header)
245    crc = _unmask_crc(masked_crc)
246
247    if length + _HEADER_LENGTH > block_remaining:
248      # A record can't be bigger than one block.
249      raise errors.InvalidRecordError('Length is too big')
250
251    data = self.__reader.read(length)
252    if len(data) != length:
253      raise EOFError('Not enough data read. Expected: %s but got %s' %
254                     (length, len(data)))
255
256    if record_type == _RECORD_TYPE_NONE:
257      return ('', record_type)
258
259    actual_crc = crc32c.crc_update(crc32c.CRC_INIT, [record_type])
260    actual_crc = crc32c.crc_update(actual_crc, data)
261    actual_crc = crc32c.crc_finalize(actual_crc)
262
263    if actual_crc != crc:
264      raise errors.InvalidRecordError('Data crc does not match')
265    return (data, record_type)
266
267  def __sync(self):
268    """Skip reader to the block boundary."""
269    pad_length = _BLOCK_SIZE - self.__reader.tell() % _BLOCK_SIZE
270    if pad_length and pad_length != _BLOCK_SIZE:
271      data = self.__reader.read(pad_length)
272      if len(data) != pad_length:
273        raise EOFError('Read %d bytes instead of %d' %
274                       (len(data), pad_length))
275
276  def read(self):
277    """Reads record from current position in reader.
278
279    Returns:
280      original bytes stored in a single record.
281    """
282    data = None
283    while True:
284      last_offset = self.tell()
285      try:
286        (chunk, record_type) = self.__try_read_record()
287        if record_type == _RECORD_TYPE_NONE:
288          self.__sync()
289        elif record_type == _RECORD_TYPE_FULL:
290          if data is not None:
291            logging.warning(
292                "Ordering corruption: Got FULL record while already "
293                "in a chunk at offset %d", last_offset)
294          return chunk
295        elif record_type == _RECORD_TYPE_FIRST:
296          if data is not None:
297            logging.warning(
298                "Ordering corruption: Got FIRST record while already "
299                "in a chunk at offset %d", last_offset)
300          data = chunk
301        elif record_type == _RECORD_TYPE_MIDDLE:
302          if data is None:
303            logging.warning(
304                "Ordering corruption: Got MIDDLE record before FIRST "
305                "record at offset %d", last_offset)
306          else:
307            data += chunk
308        elif record_type == _RECORD_TYPE_LAST:
309          if data is None:
310            logging.warning(
311                "Ordering corruption: Got LAST record but no chunk is in "
312                "progress at offset %d", last_offset)
313          else:
314            result = data + chunk
315            data = None
316            return result
317        else:
318          raise errors.InvalidRecordError(
319              "Unsupported record type: %s" % record_type)
320
321      except errors.InvalidRecordError, e:
322        logging.warning("Invalid record encountered at %s (%s). Syncing to "
323                        "the next block", last_offset, e)
324        data = None
325        self.__sync()
326
327  def __iter__(self):
328    try:
329      while True:
330        yield self.read()
331    except EOFError:
332      pass
333
334  def tell(self):
335    """Return file's current position."""
336    return self.__reader.tell()
337
338  def seek(self, *args, **kwargs):
339    """Set the file's current position.
340
341    Arguments are passed directly to the underlying reader.
342    """
343    return self.__reader.seek(*args, **kwargs)
344