1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Defines ways of splicing and re-arranging time series.
16
17This file provides methods for reading, parsing, and re-arranging a time
18series. The main departure from standard TensorFlow input pipelines is a focus
19on "chunking" a time series, i.e. slicing it into small contiguous windows which
20are then batched together for training, a form of truncated
21backpropagation. This typically provides a significant speedup compared to
22looping over the whole series sequentially, by exploiting data parallelism and
23by reducing redundant contributions to gradients (due to redundant information
24in the series itself).
25
26A series, consisting of times (an increasing vector of integers) and values (one
27or more floating point values for each time) along with any exogenous features,
28is stored either in memory or on disk in various formats (e.g. "one record per
29timestep" on disk, or as a dictionary of Numpy arrays in memory). The location
and format are specified by configuring a `TimeSeriesReader` object
31(e.g. `NumpyReader`, `CSVReader`), which reads the data into the TensorFlow
32graph. A `TimeSeriesInputFn` object (typically `RandomWindowInputFn`) then
33performs windowing and batching.
34
35Time series are passed through this pipeline as dictionaries mapping feature
36names to their values. For training and evaluation, these require at minimum
37`TrainEvalFeatures.TIMES` (scalar integers, one per timestep) and
38`TrainEvalFeatures.VALUES` (may be either univariate or multivariate). Exogenous
39features may have any shape, but are likewise associated with a timestep. Times
40themselves need not be contiguous or regular (although smaller/fewer gaps are
41generally better), but each timestep must have all `VALUES` and any exogenous
42features (i.e. times may be missing, but given that a time is specified, every
43other feature must also be specified for that step; some models may support
44making exogenous updates conditional).
45
46The expected use case of a `TimeSeriesInputFn` is that it is first configured
47(for example setting a batch or window size) and passed a reader (a
48`TimeSeriesReader` object). The `TimeSeriesInputFn` can then be passed as the
49input_fn of an Estimator.
50
51For example, `RandomWindowInputFn` is useful for creating batches of random
52chunks of a series for training:
53
54```
55  # Read data in the default "time,value" CSV format with no header
56  reader = input_pipeline.CSVReader(csv_file_name)
57  # Set up windowing and batching for training
58  train_input_fn = input_pipeline.RandomWindowInputFn(
59      reader, batch_size=16, window_size=16)
60  # Fit model parameters to data
61  estimator.train(input_fn=train_input_fn, steps=150)
62```
63
64`RandomWindowInputFn` is the primary tool for training and quantitative
65evaluation of time series. `WholeDatasetInputFn`, which reads a whole series
66into memory, is useful for qualitative evaluation and preparing to make
67predictions with `predict_continuation_input_fn`.
68"""
69
70from __future__ import absolute_import
71from __future__ import division
72from __future__ import print_function
73
74import abc
75
76import numpy
77
78from tensorflow.contrib.timeseries.python.timeseries import feature_keys
79from tensorflow.contrib.timeseries.python.timeseries import model_utils
80
81from tensorflow.python.estimator import estimator_lib
82from tensorflow.python.framework import constant_op
83from tensorflow.python.framework import dtypes
84from tensorflow.python.framework import ops
85from tensorflow.python.framework import tensor_shape
86from tensorflow.python.ops import array_ops
87from tensorflow.python.ops import control_flow_ops
88from tensorflow.python.ops import io_ops
89from tensorflow.python.ops import math_ops
90from tensorflow.python.ops import nn
91from tensorflow.python.ops import parsing_ops
92from tensorflow.python.ops import random_ops
93from tensorflow.python.ops import state_ops
94from tensorflow.python.ops import tensor_array_ops
95from tensorflow.python.ops import variable_scope
96from tensorflow.python.training import input as input_lib
97from tensorflow.python.training import training
98from tensorflow.python.util import nest
99
100
def predict_continuation_input_fn(
    evaluation, steps=None, times=None, exogenous_features=None):
  """Builds an Estimator input_fn for predict() that continues an evaluate().

  Predictions pick up where the evaluation left off. When the evaluation was
  run with a batch size greater than one, a continuation is produced for each
  evaluated window, so the batch dimension is preserved.

  Args:
    evaluation: The dictionary returned by `Estimator.evaluate`, containing the
      keys FilteringResults.STATE_TUPLE and FilteringResults.TIMES.
    steps: Scalar number of steps to predict beyond the end of the
      evaluation. Exactly one of `steps` and `times` must be given.
    times: A [batch_size x window_size] array of integers (not a Tensor) giving
      the times to predict, all of which must fall after the corresponding
      evaluation. Exactly one of `steps` and `times` must be given. A missing
      batch dimension is treated as a batch of size 1.
    exogenous_features: Optional dictionary of exogenous features available to
      the model while predicting. Each value must have shape
      [batch_size x window_size x ...], with `batch_size` matching the batch
      dimension of `evaluation` and `window_size` equal to `steps` or to the
      window size of `times`, whichever was specified.
  Returns:
    An `input_fn` which can be passed to the `predict` method of a time series
    `Estimator`.
  Raises:
    ValueError: If `times` or `steps` are misspecified.
  """
  # Resolve the prediction times first so that a bad `times`/`steps`
  # combination is reported before anything else is looked up.
  continuation_times = model_utils.canonicalize_times_or_steps_from_output(
      times=times, steps=steps, previous_model_output=evaluation)
  model_inputs = {}
  model_inputs[feature_keys.PredictionFeatures.STATE_TUPLE] = (
      evaluation[feature_keys.FilteringResults.STATE_TUPLE])
  model_inputs[feature_keys.PredictionFeatures.TIMES] = continuation_times
  if exogenous_features is not None:
    model_inputs.update(exogenous_features)

  def _single_epoch(tensor):
    """Limits `tensor` to being produced for exactly one epoch."""
    return training.limit_epochs(tensor, num_epochs=1)

  def _predict_input_fn():
    """An input_fn for predict()."""
    # Without an epoch limit the constant features would be yielded forever,
    # and the Estimator's predict() would never terminate.
    single_pass_features = {
        name: nest.map_structure(_single_epoch, structure)
        for name, structure in model_inputs.items()}
    return (single_pass_features, None)

  return _predict_input_fn
152
153
class TimeSeriesReader(object):
  """Interface for reading and parsing a data source for a `TimeSeriesInputFn`.

  Implementations expose two ways of getting at the data: `read`, which yields
  a few records at a time, and `read_full`, which yields the entire data set in
  one go. Both return dictionaries mapping feature names to feature Tensors
  (see the module docstring for the structure of those dictionaries). A
  `TimeSeriesInputFn` will generally re-chunk this output before it reaches a
  model.
  """

  def check_dataset_size(self, minimum_dataset_size):
    """Optional hook for rejecting data sets which are too small.

    A `TimeSeriesInputFn` calls this so that a reader which knows its data set
    size can raise an informative error when the requested window size exceeds
    it. Readers which cannot determine their size need not override this
    method; the default implementation performs no check.

    Args:
      minimum_dataset_size: The smallest number of records the data set should
        contain. Where possible, readers should raise an error if an epoch of
        data has fewer records than this.
    """

  @abc.abstractmethod
  def read(self):
    """Parses a chunk of one or more records into a feature dictionary.

    Intended to be called by a `TimeSeriesInputFn`, not by models directly.

    Several records are read at once for efficiency; how many is an internal
    detail of the input pipeline. Records should generally come back in
    sequence, but callers must cope with some out-of-order records caused by
    wrapping around the end of a file.

    Returns:
      A dictionary mapping feature names to `Tensor` values, the first
      dimension of each being an arbitrary (efficiency-motivated) batch
      dimension.
    """

  @abc.abstractmethod
  def read_full(self):
    """Returns the whole data set at once.

    Mostly meant for interactive use and plotting, or for evaluation on small
    data sets. Generally inefficient, and not recommended for training.

    Returns:
      A dictionary of the same form as that returned by `read`, except that it
      covers the full data set rather than an arbitrary chunk: feature names
      map to `Tensor` values whose first dimension is the number of samples in
      the entire data set. Provided the underlying data does not change, these
      `Tensor`s should be the same on every graph invocation. Current
      implementations re-read the data on each invocation, although this may
      change in the future.
    """
216
217
class NumpyReader(TimeSeriesReader):
  """Feeds in-memory Numpy arrays to a `TimeSeriesInputFn`.

  Avoids embedding data in the graph as constants.
  """

  def __init__(self, data, read_num_records_hint=4096):
    """Wraps a dictionary of Numpy arrays describing a single time series.

    Args:
      data: A dictionary mapping feature names to Numpy arrays. The keys
        `TrainEvalFeatures.TIMES` and `TrainEvalFeatures.VALUES` are required,
        and two layouts are accepted:
          Univariate; `TIMES` and `VALUES` are both vectors of shape [series
            length]
          Multivariate; `TIMES` is a vector of shape [series length], `VALUES`
            has shape [series length x number of features].
        Either way, the shape of `VALUES` and of every exogenous feature must
        begin with the shape of the `TIMES` array.
      read_num_records_hint: Upper bound on the number of samples handed over
        in a single `read`, for efficiency.
    """
    self._read_num_records_hint = read_num_records_hint
    # Stored with a leading batch dimension of one; the read methods strip it.
    self._features = _canonicalize_numpy_data(data, require_single_batch=True)

  def check_dataset_size(self, minimum_dataset_size):
    """Raises a ValueError if the series has too few records."""
    times = self._features[feature_keys.TrainEvalFeatures.TIMES]
    # Axis 0 is the singleton batch dimension, axis 1 the series length.
    num_records = times.shape[1]
    if num_records < minimum_dataset_size:
      message = (
          "A TimeSeriesInputFn is configured to create windows of size {}, "
          "but only {} records were available in the dataset. Either decrease "
          "the window size or provide more records.").format(
              minimum_dataset_size, num_records)
      raise ValueError(message)

  def read(self):
    """Returns a large chunk of the Numpy arrays for later re-chunking."""
    # Drop the batch dimension, leaving the series length as the leading
    # dimension of every feature.
    unbatched = {}
    for name, array in self._features.items():
      unbatched[name] = numpy.squeeze(array, axis=0)
    # Each call to the input function then yields up to
    # self._read_num_records_hint consecutive steps of the single series,
    # which the TimeSeriesInputFn goes on to window.
    chunk_input_fn = estimator_lib.inputs.numpy_input_fn(
        x=unbatched,
        batch_size=self._read_num_records_hint,
        num_epochs=None,
        shuffle=False)
    return chunk_input_fn()

  def read_full(self):
    """Returns `Tensor` versions of the full Numpy arrays."""
    full_input_fn = estimator_lib.inputs.numpy_input_fn(
        x=self._features,
        batch_size=1,
        num_epochs=None,
        # Every queue element is a complete copy of the dataset, so the queue
        # is kept as small as possible.
        queue_capacity=2,
        shuffle=False)
    batched = full_input_fn()
    # Strip the dimension added by the input function so that callers see the
    # same layout as the stored (canonicalized) arrays.
    unbatched = {}
    for name, tensor in batched.items():
      unbatched[name] = array_ops.squeeze(tensor, axis=0)
    return unbatched
280
281
class ReaderBaseTimeSeriesParser(TimeSeriesReader):
  """Base for time series readers which wrap a `tf.ReaderBase`.

  Subclasses supply the concrete reader (`_get_reader`) and the record parser
  (`_process_records`); this class implements `read` and `read_full` on top of
  them using a filename queue guarded by an epoch limit.
  """

  def __init__(self, filenames, read_num_records_hint=4096):
    """Configure the time series reader.

    Args:
      filenames: A string or list of strings indicating files to read records
        from.
      read_num_records_hint: When not reading a full dataset, indicates the
        number of records to transfer in a single chunk (for efficiency). The
        actual number transferred at one time may vary.
    """
    self._filenames = filenames
    self._read_num_records_hint = read_num_records_hint

  @abc.abstractmethod
  def _get_reader(self):
    """Get an instance of the tf.ReaderBase associated with this class."""
    pass

  @abc.abstractmethod
  def _process_records(self, lines):
    """Given string items, return a processed dictionary of Tensors.

    Args:
      lines: A 1-dimensional string Tensor, each representing a record to parse
        (source dependent, e.g. a line of a file, or a serialized protocol
        buffer).

    Returns:
      A dictionary mapping feature names to their values. The batch dimensions
      should match the length of `lines`.
    """
    pass

  def _get_filename_queue(self, epoch_limit):
    """Constructs a filename queue with an epoch limit.

    `epoch_limit` is intended as an error checking fallback to prevent a reader
    from infinitely looping in its requests for more work items if none are
    available in any file. It should be set high enough that it is never reached
    assuming at least one record exists in some file.

    Args:
      epoch_limit: The maximum number of times to read through the complete list
        of files before throwing an OutOfRangeError.
    Returns:
      A tuple of (filename_queue, epoch_limiter):
        filename_queue: A FIFOQueue with filename work items.
        epoch_limiter: The local variable used for epoch limitation. This should
          be set to zero before a reader is passed `filename_queue` in order to
          reset the epoch limiter's state.
    """
    # An int64 counter starting at zero, registered as a (non-trainable) local
    # variable.
    epoch_limiter = variable_scope.variable(
        initial_value=constant_op.constant(0, dtype=dtypes.int64),
        name="epoch_limiter",
        trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    # Flatten to a vector so that a single filename string and a list of
    # filenames are handled the same way.
    filenames_tensor = array_ops.reshape(
        ops.convert_to_tensor(self._filenames), [-1])
    # We can't rely on epoch_limiter being initialized, since queue runners are
    # started before local variables are initialized. Instead, we ignore epoch
    # limits before variable initialization. This means that prior to variable
    # initialization, a QueueRunner may cause a reader to enter an un-checked
    # infinite loop. However, as soon as local variables are initialized, we
    # will start incrementing and checking epoch_limiter, which will interrupt
    # any in-progress loops.
    conditional_count_up_to = control_flow_ops.cond(
        state_ops.is_variable_initialized(epoch_limiter),
        lambda: epoch_limiter.count_up_to(epoch_limit),
        lambda: constant_op.constant(0, dtype=dtypes.int64))
    # Tie the (conditional) counter increment to every evaluation of the
    # filenames tensor which feeds the queue.
    with ops.control_dependencies([conditional_count_up_to]):
      filenames_tensor = array_ops.identity(filenames_tensor)
    # Filenames are enqueued unshuffled, with a queue capacity of one.
    filename_queue = input_lib.string_input_producer(
        filenames_tensor, shuffle=False, capacity=1)
    return filename_queue, epoch_limiter

  def read(self):
    """Reads a chunk of data from the `tf.ReaderBase` for later re-chunking.

    Returns:
      The result of `_process_records` applied to the records read, which are
      requested `self._read_num_records_hint` at a time.
    """
    # Assuming there is at least one item to be read among all of the files in
    # self._filenames, we will not need to go through more than
    # self._read_num_records_hint epochs to get a batch of
    # self._read_num_records_hint records. Setting this limit and resetting it
    # before each reader.read_up_to call prevents infinite looping when there
    # are no records available in any of the files.
    filename_queue, epoch_limiter = self._get_filename_queue(
        epoch_limit=self._read_num_records_hint)
    reader = self._get_reader()
    epoch_reset_op = state_ops.assign(epoch_limiter, 0)
    # The counter is zeroed before each read. Only the records are used; the
    # keys returned by read_up_to are discarded.
    with ops.control_dependencies([epoch_reset_op]):
      _, records = reader.read_up_to(
          filename_queue, self._read_num_records_hint)
    return self._process_records(records)

  def read_full(self):
    """Reads a full epoch of data into memory.

    Records are read one at a time and accumulated until the key of the first
    record read comes up again, at which point the reader is reset.

    Returns:
      The result of `_process_records` applied to the concatenation of the
      accumulated records.
    """
    reader = self._get_reader()
    # Set a hard limit of 2 epochs through self._filenames. If there are any
    # records available, we should only end up reading the first record in the
    # second epoch before exiting the while loop and subsequently resetting the
    # epoch limit. If there are no records available in any of the files, this
    # hard limit prevents the reader.read_up_to call from looping infinitely.
    filename_queue, epoch_limiter = self._get_filename_queue(epoch_limit=2)
    epoch_reset_op = state_ops.assign(epoch_limiter, 0)
    with ops.control_dependencies([epoch_reset_op]):
      first_key, first_value = reader.read_up_to(filename_queue, 1)
    # Read until we get a duplicate key (one epoch)
    def _while_condition(
        current_key, current_value, current_index, collected_records):
      # Keep looping for as long as the most recently read key differs from
      # the key of the very first record.
      del current_value, current_index, collected_records  # unused
      return math_ops.not_equal(array_ops.squeeze(current_key, axis=0),
                                array_ops.squeeze(first_key, axis=0))

    def _while_body(
        current_key, current_value, current_index, collected_records):
      # Stores the record carried over from the previous step at
      # `current_index` and reads the next one. The record which ends the loop
      # (the one whose key matches `first_key`) is therefore never stored.
      del current_key  # unused
      new_key, new_value = reader.read_up_to(filename_queue, 1)
      # Pin the static shapes to a single element; presumably required so the
      # loop-carried shapes stay consistent across iterations -- TODO confirm.
      new_key.set_shape([1])
      new_value.set_shape([1])
      return (new_key,
              new_value,
              current_index + 1,
              collected_records.write(current_index, current_value))
    # The loop is seeded with a placeholder empty-string key; presumably this
    # never equals a real record key, so the body runs at least once and
    # `first_value` is stored at index 0 -- TODO confirm.
    _, _, _, records_ta = control_flow_ops.while_loop(
        _while_condition,
        _while_body,
        [constant_op.constant([""]), first_value,
         0,  # current_index starting value
         tensor_array_ops.TensorArray(  # collected_records
             dtype=dtypes.string, size=0, dynamic_size=True)])
    records = records_ta.concat()
    # Reset the reader when we're done so that subsequent requests for data get
    # the dataset in the proper order.
    with ops.control_dependencies([records]):
      reader_reset_op = reader.reset()
    # Route the output through an identity which depends on the reset, so that
    # fetching the records also triggers the reset.
    with ops.control_dependencies([reader_reset_op]):
      records = array_ops.identity(records)
    return self._process_records(records)
421
422
class CSVReader(ReaderBaseTimeSeriesParser):
  """Time series reader for one or more CSV-formatted files."""

  def __init__(self,
               filenames,
               column_names=(feature_keys.TrainEvalFeatures.TIMES,
                             feature_keys.TrainEvalFeatures.VALUES),
               column_dtypes=None,
               skip_header_lines=None,
               read_num_records_hint=4096):
    """Configures CSV parsing for a `TimeSeriesInputFn`.

    Args:
      filenames: A filename or list of filenames holding the time series. Every
          line must contain one column per entry of `column_names`.
      column_names: A list with the feature name of each column, in order.
          `TrainEvalFeatures.TIMES` and `TrainEvalFeatures.VALUES` are both
          required; `VALUES` may appear several times to describe a
          multivariate series.
      column_dtypes: Optional list of dtypes, one per entry of
          `column_names`. When omitted, `TrainEvalFeatures.TIMES` is parsed as
          `tf.int64` and every other column as `tf.float32`.
      skip_header_lines: Number of lines to skip at the start of each file;
          forwarded to `tf.TextLineReader`.
      read_num_records_hint: When not reading a full dataset, the number of
          records to parse/transfer in a single chunk (for efficiency). The
          number actually transferred at one time may be more or less.
    Raises:
      ValueError: If a required column name is missing, if `TIMES` appears more
        than once, or if `column_dtypes` and `column_names` differ in length.
    """
    times_key = feature_keys.TrainEvalFeatures.TIMES
    for required_key in (times_key, feature_keys.TrainEvalFeatures.VALUES):
      if required_key not in column_names:
        raise ValueError("'{}' is a required column.".format(required_key))
    if column_dtypes is not None:
      if len(column_dtypes) != len(column_names):
        raise ValueError(
            ("If specified, the length of column_dtypes must match the length "
             "of column_names (got column_dtypes={} and column_names={})."
            ).format(column_dtypes, column_names))
    # TIMES is known to be present at this point, so any count other than one
    # means that it was repeated.
    times_column_count = len(
        [name for name in column_names if name == times_key])
    if times_column_count != 1:
      raise ValueError(
          "Got more than one times column ('{}'), but exactly "
          "one is required.".format(times_key))
    super(CSVReader, self).__init__(
        filenames=filenames, read_num_records_hint=read_num_records_hint)
    self._column_names = column_names
    self._column_dtypes = column_dtypes
    self._skip_header_lines = skip_header_lines

  def _get_reader(self):
    """Returns a `TextLineReader` which skips the configured header lines."""
    return io_ops.TextLineReader(skip_header_lines=self._skip_header_lines)

  def _process_records(self, lines):
    """Parse `lines` as CSV records."""
    times_key = feature_keys.TrainEvalFeatures.TIMES
    if self._column_dtypes is not None:
      # Explicit dtypes: a scalar zero of each dtype serves as the default.
      record_defaults = [(array_ops.zeros([], dtype),)
                         for dtype in self._column_dtypes]
    else:
      # No dtypes given: TIMES gets an int64 zero default and every other
      # column an empty default.
      record_defaults = []
      for name in self._column_names:
        if name == times_key:
          record_defaults.append((array_ops.zeros([], dtypes.int64),))
        else:
          record_defaults.append(())
    parsed_columns = parsing_ops.decode_csv(lines, record_defaults)
    # Collect columns which share a name, keeping them in file order.
    columns_by_name = {}
    for name, column in zip(self._column_names, parsed_columns):
      if name not in columns_by_name:
        columns_by_name[name] = []
      columns_by_name[name].append(column)
    # TIMES stays a vector; everything else is stacked along a new feature
    # axis, even when only a single column carries the name.
    return {
        name: (same_name_columns[0] if name == times_key
               else array_ops.stack(same_name_columns, axis=1))
        for name, same_name_columns in columns_by_name.items()}
500
501
class TFExampleReader(ReaderBaseTimeSeriesParser):
  """Reads and parses `tf.Example`s from a TFRecords file."""

  def __init__(self,
               filenames,
               features,
               read_num_records_hint=4096):
    """Configure `tf.Example` parsing.

    Args:
      filenames: A filename or list of filenames of TFRecords files to read the
          time series from. Each record must be a serialized `tf.Example`
          which can be parsed according to `features`.
      features: A dictionary mapping from feature keys to `tf.FixedLenFeature`
          objects. Must include `TrainEvalFeatures.TIMES` (scalar integer) and
          `TrainEvalFeatures.VALUES` (floating point vector) features.
      read_num_records_hint: When not reading a full dataset, indicates the
          number of records to parse/transfer in a single chunk (for
          efficiency). The actual number transferred at one time may vary.
          Defaults to the same value as the base class.
    Raises:
      ValueError: If required times/values features are not present.
    """
    if feature_keys.TrainEvalFeatures.TIMES not in features:
      raise ValueError("'{}' is a required column.".format(
          feature_keys.TrainEvalFeatures.TIMES))
    if feature_keys.TrainEvalFeatures.VALUES not in features:
      raise ValueError("'{}' is a required column.".format(
          feature_keys.TrainEvalFeatures.VALUES))
    self._features = features
    # Forward the chunk size hint explicitly so that it can be tuned here in
    # the same way as for `CSVReader`.
    super(TFExampleReader, self).__init__(
        filenames=filenames, read_num_records_hint=read_num_records_hint)

  def _get_reader(self):
    """Returns the `TFRecordReader` used to pull serialized records."""
    return io_ops.TFRecordReader()

  def _process_records(self, examples):
    """Parse `tf.Example`s into `Tensors`.

    Args:
      examples: The string records to parse, passed to `parse_example` as
        `serialized`.

    Returns:
      The output of `parse_example` for the feature specification given to the
      constructor.
    """
    return parsing_ops.parse_example(
        serialized=examples, features=self._features)
536
class TimeSeriesInputFn(object):
  """Interface for objects which turn a time series into batches of windows."""

  @abc.abstractmethod
  def create_batch(self):
    """Builds chunked Tensors out of times, values, and any other features.

    The result is suitable as the output of the `input_fn` passed to the
    train() or evaluate() method of a tf.estimator.Estimator.

    Returns:
      A tuple of (features, targets):
        features: A dictionary keyed by at least `TrainEvalFeatures.TIMES` and
          `TrainEvalFeatures.VALUES`. The `TIMES` value has shape [batch size x
          window length] and the `VALUES` value has shape [batch size x window
          length x number of features]. The shapes of any other features
          likewise begin with [batch size x window length].
        targets: Unused; present only because the Estimator API expects it.
          Should be None.
    """

  def __call__(self):
    # Calling the object is the same as asking it for a batch, which lets an
    # instance be handed to an Estimator as the input function itself.
    batch = self.create_batch()
    return batch
562
563
class WholeDatasetInputFn(TimeSeriesInputFn):
  """Passes a complete time series to a model for evaluation/inference.

  This `TimeSeriesInputFn` is not built for high throughput and should not be
  used for training. It exists to evaluate sequentially over a full dataset
  (yielding sequential in-sample predictions), the result of which feeds
  naturally into `predict_continuation_input_fn` for out-of-sample
  predictions. That makes it handy for plotting and interactive use, whereas
  `RandomWindowInputFn` is the better choice for training and quantitative
  evaluation.
  """
  # TODO(allenl): A SequentialWindowInputFn for getting model end state without
  # loading the whole dataset into memory (or for quantitative evaluation of
  # sequential models). Note that an Estimator using such a TimeSeriesInputFn
  # won't return in-sample predictions for the whole dataset, which means it
  # won't be terribly useful for interactive use/plotting (unless the user
  # passes in concat metrics). Also need to be careful about state saving for
  # sequential models, particularly the gaps between chunks.

  def __init__(self, time_series_reader):
    """Initialize the `TimeSeriesInputFn`.

    Args:
      time_series_reader: The `TimeSeriesReader` whose `read_full` method
        supplies the data.
    """
    super(WholeDatasetInputFn, self).__init__()
    self._reader = time_series_reader

  def create_batch(self):
    """A suitable `input_fn` for an `Estimator`'s `evaluate()`.

    Returns:
      A tuple of (features, None), where `features` is a dictionary mapping
      feature names to `Tensors`, each with a shape prefixed by
      [1, data set size] (i.e. a batch size of 1).
    """
    whole_series = self._reader.read_full()
    batched_features = {}
    for name, tensor in whole_series.items():
      # Indexing with None prepends a batch dimension of size one.
      batched_features[name] = tensor[None, ...]
    return (batched_features, None)
604
605
606class RandomWindowInputFn(TimeSeriesInputFn):
607  """Wraps a `TimeSeriesReader` to create random batches of windows.
608
609  Tensors are first collected into sequential windows (in a windowing queue
610  created by `tf.train.batch`, based on the order returned from
611  `time_series_reader`), then these windows are randomly batched (in a
612  `RandomShuffleQueue`), the Tensors returned by `create_batch` having shapes
613  prefixed by [`batch_size`, `window_size`].
614
615  This `TimeSeriesInputFn` is useful for both training and quantitative
616  evaluation (but be sure to run several epochs for sequential models such as
617  `StructuralEnsembleRegressor` to completely flush stale state left over from
618  training). For qualitative evaluation or when preparing for predictions, use
619  `WholeDatasetInputFn`.
620  """
621
622  def __init__(
623      self, time_series_reader, window_size, batch_size,
624      queue_capacity_multiplier=1000, shuffle_min_after_dequeue_multiplier=2,
625      discard_out_of_order=True, discard_consecutive_batches_limit=1000,
626      jitter=True, num_threads=2, shuffle_seed=None):
627    """Configure the RandomWindowInputFn.
628
629    Args:
630      time_series_reader: A TimeSeriesReader object.
631      window_size: The number of examples to keep together sequentially. This
632        controls the length of truncated backpropagation: smaller values mean
633        less sequential computation, which can lead to faster training, but
634        create a coarser approximation to the gradient (which would ideally be
635        computed by a forward pass over the entire sequence in order).
636      batch_size: The number of windows to place together in a batch. Larger
637        values will lead to more stable gradients during training.
638      queue_capacity_multiplier: The capacity for the queues used to create
639        batches, specified as a multiple of `batch_size` (for
640        RandomShuffleQueue) and `batch_size * window_size` (for the
641        FIFOQueue). Controls the maximum number of windows stored. Should be
642        greater than `shuffle_min_after_dequeue_multiplier`.
643      shuffle_min_after_dequeue_multiplier: The minimum number of windows in the
644        RandomShuffleQueue after a dequeue, which controls the amount of entropy
645        introduced during batching. Specified as a multiple of `batch_size`.
646      discard_out_of_order: If True, windows of data which have times which
647        decrease (a higher time followed by a lower time) are discarded. If
648        False, the window and associated features are instead sorted so that
649        times are non-decreasing. Discarding is typically faster, as models do
650        not have to deal with artificial gaps in the data. However, discarding
651        does create a bias where the beginnings and endings of files are
652        under-sampled.
653      discard_consecutive_batches_limit: Raise an OutOfRangeError if more than
654        this number of batches are discarded without a single non-discarded
655        window (prevents infinite looping when the dataset is too small).
656      jitter: If True, randomly discards examples between some windows in order
657        to avoid deterministic chunking patterns. This is important for models
658        like AR which may otherwise overfit a fixed chunking.
659      num_threads: Use this number of threads for queues. Setting a value of 1
660        removes one source of non-determinism (and in combination with
661        shuffle_seed should provide deterministic windowing).
662      shuffle_seed: A seed for window shuffling. The default value of None
663        provides random behavior. With `shuffle_seed` set and
664        `num_threads=1`, provides deterministic behavior.
665    """
666    self._reader = time_series_reader
667    self._window_size = window_size
668    self._reader.check_dataset_size(minimum_dataset_size=self._window_size)
669    self._batch_size = batch_size
670    self._queue_capacity_multiplier = queue_capacity_multiplier
671    self._shuffle_min_after_dequeue_multiplier = (
672        shuffle_min_after_dequeue_multiplier)
673    self._discard_out_of_order = discard_out_of_order
674    self._discard_limit = discard_consecutive_batches_limit
675    self._jitter = jitter
676    if num_threads is None:
677      self._num_threads = self._batch_size
678    else:
679      self._num_threads = num_threads
680    self._shuffle_seed = shuffle_seed
681    super(RandomWindowInputFn, self).__init__()
682
683  def create_batch(self):
684    """Create queues to window and batch time series data.
685
686    Returns:
687      A dictionary of Tensors corresponding to the output of `self._reader`
688      (from the `time_series_reader` constructor argument), each with shapes
689      prefixed by [`batch_size`, `window_size`].
690    """
691    features = self._reader.read()
692    if self._jitter:
693      # TODO(agarwal, allenl): Figure out if more jitter is needed here.
694      jitter = random_ops.random_uniform(shape=[], maxval=2, dtype=dtypes.int32)
695    else:
696      jitter = 0
697    # To keep things efficient, we pass from the windowing batcher to the
698    # batch-of-windows batcher in batches. This avoids the need for huge numbers
699    # of threads, but does mean that jitter is only applied occasionally.
700    # TODO(allenl): Experiment with different internal passing sizes.
701    internal_passing_size = self._batch_size
702    features_windowed = input_lib.batch(
703        features,
704        batch_size=self._window_size * internal_passing_size + jitter,
705        enqueue_many=True,
706        capacity=(self._queue_capacity_multiplier
707                  * internal_passing_size * self._window_size),
708        num_threads=self._num_threads)
709    raw_features_windowed = features_windowed
710    if self._jitter:
711      features_windowed = {
712          key: value[jitter:]
713          for key, value in features_windowed.items()}
714    features_windowed = {
715        key: array_ops.reshape(
716            value,
717            array_ops.concat(
718                [[internal_passing_size, self._window_size],
719                 array_ops.shape(value)[1:]],
720                axis=0))
721        for key, value in features_windowed.items()}
722    batch_and_window_shape = tensor_shape.TensorShape(
723        [internal_passing_size, self._window_size])
724    for key in features_windowed.keys():
725      features_windowed[key].set_shape(
726          batch_and_window_shape.concatenate(
727              raw_features_windowed[key].get_shape()[1:]))
728    # When switching files, we may end up with windows where the time is not
729    # decreasing, even if times within each file are sorted (and even if those
730    # files are visited in order, when looping back around to the beginning of
731    # the first file). This is hard for models to deal with, so we either
732    # discard such examples, creating a bias where the beginning and end of the
733    # series is under-sampled, or we sort the window, creating large gaps.
734    times = features_windowed[feature_keys.TrainEvalFeatures.TIMES]
735    if self._discard_out_of_order:
736      non_decreasing = math_ops.reduce_all(
737          times[:, 1:] >= times[:, :-1], axis=1)
738      # Ensure that no more than self._discard_limit complete batches are
739      # discarded contiguously (resetting the count when we find a single clean
740      # window). This prevents infinite looping when the dataset is smaller than
741      # the window size.
742      # TODO(allenl): Figure out a way to return informative errors from
743      # count_up_to.
744      discarded_windows_limiter = variable_scope.variable(
745          initial_value=constant_op.constant(0, dtype=dtypes.int64),
746          name="discarded_windows_limiter",
747          trainable=False,
748          collections=[ops.GraphKeys.LOCAL_VARIABLES])
749      def _initialized_limit_check():
750        return control_flow_ops.cond(
751            math_ops.reduce_any(non_decreasing),
752            lambda: state_ops.assign(discarded_windows_limiter, 0),
753            lambda: discarded_windows_limiter.count_up_to(self._discard_limit))
754      discard_limit_op = control_flow_ops.cond(
755          state_ops.is_variable_initialized(discarded_windows_limiter),
756          _initialized_limit_check,
757          lambda: constant_op.constant(0, dtype=dtypes.int64))
758      with ops.control_dependencies([discard_limit_op]):
759        non_decreasing = array_ops.identity(non_decreasing)
760    else:
761      _, indices_descending = nn.top_k(
762          times, k=array_ops.shape(times)[-1], sorted=True)
763      indices = array_ops.reverse(indices_descending, axis=[0])
764      features_windowed = {
765          key: array_ops.gather(params=value, indices=indices)
766          for key, value in features_windowed.items()
767      }
768      non_decreasing = True
769    features_batched = input_lib.maybe_shuffle_batch(
770        features_windowed,
771        num_threads=self._num_threads,
772        seed=self._shuffle_seed,
773        batch_size=self._batch_size,
774        capacity=self._queue_capacity_multiplier * self._batch_size,
775        min_after_dequeue=(self._shuffle_min_after_dequeue_multiplier *
776                           self._batch_size),
777        keep_input=non_decreasing,
778        enqueue_many=True)
779    return (features_batched, None)
780
781
def _canonicalize_numpy_data(data, require_single_batch):
  """Do basic checking and reshaping for Numpy data.

  Args:
    data: A dictionary mapping keys to Numpy arrays, with several possible
      shapes (requires keys `TrainEvalFeatures.TIMES` and
      `TrainEvalFeatures.VALUES`):
        Single example; `TIMES` is a scalar and `VALUES` is either a scalar or a
          vector of length [number of features].
        Sequence; `TIMES` is a vector of shape [series length], `VALUES` either
          has shape [series length] (univariate) or [series length x number of
          features] (multivariate).
        Batch of sequences; `TIMES` has shape [batch size x series length],
          `VALUES` has shape [batch size x series length] (univariate) or
          [batch size x series length x number of features] (multivariate).
      In any case, `VALUES` and any exogenous features must have their shapes
      prefixed by the shape of the value corresponding to the `TIMES` key.
    require_single_batch: If True, raises an error if the provided data has a
      batch dimension > 1.
  Returns:
    A dictionary with features normalized to have shapes prefixed with [batch
    size x series length]; `VALUES` always has shape [batch size x series
    length x number of features]. The sizes of dimensions which were omitted in
    the inputs are 1.
  Raises:
    ValueError: If dimensions are incorrect or do not match, or required
      features are missing.
  """
  times_key = feature_keys.TrainEvalFeatures.TIMES
  values_key = feature_keys.TrainEvalFeatures.VALUES
  features = {key: numpy.array(value) for key, value in data.items()}
  if times_key not in features or values_key not in features:
    raise ValueError("{} and {} are required features.".format(
        times_key, values_key))
  times = features[times_key]
  for key, value in features.items():
    if value.shape[:len(times.shape)] != times.shape:
      raise ValueError(
          ("All features must have their shapes prefixed by the shape of the"
           " times feature. Got shape {} for feature '{}', but shape {} for"
           " '{}'").format(value.shape, key, times.shape, times_key))
  # Ranks are taken from the inputs, before any dimensions are added below.
  times_rank = len(times.shape)
  values_shape = features[values_key].shape
  values_rank = len(values_shape)
  if times_rank == 0:  # a single example
    if values_rank == 0:  # univariate
      # Add a feature dimension (with one feature)
      features[values_key] = features[values_key][..., None]
    elif values_rank > 1:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 1 dimension"
           " ([number of features]) since '{}' does not "
           "have a batch or time dimension, but got shape {}").format(
               values_key, times_key, values_shape))
    # Add trivial batch and time dimensions for every feature
    features = {key: value[None, None, ...] for key, value in features.items()}
  elif times_rank == 1:  # shape [series length]
    if values_rank == 1:  # shape [series length]
      # Add a feature dimension (with one feature)
      features[values_key] = features[values_key][..., None]
    elif values_rank > 2:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 2 dimensions"
           " ([series length, number of features]) since '{}' does not "
           "have a batch dimension, but got shape {}").format(
               values_key, times_key, values_shape))
    # Add trivial batch dimensions for every feature
    features = {key: value[None, ...] for key, value in features.items()}
  elif times_rank == 2:  # shape [batch size, series length]
    if values_rank == 2:  # shape [batch size, series length]
      # Add a feature dimension (with one feature), as is done for the
      # unbatched cases above.
      features[values_key] = features[values_key][..., None]
    elif values_rank > 3:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 3 dimensions"
           " ([batch size, series length, number of features]) since '{}' "
           "has a batch and a time dimension, but got shape {}").format(
               values_key, times_key, values_shape))
  else:
    raise ValueError(
        ("Got an unexpected number of dimensions for times. Was expecting at "
         "most two ([batch size, series length]), but got shape {}.").format(
             times.shape))
  if require_single_batch:
    # We don't expect input to be already batched; batching is done later
    if features[times_key].shape[0] != 1:
      raise ValueError("Got batch input, was expecting unbatched input.")
  return features
867