15"""Filtering postprocessors for SequentialTimeSeriesModels."""
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
21import abc
23import six
25from tensorflow.contrib.timeseries.python.timeseries import math_utils
27from tensorflow.python.framework import dtypes
28from tensorflow.python.framework import ops
29from tensorflow.python.ops import array_ops
30from tensorflow.python.ops import check_ops
31from tensorflow.python.ops import math_ops
32from tensorflow.python.util import nest
36class FilteringStepPostprocessor(object):
37  """Base class for processors that are applied after each filter step."""
39  @abc.abstractmethod
40  def process_filtering_step(self, current_times, current_values,
41                             predicted_state, filtered_state, outputs):
42    """Extends/modifies a filtering step, altering state and loss.
44    Args:
45      current_times: A [batch size] integer Tensor of times.
46      current_values: A [batch size x num features] Tensor of values filtering
47          is being performed on.
48      predicted_state: A (possibly nested) list of Tensors indicating model
49          state which does not take `current_times` and `current_values` into
50          account.
51      filtered_state: Same structure as predicted_state, but updated to take
52          `current_times` and `current_values` into account.
53      outputs: A dictionary of outputs produced by model filtering
54          (SequentialTimeSeriesModel._process_filtering_step).
55    Returns: A tuple of (new_state, updated_outputs);
56      new_state: Updated state with the same structure as `filtered_state` and
57          `predicted_state`.
58      updated_outputs: The `outputs` dictionary, updated with any new outputs
59          from this filtering postprocessor.
60    """
61    pass
63  @abc.abstractproperty
64  def output_names(self):
65    return []
68def cauchy_alternative_to_gaussian(current_times, current_values, outputs):
69  """A Cauchy anomaly distribution, centered at a Gaussian prediction.
71  Performs an entropy-matching approximation of the scale parameters of
72  independent Cauchy distributions given the covariance matrix of a multivariate
73  Gaussian in outputs["covariance"], and centers the Cauchy distributions at
74  outputs["mean"]. This requires that the model that we are creating an
75  alternative/anomaly distribution for produces a mean and covariance.
77  Args:
78    current_times: A [batch size] Tensor of times, unused.
79    current_values: A [batch size x num features] Tensor of values to evaluate
80        the anomaly distribution at.
81    outputs: A dictionary of Tensors with keys "mean" and "covariance"
82        describing the Gaussian to construct an anomaly distribution from. The
83        value corresponding to "mean" has shape [batch size x num features], and
84        the value corresponding to "covariance" has shape [batch size x num
85        features x num features].
86  Returns:
87    A [batch size] Tensor of log likelihoods; the anomaly log PDF evaluated at
88    `current_values`.
89  """
90  del current_times  # unused
91  cauchy_scale = math_utils.entropy_matched_cauchy_scale(outputs["covariance"])
92  individual_log_pdfs = math_utils.cauchy_log_prob(
93      loc=outputs["mean"],
94      scale=cauchy_scale,
95      x=current_values)
96  return math_ops.reduce_sum(individual_log_pdfs, axis=1)
99def _interpolate_state_linear(first_state, second_state, first_responsibility):
100  """Interpolate between two model states linearly."""
101  interpolated_state_flat = []
102  for first_state_tensor, second_state_tensor in zip(
103      nest.flatten(first_state), nest.flatten(second_state)):
104    assert first_state_tensor.dtype == second_state_tensor.dtype
105    if first_state_tensor.dtype.is_floating:
106      # Pad the responsibility shape with ones up to the state's rank so that it
107      # broadcasts
108      first_responsibility_padded = array_ops.reshape(
109          tensor=first_responsibility,
110          shape=array_ops.concat([
111              array_ops.shape(first_responsibility), array_ops.ones(
112                  [array_ops.rank(first_state_tensor) - 1], dtype=dtypes.int32)
113          ], 0))
114      interpolated_state = (
115          first_responsibility_padded * first_state_tensor
116          + (1. - first_responsibility_padded) * second_state_tensor)
117      interpolated_state.set_shape(first_state_tensor.get_shape())
118      interpolated_state_flat.append(interpolated_state)
119    else:
120      # Integer dtypes are probably representing times, and don't need
121      # interpolation. Make sure they're identical to be sure.
122      with ops.control_dependencies(
123          [check_ops.assert_equal(first_state_tensor, second_state_tensor)]):
124        interpolated_state_flat.append(array_ops.identity(first_state_tensor))
125  return nest.pack_sequence_as(first_state, interpolated_state_flat)
128class StateInterpolatingAnomalyDetector(FilteringStepPostprocessor):
129  """An anomaly detector which guards model state against outliers.
131  Smoothly interpolates between a model's predicted and inferred states, based
132  on the posterior probability of an anomaly, p(anomaly | data). This is useful
133  if anomalies would otherwise lead to model state which is hard to recover
134  from (Gaussian state space models suffer from this, for example).
136  Relies on (1) an alternative distribution, typically with heavier tails than
137  the model's normal predictions, and (2) a prior probability of an anomaly. The
138  prior probability acts as a penalty, discouraging the system from marking too
139  many points as anomalies. The alternative distribution indicates the
140  probability of a datapoint given that it is an anomaly, and is a heavy-tailed
141  distribution (Cauchy) centered around the model's predictions by default.
143  Specifically, we have:
145    p(anomaly | data) = p(data | anomaly) * anomaly_prior_probability
146        / (p(data | not anomaly) * (1 - anomaly_prior_probability)
147           + p(data | anomaly) * anomaly_prior_probability)
149  This is simply Bayes' theorem, where p(data | anomaly) is the
150  alternative/anomaly distribution, p(data | not anomaly) is the model's
151  predicted distribution, and anomaly_prior_probability is the prior probability
152  of an anomaly occurring (user-specified, defaulting to 1%).
154  Rather than computing p(anomaly | data) directly, we use the odds ratio:
156    odds_ratio = p(data | anomaly) * anomaly_prior_probability
157        / (p(data | not anomaly) * (1 - anomaly_prior_probability))
159  This has the same information as p(anomaly | data):
161    odds_ratio = p(anomaly | data) / p(not anomaly | data)
163  A "responsibility" score is computed for the model based on the log odds
164  ratio, and state interpolated based on this responsibility:
166    model_responsibility = 1 / (1 + exp(-responsibility_scaling
167                                        * ln(odds_ratio)))
168    model_state = filtered_model_state * model_responsibility
169                  + predicted_model_state * (1 - model_responsibility)
170    loss = model_responsibility
171             * ln(p(data | not anomaly) * (1 - anomaly_prior_probability))
172           + (1 - model_responsibility)
173             * ln(p(data | anomaly) * anomaly_prior_probability)
175  """
177  output_names = ["anomaly_score"]
179  def __init__(self,
180               anomaly_log_likelihood=cauchy_alternative_to_gaussian,
181               anomaly_prior_probability=0.01,
182               responsibility_scaling=1.0):
183    """Configure the anomaly detector.
185    Args:
186      anomaly_log_likelihood: A function taking `current_times`,
187          `current_values`, and `outputs` (same as the corresponding arguments
188          to process_filtering_step) and returning a [batch size] Tensor of log
189          likelihoods under an anomaly distribution.
190      anomaly_prior_probability: A scalar value, between 0 and 1, indicating the
191          prior probability of a particular example being an anomaly.
192      responsibility_scaling: A positive scalar controlling how fast
193          interpolation transitions between not-anomaly and anomaly; lower
194          values (closer to 0) create a smoother/slower transition.
195    """
196    self._anomaly_log_likelihood = anomaly_log_likelihood
197    self._responsibility_scaling = responsibility_scaling
198    self._anomaly_prior_probability = anomaly_prior_probability
200  def process_filtering_step(self, current_times, current_values,
201                             predicted_state, filtered_state, outputs):
202    """Fall back on `predicted_state` for anomalies.
204    Args:
205      current_times: A [batch size] integer Tensor of times.
206      current_values: A [batch size x num features] Tensor of values filtering
207          is being performed on.
208      predicted_state: A (possibly nested) list of Tensors indicating model
209          state which does not take `current_times` and `current_values` into
210          account.
211      filtered_state: Same structure as predicted_state, but updated to take
212          `current_times` and `current_values` into account.
213      outputs: A dictionary of outputs produced by model filtering. Must
214          include `log_likelihood`, a [batch size] Tensor indicating the log
215          likelihood of the observations under the model's predictions.
216    Returns:
217      A tuple of (new_state, updated_outputs);
218        new_state: Updated state with the same structure as `filtered_state` and
219            `predicted_state`; predicted_state for anomalies and filtered_state
220            otherwise (per batch element).
221        updated_outputs: The `outputs` dictionary, updated with a new "loss"
222            (the interpolated negative log likelihoods under the model and
223            anomaly distributions) and "anomaly_score" (the log odds ratio of
224            each part of the batch being an anomaly).
225    """
226    anomaly_log_likelihood = self._anomaly_log_likelihood(
227        current_times=current_times,
228        current_values=current_values,
229        outputs=outputs)
230    anomaly_prior_probability = ops.convert_to_tensor(
231        self._anomaly_prior_probability, dtype=current_values.dtype)
232    # p(data | anomaly) * p(anomaly)
233    data_and_anomaly_log_probability = (
234        anomaly_log_likelihood + math_ops.log(anomaly_prior_probability))
235    # p(data | no anomaly) * p(no anomaly)
236    data_and_no_anomaly_log_probability = (
237        outputs["log_likelihood"] + math_ops.log(1. - anomaly_prior_probability)
238    )
239    # A log odds ratio is slightly nicer here than computing p(anomaly | data),
240    # since it is centered around zero
241    anomaly_log_odds_ratio = (
242        data_and_anomaly_log_probability
243        - data_and_no_anomaly_log_probability)
244    model_responsibility = math_ops.sigmoid(-self._responsibility_scaling *
245                                            anomaly_log_odds_ratio)
246    # Do a linear interpolation between predicted and inferred model state
247    # based on the model's "responsibility". If we knew for sure whether
248    # this was an anomaly or not (binary responsibility), this would be the
249    # correct thing to do, but given that we don't it's just a
250    # (differentiable) heuristic.
251    interpolated_state = _interpolate_state_linear(
252        first_state=filtered_state,
253        second_state=predicted_state,
254        first_responsibility=model_responsibility)
255    # TODO(allenl): Try different responsibility scalings and interpolation
256    # methods (e.g. average in probability space rather than log space).
257    interpolated_log_likelihood = (
258        model_responsibility * data_and_no_anomaly_log_probability
259        + (1. - model_responsibility) * data_and_anomaly_log_probability)
260    outputs["loss"] = -interpolated_log_likelihood
261    outputs["anomaly_score"] = anomaly_log_odds_ratio
262    return (interpolated_state, outputs)