1# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Keras timeseries dataset utilities."""
16# pylint: disable=g-classes-have-attributes
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import numpy as np
22
23from tensorflow.python.data.ops import dataset_ops
24from tensorflow.python.ops import array_ops
25from tensorflow.python.ops import math_ops
26from tensorflow.python.util.tf_export import keras_export
27
28
@keras_export('keras.preprocessing.timeseries_dataset_from_array', v1=[])
def timeseries_dataset_from_array(
    data,
    targets,
    sequence_length,
    sequence_stride=1,
    sampling_rate=1,
    batch_size=128,
    shuffle=False,
    seed=None,
    start_index=None,
    end_index=None):
  """Creates a dataset of sliding windows over a timeseries provided as array.

  This function takes in a sequence of data-points gathered at
  equal intervals, along with time series parameters such as
  length of the sequences/windows, spacing between two sequence/windows, etc.,
  to produce batches of timeseries inputs and targets.

  Args:
    data: Numpy array or eager tensor
      containing consecutive data points (timesteps).
      Axis 0 is expected to be the time dimension.
    targets: Targets corresponding to timesteps in `data`.
      `targets[i]` should be the target
      corresponding to the window that starts at index `i`
      (see example 2 below).
      Pass None if you don't have target data (in this case the dataset will
      only yield the input data).
    sequence_length: Length of the output sequences (in number of timesteps).
    sequence_stride: Period between successive output sequences.
      For stride `s`, output samples would
      start at index `data[i]`, `data[i + s]`, `data[i + 2 * s]`, etc.
    sampling_rate: Period between successive individual timesteps
      within sequences. For rate `r`, timesteps
      `data[i], data[i + r], ... data[i + (sequence_length - 1) * r]`
      are used to create a sample sequence.
    batch_size: Number of timeseries samples in each batch
      (except maybe the last one).
    shuffle: Whether to shuffle output samples,
      or instead draw them in chronological order.
    seed: Optional int; random seed for shuffling.
    start_index: Optional int; data points earlier (exclusive)
      than `start_index` will not be used
      in the output sequences. This is useful to reserve part of the
      data for test or validation.
    end_index: Optional int; data points at index `end_index` and later
      will not be used in the output sequences.
      This is useful to reserve part of the data for test or validation.

  Returns:
    A tf.data.Dataset instance. If `targets` was passed, the dataset yields
    tuple `(batch_of_sequences, batch_of_targets)`. If not, the dataset yields
    only `batch_of_sequences`.

  Raises:
    ValueError: If `start_index`, `end_index`, `sampling_rate` or
      `sequence_stride` is outside the range accepted for the given `data`.

  Example 1:
    Consider indices `[0, 1, ... 98]`.
    With `sequence_length=10,  sampling_rate=2, sequence_stride=3`,
    `shuffle=False`, the dataset will yield batches of sequences
    composed of the following indices:

    ```
    First sequence:  [0  2  4  6  8 10 12 14 16 18]
    Second sequence: [3  5  7  9 11 13 15 17 19 21]
    Third sequence:  [6  8 10 12 14 16 18 20 22 24]
    ...
    Last sequence:   [78 80 82 84 86 88 90 92 94 96]
    ```

    In this case the last 2 data points are discarded since no full sequence
    can be generated to include them (the next sequence would have started
    at index 81, and thus its last step would have gone over 98).

  Example 2: temporal regression.
    Consider an array `data` of scalar values, of shape `(steps,)`.
    To generate a dataset that uses the past 10
    timesteps to predict the next timestep, you would use:

    ```python
    input_data = data[:-10]
    targets = data[10:]
    dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
        input_data, targets, sequence_length=10)
    for batch in dataset:
      inputs, targets = batch
      assert np.array_equal(inputs[0], data[:10])  # First sequence: steps [0-9]
      assert np.array_equal(targets[0], data[10])  # Corresponding target: step 10
      break
    ```

  Example 3: temporal regression for many-to-many architectures.
    Consider two arrays of scalar values `X` and `Y`,
    both of shape `(100,)`. The resulting dataset should consist of samples
    with 20 timestamps each. The samples should not overlap.
    To generate a dataset that uses the current timestamp
    to predict the corresponding target timestep, you would use:

    ```python
    X = np.arange(100)
    Y = X*2

    sample_length = 20
    input_dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
      X, None, sequence_length=sample_length, sequence_stride=sample_length)
    target_dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
      Y, None, sequence_length=sample_length, sequence_stride=sample_length)

    for batch in zip(input_dataset, target_dataset):
      inputs, targets = batch
      assert np.array_equal(inputs[0], X[:sample_length])

      # second sample equals output timestamps 20-40
      assert np.array_equal(targets[1], Y[sample_length:2*sample_length])
      break
    ```
  """
  # Validate the optional bounds. Falsy values (`None` and 0) skip these
  # checks and fall through to the defaults / raw value further down.
  if start_index and (start_index < 0 or start_index >= len(data)):
    raise ValueError('start_index must be higher than 0 and lower than the '
                     'length of the data. Got: start_index=%s '
                     'for data of length %s.' % (start_index, len(data)))
  if end_index:
    if start_index and end_index <= start_index:
      raise ValueError('end_index must be higher than start_index. Got: '
                       'start_index=%s, end_index=%s.' %
                       (start_index, end_index))
    if end_index >= len(data):
      raise ValueError('end_index must be lower than the length of the data. '
                       'Got: end_index=%s' % (end_index,))
    if end_index <= 0:
      raise ValueError('end_index must be higher than 0. '
                       'Got: end_index=%s' % (end_index,))

  # Validate strides
  if sampling_rate <= 0 or sampling_rate >= len(data):
    raise ValueError(
        'sampling_rate must be higher than 0 and lower than '
        'the length of the data. Got: '
        'sampling_rate=%s for data of length %s.' % (sampling_rate, len(data)))
  if sequence_stride <= 0 or sequence_stride >= len(data):
    raise ValueError(
        'sequence_stride must be higher than 0 and lower than '
        'the length of the data. Got: sequence_stride=%s '
        'for data of length %s.' % (sequence_stride, len(data)))

  if start_index is None:
    start_index = 0
  if end_index is None:
    end_index = len(data)

  # Number of valid window start positions. A window starting at `p` reads
  # `p, p + r, ..., p + (sequence_length - 1) * r` (with `r = sampling_rate`),
  # so only its *last* element has to fit inside `[start_index, end_index)`.
  # Subtracting `sequence_length * r - 1` instead would drop valid trailing
  # windows whenever `sampling_rate > 1`.
  num_seqs = end_index - start_index - (sequence_length - 1) * sampling_rate
  if targets is not None:
    # There must be one target per window start position.
    num_seqs = min(num_seqs, len(targets))
  # Determine the lowest dtype to store start positions (to lower memory usage).
  if num_seqs < 2147483647:
    index_dtype = 'int32'
  else:
    index_dtype = 'int64'

  # Generate start positions (relative to `start_index`).
  start_positions = np.arange(0, num_seqs, sequence_stride, dtype=index_dtype)
  if shuffle:
    if seed is None:
      seed = np.random.randint(1e6)
    # Shuffle the window order once up front, using a seeded generator.
    rng = np.random.RandomState(seed)
    rng.shuffle(start_positions)

  # Cast so the arithmetic inside the map functions matches `index_dtype`.
  sequence_length = math_ops.cast(sequence_length, dtype=index_dtype)
  sampling_rate = math_ops.cast(sampling_rate, dtype=index_dtype)

  # The full array of start positions, repeated so it can be zipped with a
  # running counter and indexed one position at a time.
  positions_ds = dataset_ops.Dataset.from_tensors(start_positions).repeat()

  # For each initial window position, generates indices of the window elements
  indices = dataset_ops.Dataset.zip(
      (dataset_ops.Dataset.range(len(start_positions)), positions_ds)).map(
          lambda i, positions: math_ops.range(  # pylint: disable=g-long-lambda
              positions[i],
              positions[i] + sequence_length * sampling_rate,
              sampling_rate),
          num_parallel_calls=dataset_ops.AUTOTUNE)

  dataset = sequences_from_indices(data, indices, start_index, end_index)
  if targets is not None:
    # Each window's target is the single entry at the window's start position.
    indices = dataset_ops.Dataset.zip(
        (dataset_ops.Dataset.range(len(start_positions)), positions_ds)).map(
            lambda i, positions: positions[i],
            num_parallel_calls=dataset_ops.AUTOTUNE)
    target_ds = sequences_from_indices(
        targets, indices, start_index, end_index)
    dataset = dataset_ops.Dataset.zip((dataset, target_ds))
  if shuffle:
    # Shuffle locally at each iteration
    dataset = dataset.shuffle(buffer_size=batch_size * 8, seed=seed)
  dataset = dataset.batch(batch_size)
  return dataset
223
224
def sequences_from_indices(array, indices_ds, start_index, end_index):
  """Builds a dataset that gathers entries of `array` for each index element.

  Args:
    array: Array-like to read from; only the slice
      `array[start_index:end_index]` is used.
    indices_ds: Dataset whose elements are indices (relative to
      `start_index`) to gather from that slice.
    start_index: First position of `array` that may be read.
    end_index: Position of `array` at which reading stops (exclusive).

  Returns:
    A dataset obtained by gathering, for every element of `indices_ds`,
    the corresponding entries of the sliced array.
  """
  window = array[start_index : end_index]
  # Repeat the single-element source so it can be paired with each element
  # of `indices_ds` by `zip`.
  source_ds = dataset_ops.Dataset.from_tensors(window).repeat()
  paired_ds = dataset_ops.Dataset.zip((source_ds, indices_ds))

  def _gather_steps(steps, inds):
    return array_ops.gather(steps, inds)

  return paired_ds.map(
      _gather_steps, num_parallel_calls=dataset_ops.AUTOTUNE)
231