1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Auto-Regressive models for time series data."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21from tensorflow.contrib.rnn.python.ops import lstm_ops
22from tensorflow.contrib.timeseries.python.timeseries import math_utils
23from tensorflow.contrib.timeseries.python.timeseries import model
24from tensorflow.contrib.timeseries.python.timeseries import model_utils
25from tensorflow.contrib.timeseries.python.timeseries.feature_keys import PredictionFeatures
26from tensorflow.contrib.timeseries.python.timeseries.feature_keys import TrainEvalFeatures
27
28from tensorflow.python.estimator import estimator_lib
29from tensorflow.python.framework import constant_op
30from tensorflow.python.framework import dtypes
31from tensorflow.python.framework import ops
32from tensorflow.python.keras.engine import sequential
33from tensorflow.python.keras.engine import training
34from tensorflow.python.keras.layers import core
35from tensorflow.python.ops import array_ops
36from tensorflow.python.ops import check_ops
37from tensorflow.python.ops import control_flow_ops
38from tensorflow.python.ops import gen_math_ops
39from tensorflow.python.ops import init_ops
40from tensorflow.python.ops import math_ops
41from tensorflow.python.ops import nn_ops
42from tensorflow.python.ops import tensor_array_ops
43from tensorflow.python.ops import variable_scope
44
45
46class FlatPredictionModel(training.Model):
47  """Flattens input and output windows and puts them through dense layers.
48
49  This model does not operate on its own, but rather is a plugin to
50  `ARModel`. See `ARModel`'s constructor documentation
51  (`prediction_model_factory`) for a usage example.
52  """
53
54  def __init__(self,
55               num_features,
56               input_window_size,
57               output_window_size,
58               hidden_layer_sizes=None):
59    """Construct the flat prediction model.
60
61    Args:
62      num_features: number of input features per time step.
63      input_window_size: Number of past time steps of data to look at when doing
64        the regression.
65      output_window_size: Number of future time steps to predict. Note that
66        setting it to > 1 empirically seems to give a better fit.
67      hidden_layer_sizes: list of sizes of hidden layers.
68    """
69    super(FlatPredictionModel, self).__init__()
70    self._input_flatten = core.Flatten()
71    self._output_flatten = core.Flatten()
72    if hidden_layer_sizes:
73      self._hidden_layers = sequential.Sequential([
74          core.Dense(layer_size, activation=nn_ops.relu)
75          for layer_size in hidden_layer_sizes])
76    else:
77      self._hidden_layers = None
78    self._mean_transform = core.Dense(num_features * output_window_size,
79                                      name="predicted_mean")
80    self._covariance_transform = core.Dense(num_features * output_window_size,
81                                            name="log_sigma_square")
82    self._prediction_shape = [-1, output_window_size, num_features]
83
84  def call(self, input_window_features, output_window_features):
85    """Compute predictions from input and output windows.
86
87    Args:
88      input_window_features: A floating point Tensor with shape [batch size,
89        input window size, input features]. The batch dimension may not have
90        static shape information, but the window size and number of input
91        features are known at graph construction time and recorded in the static
92        shape information for the `input_window_features` `Tensor`. Note that
93        `input_window_size` may be zero.
94      output_window_features: A floating point Tensor with shape [batch size,
95        output window size, output features]. As with `input_window_features`,
96        the last two dimensions have static shape information. If there are no
97        output features, the size of the last dimension will be zero.
98    Returns:
99      A dictionary of predictions with keys "mean" and "covariance" (only
100      diagonal covariances are currently supported). Each has shape
101      [batch size, output window size, num_features], where num_features is the
102      same as the constructor argument.
103    """
104    if input_window_features.shape.dims[1].value == 0:
105      # TODO(allenl): Make reshape()'s static shape information work on
106      # zero-size Tensors? Currently this special case is required because
107      # otherwise the Dense layers get unknown last dimensions.
108      activation = self._output_flatten(output_window_features)
109    elif output_window_features.shape.dims[2].value == 0:
110      activation = self._input_flatten(input_window_features)
111    else:
112      activation = array_ops.concat(
113          [self._input_flatten(input_window_features),
114           self._output_flatten(output_window_features)],
115          axis=1)
116    if self._hidden_layers:
117      activation = self._hidden_layers(activation)
118    predicted_mean = array_ops.reshape(
119        self._mean_transform(activation),
120        self._prediction_shape)
121    predicted_covariance = array_ops.reshape(
122        gen_math_ops.exp(self._covariance_transform(activation)),
123        self._prediction_shape)
124    return {"mean": predicted_mean,
125            "covariance": predicted_covariance}
126
127
128class LSTMPredictionModel(training.Model):
129  """A simple encoder/decoder model using an LSTM.
130
131  This model does not operate on its own, but rather is a plugin to
132  `ARModel`. See `ARModel`'s constructor documentation
133  (`prediction_model_factory`) for a usage example.
134  """
135
136  def __init__(self,
137               num_features,
138               input_window_size,
139               output_window_size,
140               num_units=128):
141    """Construct the LSTM prediction model.
142
143    Args:
144      num_features: number of input features per time step.
145      input_window_size: Number of past time steps of data to look at when doing
146        the regression.
147      output_window_size: Number of future time steps to predict. Note that
148        setting it to > 1 empirically seems to give a better fit.
149      num_units: The number of units in the encoder and decoder LSTM cells.
150    """
151    super(LSTMPredictionModel, self).__init__()
152    self._encoder = lstm_ops.LSTMBlockFusedCell(
153        num_units=num_units, name="encoder")
154    self._decoder = lstm_ops.LSTMBlockFusedCell(
155        num_units=num_units, name="decoder")
156    self._mean_transform = core.Dense(num_features,
157                                      name="mean_transform")
158    self._covariance_transform = core.Dense(num_features,
159                                            name="covariance_transform")
160
161  def call(self, input_window_features, output_window_features):
162    """Compute predictions from input and output windows."""
163    # Convert to time major
164    input_window_features = array_ops.transpose(input_window_features,
165                                                [1, 0, 2])
166    output_window_features = array_ops.transpose(output_window_features,
167                                                 [1, 0, 2])
168    _, encoder_state = self._encoder(
169        input_window_features, dtype=self.dtype)
170    decoder_output, _ = self._decoder(
171        output_window_features, dtype=self.dtype,
172        initial_state=encoder_state)
173
174    # Switch back to batch major
175    decoder_output = array_ops.transpose(decoder_output, [1, 0, 2])
176    predicted_mean = self._mean_transform(decoder_output)
177    predicted_covariance = gen_math_ops.exp(
178        self._covariance_transform(decoder_output))
179    return {"mean": predicted_mean,
180            "covariance": predicted_covariance}
181
182
183class ARModel(model.TimeSeriesModel):
184  """Auto-regressive model, both linear and non-linear.
185
186  Features to the model include time and values of input_window_size timesteps,
187  and times for output_window_size timesteps. These are passed through a
188  configurable prediction model, and then fed to a loss function (e.g. squared
189  loss).
190
191  Note that this class can also be used to regress against time only by setting
192  the input_window_size to zero.
193
194  Each periodicity in the `periodicities` arg is divided by the
195  `num_time_buckets` into time buckets that are represented as features added
196  to the model.
197
198  A good heuristic for picking an appropriate periodicity for a given data set
199  would be the length of cycles in the data. For example, energy usage in a
200  home is typically cyclic each day. If the time feature in a home energy
201  usage dataset is in the unit of hours, then 24 would be an appropriate
202  periodicity. Similarly, a good heuristic for `num_time_buckets` is how often
203  the data is expected to change within the cycle. For the aforementioned home
204  energy usage dataset and periodicity of 24, then 48 would be a reasonable
205  value if usage is expected to change every half hour.
206
207  Each feature's value for a given example with time t is the difference
208  between t and the start of the time bucket it falls under. If it doesn't fall
209  under a feature's associated time bucket, then that feature's value is zero.
210
211  For example: if `periodicities` = (9, 12) and `num_time_buckets` = 3, then 6
212  features would be added to the model, 3 for periodicity 9 and 3 for
213  periodicity 12.
214
215  For an example data point where t = 17:
216  - It's in the 3rd time bucket for periodicity 9 (2nd period is 9-18 and 3rd
217    time bucket is 15-18)
218  - It's in the 2nd time bucket for periodicity 12 (2nd period is 12-24 and
219    2nd time bucket is between 16-20).
220
221  Therefore the 6 added features for this row with t = 17 would be:
222
223  # Feature name (periodicity#_timebucket#), feature value
224  P9_T1, 0 # not in first time bucket
225  P9_T2, 0 # not in second time bucket
226  P9_T3, 2 # 17 - 15 since 15 is the start of the 3rd time bucket
227  P12_T1, 0 # not in first time bucket
228  P12_T2, 1 # 17 - 16 since 16 is the start of the 2nd time bucket
229  P12_T3, 0 # not in third time bucket
230  """
231  SQUARED_LOSS = "squared_loss"
232  NORMAL_LIKELIHOOD_LOSS = "normal_likelihood_loss"
233
234  def __init__(self,
235               periodicities,
236               input_window_size,
237               output_window_size,
238               num_features,
239               prediction_model_factory=FlatPredictionModel,
240               num_time_buckets=10,
241               loss=NORMAL_LIKELIHOOD_LOSS,
242               exogenous_feature_columns=None):
243    """Constructs an auto-regressive model.
244
245    Args:
246      periodicities: periodicities of the input data, in the same units as the
247        time feature (for example 24 if feeding hourly data with a daily
248        periodicity, or 60 * 24 if feeding minute-level data with daily
249        periodicity). Note this can be a single value or a list of values for
250        multiple periodicities.
251      input_window_size: Number of past time steps of data to look at when doing
252        the regression.
253      output_window_size: Number of future time steps to predict. Note that
254        setting it to > 1 empirically seems to give a better fit.
255      num_features: number of input features per time step.
256      prediction_model_factory: A callable taking arguments `num_features`,
257        `input_window_size`, and `output_window_size` and returning a
258        `tf.keras.Model`. The `Model`'s `call()` takes two arguments: an input
259        window and an output window, and returns a dictionary of predictions.
260        See `FlatPredictionModel` for an example. Example usage:
261
262        ```python model = ar_model.ARModel( periodicities=2, num_features=3,
263        prediction_model_factory=functools.partial( FlatPredictionModel,
264        hidden_layer_sizes=[10, 10])) ```
265
266        The default model computes predictions as a linear function of flattened
267        input and output windows.
268      num_time_buckets: Number of buckets into which to divide (time %
269        periodicity). This value multiplied by the number of periodicities is
270        the number of time features added to the model.
271      loss: Loss function to use for training. Currently supported values are
272        SQUARED_LOSS and NORMAL_LIKELIHOOD_LOSS. Note that for
273        NORMAL_LIKELIHOOD_LOSS, we train the covariance term as well. For
274        SQUARED_LOSS, the evaluation loss is reported based on un-scaled
275        observations and predictions, while the training loss is computed on
276        normalized data (if input statistics are available).
277      exogenous_feature_columns: A list of `tf.feature_column`s (for example
278        `tf.feature_column.embedding_column`) corresponding to
279        features which provide extra information to the model but are not part
280        of the series to be predicted.
281    """
282    self._model_factory = prediction_model_factory
283    self.input_window_size = input_window_size
284    self.output_window_size = output_window_size
285    self.window_size = self.input_window_size + self.output_window_size
286    self.loss = loss
287    super(ARModel, self).__init__(
288        num_features=num_features,
289        exogenous_feature_columns=exogenous_feature_columns)
290    if exogenous_feature_columns is not None:
291      self.exogenous_size = self._get_exogenous_embedding_shape()[-1]
292    else:
293      self.exogenous_size = 0
294    assert num_time_buckets > 0
295    self._buckets = int(num_time_buckets)
296    if periodicities is None or not periodicities:
297      periodicities = []
298    elif (not isinstance(periodicities, list) and
299          not isinstance(periodicities, tuple)):
300      periodicities = [periodicities]
301    self._periodicities = [int(p) for p in periodicities]
302    for p in self._periodicities:
303      assert p > 0
304    assert len(self._periodicities) or self.input_window_size
305    assert output_window_size > 0
306
307  def initialize_graph(self, input_statistics=None):
308    super(ARModel, self).initialize_graph(input_statistics=input_statistics)
309    self._model_scope = variable_scope.variable_scope(
310        # The trailing slash means we strip all enclosing variable_scopes, which
311        # unfortunately is necessary because the model gets called inside and
312        # outside a "while" scope (for prediction and training respectively),
313        # and the variables names need to match.
314        "model/", use_resource=True)
315    self._model_instance = self._model_factory(
316        num_features=self.num_features,
317        input_window_size=self.input_window_size,
318        output_window_size=self.output_window_size)
319
320  def get_start_state(self):
321    # State which matches the format we'll return later. Typically this will not
322    # be used by the model directly, but the shapes and dtypes should match so
323    # that the serving input_receiver_fn gets placeholder shapes correct.
324    return (array_ops.zeros([self.input_window_size], dtype=dtypes.int64),
325            array_ops.zeros(
326                [self.input_window_size, self.num_features], dtype=self.dtype),
327            array_ops.zeros(
328                [self.input_window_size, self.exogenous_size],
329                dtype=self.dtype))
330
331  # TODO(allenl,agarwal): Support sampling for AR.
332  def random_model_parameters(self, seed=None):
333    pass
334
335  def generate(self, number_of_series, series_length,
336               model_parameters=None, seed=None):
337    pass
338
339  def _predicted_covariance_op(self, activations, num_values):
340    activation, activation_size = activations[-1]
341    if self.loss == ARModel.NORMAL_LIKELIHOOD_LOSS:
342      log_sigma_square = model_utils.fully_connected(
343          activation,
344          activation_size,
345          self.output_window_size * num_values,
346          name="log_sigma_square",
347          activation=None)
348      predicted_covariance = gen_math_ops.exp(log_sigma_square)
349      predicted_covariance = array_ops.reshape(
350          predicted_covariance, [-1, self.output_window_size, num_values])
351    else:
352      shape = array_ops.stack([
353          array_ops.shape(activation)[0],
354          constant_op.constant(self.output_window_size),
355          constant_op.constant(num_values)
356      ])
357      predicted_covariance = array_ops.ones(shape=shape, dtype=activation.dtype)
358    return predicted_covariance
359
360  def _predicted_mean_op(self, activations):
361    activation, activation_size = activations[-1]
362    predicted_mean = model_utils.fully_connected(
363        activation,
364        activation_size,
365        self.output_window_size * self.num_features,
366        name="predicted_mean",
367        activation=None)
368    return array_ops.reshape(predicted_mean,
369                             [-1, self.output_window_size, self.num_features])
370
371  def prediction_ops(self, times, values, exogenous_regressors):
372    """Compute model predictions given input data.
373
374    Args:
375      times: A [batch size, self.window_size] integer Tensor, the first
376          self.input_window_size times in each part of the batch indicating
377          input features, and the last self.output_window_size times indicating
378          prediction times.
379      values: A [batch size, self.input_window_size, self.num_features] Tensor
380          with input features.
381      exogenous_regressors: A [batch size, self.window_size,
382          self.exogenous_size] Tensor with exogenous features.
383    Returns:
384      Tuple (predicted_mean, predicted_covariance), where each element is a
385      Tensor with shape [batch size, self.output_window_size,
386      self.num_features].
387    """
388    times.get_shape().assert_is_compatible_with([None, self.window_size])
389    batch_size = array_ops.shape(times)[0]
390    if self.input_window_size:
391      values.get_shape().assert_is_compatible_with(
392          [None, self.input_window_size, self.num_features])
393    if exogenous_regressors is not None:
394      exogenous_regressors.get_shape().assert_is_compatible_with(
395          [None, self.window_size, self.exogenous_size])
396    # Create input features.
397    input_window_features = []
398    input_feature_size = 0
399    output_window_features = []
400    output_feature_size = 0
401    if self._periodicities:
402      _, time_features = self._compute_time_features(times)
403      num_time_features = self._buckets * len(self._periodicities)
404      time_features = array_ops.reshape(
405          time_features,
406          [batch_size,
407           self.window_size,
408           num_time_features])
409      input_time_features, output_time_features = array_ops.split(
410          time_features, (self.input_window_size, self.output_window_size),
411          axis=1)
412      input_feature_size += num_time_features
413      output_feature_size += num_time_features
414      input_window_features.append(input_time_features)
415      output_window_features.append(output_time_features)
416    if self.input_window_size:
417      inp = array_ops.slice(values, [0, 0, 0], [-1, self.input_window_size, -1])
418      input_window_features.append(
419          array_ops.reshape(
420              inp,
421              [batch_size, self.input_window_size, self.num_features]))
422      input_feature_size += self.num_features
423    if self.exogenous_size:
424      input_exogenous_features, output_exogenous_features = array_ops.split(
425          exogenous_regressors,
426          (self.input_window_size, self.output_window_size),
427          axis=1)
428      input_feature_size += self.exogenous_size
429      output_feature_size += self.exogenous_size
430      input_window_features.append(input_exogenous_features)
431      output_window_features.append(output_exogenous_features)
432    assert input_window_features
433    input_window_features = array_ops.concat(input_window_features, axis=2)
434    if output_window_features:
435      output_window_features = array_ops.concat(output_window_features, axis=2)
436    else:
437      output_window_features = array_ops.zeros(
438          [batch_size, self.output_window_size, 0],
439          dtype=self.dtype)
440    static_batch_size = times.get_shape().dims[0].value
441    input_window_features.set_shape(
442        [static_batch_size, self.input_window_size, input_feature_size])
443    output_window_features.set_shape(
444        [static_batch_size, self.output_window_size, output_feature_size])
445    return self._output_window_predictions(input_window_features,
446                                           output_window_features)
447
448  def _output_window_predictions(
449      self, input_window_features, output_window_features):
450    with self._model_scope:
451      predictions = self._model_instance(
452          input_window_features, output_window_features)
453      result_shape = [None, self.output_window_size, self.num_features]
454      for v in predictions.values():
455        v.set_shape(result_shape)
456      return predictions
457
458  def loss_op(self, targets, prediction_ops):
459    """Create loss_op."""
460    prediction = prediction_ops["mean"]
461    if self.loss == ARModel.NORMAL_LIKELIHOOD_LOSS:
462      covariance = prediction_ops["covariance"]
463      sigma = math_ops.sqrt(gen_math_ops.maximum(covariance, 1e-5))
464      loss_op = -math_ops.reduce_sum(
465          math_utils.normal_log_prob(targets, sigma, prediction))
466    else:
467      assert self.loss == ARModel.SQUARED_LOSS, self.loss
468      loss_op = math_ops.reduce_sum(
469          math_ops.squared_difference(prediction, targets))
470    loss_op /= math_ops.cast(
471        math_ops.reduce_prod(array_ops.shape(targets)), loss_op.dtype)
472    return loss_op
473
474  def _process_exogenous_features(self, times, features):
475    embedded = super(ARModel, self)._process_exogenous_features(
476        times=times, features=features)
477    if embedded is None:
478      assert self.exogenous_size == 0
479      # No embeddings. Return a zero-size [batch, times, 0] array so we don't
480      # have to special case it downstream.
481      return array_ops.zeros(
482          array_ops.concat([array_ops.shape(times), constant_op.constant([0])],
483                           axis=0))
484    else:
485      return embedded
486
487  # TODO(allenl, agarwal): Consider better ways of warm-starting predictions.
488  def predict(self, features):
489    """Computes predictions multiple steps into the future.
490
491    Args:
492      features: A dictionary with the following key/value pairs:
493        PredictionFeatures.TIMES: A [batch size, predict window size]
494          integer Tensor of times, after the window of data indicated by
495          `STATE_TUPLE`, to make predictions for.
496        PredictionFeatures.STATE_TUPLE: A tuple of (times, values), times with
497          shape [batch size, self.input_window_size], values with shape [batch
498          size, self.input_window_size, self.num_features] representing a
499          segment of the time series before `TIMES`. This data is used
500          to start of the autoregressive computation. This should have data for
501          at least self.input_window_size timesteps.
502        And any exogenous features, with shapes prefixed by shape of `TIMES`.
503    Returns:
504      A dictionary with keys, "mean", "covariance". The
505      values are Tensors of shape [batch_size, predict window size,
506      num_features] and correspond to the values passed in `TIMES`.
507    """
508    if not self._graph_initialized:
509      self.initialize_graph()
510    predict_times = math_ops.cast(
511        ops.convert_to_tensor(features[PredictionFeatures.TIMES]), dtypes.int32)
512    exogenous_regressors = self._process_exogenous_features(
513        times=predict_times,
514        features={key: value for key, value in features.items()
515                  if key not in [TrainEvalFeatures.TIMES,
516                                 TrainEvalFeatures.VALUES,
517                                 PredictionFeatures.STATE_TUPLE]})
518    with ops.control_dependencies(
519        [check_ops.assert_equal(array_ops.shape(predict_times)[1],
520                                array_ops.shape(exogenous_regressors)[1])]):
521      exogenous_regressors = array_ops.identity(exogenous_regressors)
522    batch_size = array_ops.shape(predict_times)[0]
523    num_predict_values = array_ops.shape(predict_times)[1]
524    prediction_iterations = ((num_predict_values + self.output_window_size - 1)
525                             // self.output_window_size)
526    # Pad predict_times and exogenous regressors so as to have exact multiple of
527    # self.output_window_size values per example.
528    padding_size = (prediction_iterations * self.output_window_size -
529                    num_predict_values)
530    predict_times = array_ops.pad(
531        predict_times, [[0, 0], [0, padding_size]])
532    exogenous_regressors = array_ops.pad(
533        exogenous_regressors, [[0, 0], [0, padding_size], [0, 0]])
534    state = features[PredictionFeatures.STATE_TUPLE]
535    (state_times, state_values, state_exogenous_regressors) = state
536    state_times = math_ops.cast(
537        ops.convert_to_tensor(state_times), dtypes.int32)
538    state_values = ops.convert_to_tensor(state_values, dtype=self.dtype)
539    state_exogenous_regressors = ops.convert_to_tensor(
540        state_exogenous_regressors, dtype=self.dtype)
541
542    initial_input_times = predict_times[:, :self.output_window_size]
543    initial_input_exogenous_regressors = (
544        exogenous_regressors[:, :self.output_window_size, :])
545    if self.input_window_size > 0:
546      initial_input_times = array_ops.concat(
547          [state_times[:, -self.input_window_size:], initial_input_times], 1)
548      values_size = array_ops.shape(state_values)[1]
549      times_size = array_ops.shape(state_times)[1]
550      with ops.control_dependencies([
551          check_ops.assert_greater_equal(values_size, self.input_window_size),
552          check_ops.assert_equal(values_size, times_size)
553      ]):
554        initial_input_values = state_values[:, -self.input_window_size:, :]
555        initial_input_exogenous_regressors = array_ops.concat(
556            [state_exogenous_regressors[:, -self.input_window_size:, :],
557             initial_input_exogenous_regressors[
558                 :, :self.output_window_size, :]],
559            axis=1)
560    else:
561      initial_input_values = 0
562
563    # Iterate over the predict_times, predicting self.output_window_size values
564    # in each iteration.
565    def _while_condition(iteration_number, *unused_args):
566      return math_ops.less(iteration_number, prediction_iterations)
567
568    def _while_body(iteration_number, input_times, input_values,
569                    input_exogenous_regressors, mean_ta, covariance_ta):
570      """Predict self.output_window_size values."""
571      prediction_ops = self.prediction_ops(
572          input_times, input_values, input_exogenous_regressors)
573      predicted_mean = prediction_ops["mean"]
574      predicted_covariance = prediction_ops["covariance"]
575      offset = self.output_window_size * gen_math_ops.minimum(
576          iteration_number + 1, prediction_iterations - 1)
577      if self.input_window_size > 0:
578        if self.output_window_size < self.input_window_size:
579          new_input_values = array_ops.concat(
580              [input_values[:, self.output_window_size:, :], predicted_mean], 1)
581          new_input_exogenous_regressors = array_ops.concat(
582              [input_exogenous_regressors[:, -self.input_window_size:, :],
583               exogenous_regressors[
584                   :, offset:offset + self.output_window_size, :]],
585              axis=1)
586          new_input_times = array_ops.concat([
587              input_times[:, -self.input_window_size:],
588              predict_times[:, offset:offset + self.output_window_size]
589          ], 1)
590        else:
591          new_input_values = predicted_mean[:, -self.input_window_size:, :]
592          new_input_exogenous_regressors = exogenous_regressors[
593              :,
594              offset - self.input_window_size:offset + self.output_window_size,
595              :]
596          new_input_times = predict_times[
597              :,
598              offset - self.input_window_size:offset + self.output_window_size]
599      else:
600        new_input_values = input_values
601        new_input_exogenous_regressors = exogenous_regressors[
602            :, offset:offset + self.output_window_size, :]
603        new_input_times = predict_times[:,
604                                        offset:offset + self.output_window_size]
605      new_input_times.set_shape(initial_input_times.get_shape())
606      new_input_exogenous_regressors.set_shape(
607          initial_input_exogenous_regressors.get_shape())
608      new_mean_ta = mean_ta.write(iteration_number, predicted_mean)
609      if isinstance(covariance_ta, tensor_array_ops.TensorArray):
610        new_covariance_ta = covariance_ta.write(iteration_number,
611                                                predicted_covariance)
612      else:
613        new_covariance_ta = covariance_ta
614      return (iteration_number + 1,
615              new_input_times,
616              new_input_values,
617              new_input_exogenous_regressors,
618              new_mean_ta,
619              new_covariance_ta)
620
621    # Note that control_flow_ops.while_loop doesn't seem happy with None. Hence
622    # using 0 for cases where we don't want to predict covariance.
623    covariance_ta_init = (tensor_array_ops.TensorArray(
624        dtype=self.dtype, size=prediction_iterations)
625                          if self.loss != ARModel.SQUARED_LOSS else 0.)
626    mean_ta_init = tensor_array_ops.TensorArray(
627        dtype=self.dtype, size=prediction_iterations)
628    _, _, _, _, mean_ta, covariance_ta = control_flow_ops.while_loop(
629        _while_condition, _while_body, [
630            0,
631            initial_input_times,
632            initial_input_values,
633            initial_input_exogenous_regressors,
634            mean_ta_init,
635            covariance_ta_init
636        ])
637
638    def _parse_ta(values_ta):
639      """Helper function to parse the returned TensorArrays."""
640
641      if not isinstance(values_ta, tensor_array_ops.TensorArray):
642        return None
643      predictions_length = prediction_iterations * self.output_window_size
644      # Shape [prediction_iterations, batch_size, self.output_window_size,
645      #        self.num_features]
646      values_packed = values_ta.stack()
647      # Transpose to move batch dimension outside.
648      output_values = array_ops.reshape(
649          array_ops.transpose(values_packed, [1, 0, 2, 3]),
650          array_ops.stack([batch_size, predictions_length, -1]))
651      # Clip to desired size
652      return output_values[:, :num_predict_values, :]
653
654    predicted_mean = _parse_ta(mean_ta)
655    predicted_covariance = _parse_ta(covariance_ta)
656    if predicted_covariance is None:
657      predicted_covariance = array_ops.ones_like(predicted_mean)
658
659    # Transform and scale the mean and covariance appropriately.
660    predicted_mean = self._scale_back_data(predicted_mean)
661    predicted_covariance = self._scale_back_variance(predicted_covariance)
662
663    return {"mean": predicted_mean,
664            "covariance": predicted_covariance}
665
666  def _process_window(self, features, mode, exogenous_regressors):
667    """Compute model outputs on a single window of data."""
668    times = math_ops.cast(features[TrainEvalFeatures.TIMES], dtypes.int64)
669    values = math_ops.cast(features[TrainEvalFeatures.VALUES], dtype=self.dtype)
670    exogenous_regressors = math_ops.cast(exogenous_regressors, dtype=self.dtype)
671    original_values = values
672
673    # Extra shape checking for the window size (above that in
674    # `head.create_estimator_spec`).
675    expected_times_shape = [None, self.window_size]
676    if not times.get_shape().is_compatible_with(expected_times_shape):
677      raise ValueError(
678          ("ARModel with input_window_size={input_window_size} "
679           "and output_window_size={output_window_size} expects "
680           "feature '{times_feature}' to have shape (batch_size, "
681           "{window_size}) (for any batch_size), but got shape {times_shape}. "
682           "If you are using RandomWindowInputFn, set "
683           "window_size={window_size} or adjust the input_window_size and "
684           "output_window_size arguments to ARModel.").format(
685               input_window_size=self.input_window_size,
686               output_window_size=self.output_window_size,
687               times_feature=TrainEvalFeatures.TIMES,
688               window_size=self.window_size,
689               times_shape=times.get_shape()))
690    values = self._scale_data(values)
691    if self.input_window_size > 0:
692      input_values = values[:, :self.input_window_size, :]
693    else:
694      input_values = None
695    prediction_ops = self.prediction_ops(
696        times, input_values, exogenous_regressors)
697    prediction = prediction_ops["mean"]
698    covariance = prediction_ops["covariance"]
699    targets = array_ops.slice(values, [0, self.input_window_size, 0],
700                              [-1, -1, -1])
701    targets.get_shape().assert_is_compatible_with(prediction.get_shape())
702    if (mode == estimator_lib.ModeKeys.EVAL
703        and self.loss == ARModel.SQUARED_LOSS):
704      # Report an evaluation loss which matches the expected
705      #  (observed - predicted) ** 2.
706      # Note that this affects only evaluation; the training loss is unaffected.
707      loss = self.loss_op(
708          self._scale_back_data(targets),
709          {"mean": self._scale_back_data(prediction_ops["mean"])})
710    else:
711      loss = self.loss_op(targets, prediction_ops)
712
713    # Scale back the prediction.
714    prediction = self._scale_back_data(prediction)
715    covariance = self._scale_back_variance(covariance)
716
717    return model.ModelOutputs(
718        loss=loss,
719        end_state=(times[:, -self.input_window_size:],
720                   values[:, -self.input_window_size:, :],
721                   exogenous_regressors[:, -self.input_window_size:, :]),
722        predictions={"mean": prediction, "covariance": covariance,
723                     "observed": original_values[:, -self.output_window_size:]},
724        prediction_times=times[:, -self.output_window_size:])
725
726  def get_batch_loss(self, features, mode, state):
727    """Computes predictions and a loss.
728
729    Args:
730      features: A dictionary (such as is produced by a chunker) with the
731        following key/value pairs (shapes are given as required for training):
732          TrainEvalFeatures.TIMES: A [batch size, self.window_size] integer
733            Tensor with times for each observation. To train on longer
734            sequences, the data should first be chunked.
735          TrainEvalFeatures.VALUES: A [batch size, self.window_size,
736            self.num_features] Tensor with values for each observation.
737        When evaluating, `TIMES` and `VALUES` must have a window size of at
738        least self.window_size, but it may be longer, in which case the last
739        window_size - self.input_window_size times (or fewer if this is not
740        divisible by self.output_window_size) will be evaluated on with
741        non-overlapping output windows (and will have associated
742        predictions). This is primarily to support qualitative
743        evaluation/plotting, and is not a recommended way to compute evaluation
744        losses (since there is no overlap in the output windows, which for
745        window-based models is an undesirable bias).
746      mode: The tf.estimator.ModeKeys mode to use (TRAIN or EVAL).
747      state: Unused
748    Returns:
749      A model.ModelOutputs object.
750    Raises:
751      ValueError: If `mode` is not TRAIN or EVAL, or if static shape information
752      is incorrect.
753    """
754    features = {feature_name: ops.convert_to_tensor(feature_value)
755                for feature_name, feature_value in features.items()}
756    times = features[TrainEvalFeatures.TIMES]
757    exogenous_regressors = self._process_exogenous_features(
758        times=times,
759        features={key: value for key, value in features.items()
760                  if key not in [TrainEvalFeatures.TIMES,
761                                 TrainEvalFeatures.VALUES,
762                                 PredictionFeatures.STATE_TUPLE]})
763    if mode == estimator_lib.ModeKeys.TRAIN:
764      # For training, we require the window size to be self.window_size as
765      # iterating sequentially on larger windows could introduce a bias.
766      return self._process_window(
767          features, mode=mode, exogenous_regressors=exogenous_regressors)
768    elif mode == estimator_lib.ModeKeys.EVAL:
769      # For evaluation, we allow the user to pass in a larger window, in which
770      # case we try to cover as much of the window as possible without
771      # overlap. Quantitative evaluation is more efficient/correct with fixed
772      # windows matching self.window_size (as with training), but this looping
773      # allows easy plotting of "in-sample" predictions.
774      times.get_shape().assert_has_rank(2)
775      static_window_size = times.get_shape().dims[1].value
776      if (static_window_size is not None
777          and static_window_size < self.window_size):
778        raise ValueError(
779            ("ARModel requires a window of at least input_window_size + "
780             "output_window_size to evaluate on (input_window_size={}, "
781             "output_window_size={}, and got shape {} for feature '{}' (batch "
782             "size, window size)).").format(
783                 self.input_window_size, self.output_window_size,
784                 times.get_shape(), TrainEvalFeatures.TIMES))
785      num_iterations = ((array_ops.shape(times)[1] -  self.input_window_size)
786                        // self.output_window_size)
787      output_size = num_iterations * self.output_window_size
788      # Rather than dealing with overlapping windows of output, discard a bit at
789      # the beginning if output windows don't cover evenly.
790      crop_length = output_size + self.input_window_size
791      features = {feature_name: feature_value[:, -crop_length:]
792                  for feature_name, feature_value in features.items()}
793      # Note that, unlike the ARModel's predict() while_loop and the
794      # SequentialTimeSeriesModel while_loop, each iteration here can run in
795      # parallel, since we are not feeding predictions or state from previous
796      # iterations.
797      def _while_condition(iteration_number, loss_ta, mean_ta, covariance_ta):
798        del loss_ta, mean_ta, covariance_ta  # unused
799        return iteration_number < num_iterations
800
801      def _while_body(iteration_number, loss_ta, mean_ta, covariance_ta):
802        """Perform a processing step on a single window of data."""
803        base_offset = iteration_number * self.output_window_size
804        model_outputs = self._process_window(
805            features={
806                feature_name:
807                feature_value[:, base_offset:base_offset + self.window_size]
808                for feature_name, feature_value in features.items()},
809            mode=mode,
810            exogenous_regressors=exogenous_regressors[
811                :, base_offset:base_offset + self.window_size])
812        # This code needs to be updated if new predictions are added in
813        # self._process_window
814        assert len(model_outputs.predictions) == 3
815        assert "mean" in model_outputs.predictions
816        assert "covariance" in model_outputs.predictions
817        assert "observed" in model_outputs.predictions
818        return (iteration_number + 1,
819                loss_ta.write(
820                    iteration_number, model_outputs.loss),
821                mean_ta.write(
822                    iteration_number, model_outputs.predictions["mean"]),
823                covariance_ta.write(
824                    iteration_number, model_outputs.predictions["covariance"]))
825      _, loss_ta, mean_ta, covariance_ta = control_flow_ops.while_loop(
826          _while_condition, _while_body,
827          [0,
828           tensor_array_ops.TensorArray(dtype=self.dtype, size=num_iterations),
829           tensor_array_ops.TensorArray(dtype=self.dtype, size=num_iterations),
830           tensor_array_ops.TensorArray(dtype=self.dtype, size=num_iterations)])
831      values = math_ops.cast(features[TrainEvalFeatures.VALUES],
832                             dtype=self.dtype)
833      batch_size = array_ops.shape(times)[0]
834      prediction_shape = [batch_size, self.output_window_size * num_iterations,
835                          self.num_features]
836      (previous_state_times,
837       previous_state_values,
838       previous_state_exogenous_regressors) = state
839      # Make sure returned state always has windows of self.input_window_size,
840      # even if we were passed fewer than self.input_window_size points this
841      # time.
842      if self.input_window_size > 0:
843        new_state_times = array_ops.concat(
844            [previous_state_times,
845             math_ops.cast(times, dtype=dtypes.int64)],
846            axis=1)[:, -self.input_window_size:]
847        new_state_times.set_shape((None, self.input_window_size))
848        new_state_values = array_ops.concat(
849            [previous_state_values,
850             self._scale_data(values)], axis=1)[:, -self.input_window_size:, :]
851        new_state_values.set_shape((None, self.input_window_size,
852                                    self.num_features))
853        new_exogenous_regressors = array_ops.concat(
854            [previous_state_exogenous_regressors,
855             exogenous_regressors], axis=1)[:, -self.input_window_size:, :]
856        new_exogenous_regressors.set_shape(
857            (None,
858             self.input_window_size,
859             self.exogenous_size))
860      else:
861        # There is no state to keep, and the strided slices above do not handle
862        # input_window_size=0.
863        new_state_times = previous_state_times
864        new_state_values = previous_state_values
865        new_exogenous_regressors = previous_state_exogenous_regressors
866      return model.ModelOutputs(
867          loss=math_ops.reduce_mean(loss_ta.stack(), axis=0),
868          end_state=(new_state_times,
869                     new_state_values,
870                     new_exogenous_regressors),
871          predictions={
872              "mean": array_ops.reshape(
873                  array_ops.transpose(mean_ta.stack(), [1, 0, 2, 3]),
874                  prediction_shape),
875              "covariance": array_ops.reshape(
876                  array_ops.transpose(covariance_ta.stack(), [1, 0, 2, 3]),
877                  prediction_shape),
878              "observed": values[:, -output_size:]},
879          prediction_times=times[:, -output_size:])
880    else:
881      raise ValueError(
882          "Unknown mode '{}' passed to get_batch_loss.".format(mode))
883
884  def _compute_time_features(self, time):
885    """Compute some features on the time value."""
886    batch_size = array_ops.shape(time)[0]
887    num_periods = len(self._periodicities)
888    # Reshape to 3D.
889    periods = constant_op.constant(
890        self._periodicities, shape=[1, 1, num_periods, 1], dtype=time.dtype)
891    time = array_ops.reshape(time, [batch_size, -1, 1, 1])
892    window_offset = time / self._periodicities
893    # Cast to appropriate type and scale to [0, 1) range
894    mod = (math_ops.cast(time % periods, self.dtype) * self._buckets /
895           math_ops.cast(periods, self.dtype))
896    # Bucketize based on some fixed width intervals. For a value t and interval
897    # [a, b), we return (t - a) if a <= t < b, else 0.
898    intervals = array_ops.reshape(
899        math_ops.range(self._buckets, dtype=self.dtype),
900        [1, 1, 1, self._buckets])
901    mod = nn_ops.relu(mod - intervals)
902    mod = array_ops.where(mod < 1.0, mod, array_ops.zeros_like(mod))
903    return window_offset, mod
904
905
906class AnomalyMixtureARModel(ARModel):
907  """Model data as a mixture of normal and anomaly distributions.
908
909  Note that this model works by changing the loss function to reduce the penalty
910  when predicting an anomalous target. However the predictions are still based
911  on anomalous input features, and this may affect the quality of fit. One
912  possible solution is to downweight/filter anomalous inputs, but that requires
913  more sequential processing instead of completely random windows.
914  """
915
916  GAUSSIAN_ANOMALY = "gaussian"
917  CAUCHY_ANOMALY = "cauchy"
918
919  def __init__(self,
920               periodicities,
921               anomaly_prior_probability,
922               input_window_size,
923               output_window_size,
924               num_features,
925               prediction_model_factory=FlatPredictionModel,
926               anomaly_distribution=GAUSSIAN_ANOMALY,
927               num_time_buckets=10,
928               exogenous_feature_columns=None):
929    assert (anomaly_prior_probability < 1.0 and
930            anomaly_prior_probability > 0.0)
931    self._anomaly_prior_probability = anomaly_prior_probability
932    assert anomaly_distribution in [
933        AnomalyMixtureARModel.GAUSSIAN_ANOMALY,
934        AnomalyMixtureARModel.CAUCHY_ANOMALY]
935    self._anomaly_distribution = anomaly_distribution
936    super(AnomalyMixtureARModel, self).__init__(
937        periodicities=periodicities,
938        num_features=num_features,
939        num_time_buckets=num_time_buckets,
940        input_window_size=input_window_size,
941        output_window_size=output_window_size,
942        loss=ARModel.NORMAL_LIKELIHOOD_LOSS,
943        prediction_model_factory=prediction_model_factory,
944        exogenous_feature_columns=exogenous_feature_columns)
945
946  def _create_anomaly_ops(self, times, values, prediction_ops_dict):
947    anomaly_log_param = variable_scope.get_variable(
948        "anomaly_log_param",
949        shape=[],
950        dtype=self.dtype,
951        initializer=init_ops.zeros_initializer())
952    # Anomaly param is the variance for Gaussian and scale for Cauchy
953    # distribution.
954    prediction_ops_dict["anomaly_params"] = gen_math_ops.exp(anomaly_log_param)
955
956  def prediction_ops(self, times, values, exogenous_regressors):
957    prediction_ops_dict = super(AnomalyMixtureARModel, self).prediction_ops(
958        times, values, exogenous_regressors)
959    self._create_anomaly_ops(times, values, prediction_ops_dict)
960    return prediction_ops_dict
961
962  def _anomaly_log_prob(self, targets, prediction_ops):
963    prediction = prediction_ops["mean"]
964    if self._anomaly_distribution == AnomalyMixtureARModel.GAUSSIAN_ANOMALY:
965      anomaly_variance = prediction_ops["anomaly_params"]
966      anomaly_sigma = math_ops.sqrt(
967          gen_math_ops.maximum(anomaly_variance, 1e-5))
968      log_prob = math_utils.normal_log_prob(targets, anomaly_sigma, prediction)
969    else:
970      assert self._anomaly_distribution == AnomalyMixtureARModel.CAUCHY_ANOMALY
971      anomaly_scale = prediction_ops["anomaly_params"]
972      log_prob = math_utils.cauchy_log_prob(targets, anomaly_scale, prediction)
973    return log_prob
974
975  def loss_op(self, targets, prediction_ops):
976    """Create loss_op."""
977    prediction = prediction_ops["mean"]
978    covariance = prediction_ops["covariance"]
979    # Normal data log probability.
980    sigma = math_ops.sqrt(gen_math_ops.maximum(covariance, 1e-5))
981    log_prob1 = math_utils.normal_log_prob(targets, sigma, prediction)
982    log_prob1 += math_ops.log(1 - self._anomaly_prior_probability)
983    # Anomaly log probability.
984    log_prob2 = self._anomaly_log_prob(targets, prediction_ops)
985    log_prob2 += math_ops.log(self._anomaly_prior_probability)
986    # We need to compute log(exp(log_prob1) + exp(log_prob2). For numerical
987    # stability, we rewrite the expression as below.
988    p1 = gen_math_ops.minimum(log_prob1, log_prob2)
989    p2 = gen_math_ops.maximum(log_prob1, log_prob2)
990    mixed_log_prob = p2 + math_ops.log(1 + gen_math_ops.exp(p1 - p2))
991    loss_op = -math_ops.reduce_sum(mixed_log_prob)
992    loss_op /= math_ops.cast(
993        math_ops.reduce_prod(array_ops.shape(targets)), self.dtype)
994    return loss_op
995