1# Copyright 2020 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Keras timeseries dataset utilities.""" 16# pylint: disable=g-classes-have-attributes 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import numpy as np 22 23from tensorflow.python.data.ops import dataset_ops 24from tensorflow.python.ops import array_ops 25from tensorflow.python.ops import math_ops 26from tensorflow.python.util.tf_export import keras_export 27 28 29@keras_export('keras.preprocessing.timeseries_dataset_from_array', v1=[]) 30def timeseries_dataset_from_array( 31 data, 32 targets, 33 sequence_length, 34 sequence_stride=1, 35 sampling_rate=1, 36 batch_size=128, 37 shuffle=False, 38 seed=None, 39 start_index=None, 40 end_index=None): 41 """Creates a dataset of sliding windows over a timeseries provided as array. 42 43 This function takes in a sequence of data-points gathered at 44 equal intervals, along with time series parameters such as 45 length of the sequences/windows, spacing between two sequence/windows, etc., 46 to produce batches of timeseries inputs and targets. 47 48 Args: 49 data: Numpy array or eager tensor 50 containing consecutive data points (timesteps). 51 Axis 0 is expected to be the time dimension. 52 targets: Targets corresponding to timesteps in `data`. 53 `targets[i]` should be the target 54 corresponding to the window that starts at index `i` 55 (see example 2 below). 56 Pass None if you don't have target data (in this case the dataset will 57 only yield the input data). 58 sequence_length: Length of the output sequences (in number of timesteps). 59 sequence_stride: Period between successive output sequences. 60 For stride `s`, output samples would 61 start at index `data[i]`, `data[i + s]`, `data[i + 2 * s]`, etc. 62 sampling_rate: Period between successive individual timesteps 63 within sequences. For rate `r`, timesteps 64 `data[i], data[i + r], ... data[i + sequence_length]` 65 are used for create a sample sequence. 66 batch_size: Number of timeseries samples in each batch 67 (except maybe the last one). 68 shuffle: Whether to shuffle output samples, 69 or instead draw them in chronological order. 70 seed: Optional int; random seed for shuffling. 71 start_index: Optional int; data points earlier (exclusive) 72 than `start_index` will not be used 73 in the output sequences. This is useful to reserve part of the 74 data for test or validation. 75 end_index: Optional int; data points later (exclusive) than `end_index` 76 will not be used in the output sequences. 77 This is useful to reserve part of the data for test or validation. 78 79 Returns: 80 A tf.data.Dataset instance. If `targets` was passed, the dataset yields 81 tuple `(batch_of_sequences, batch_of_targets)`. If not, the dataset yields 82 only `batch_of_sequences`. 83 84 Example 1: 85 Consider indices `[0, 1, ... 99]`. 86 With `sequence_length=10, sampling_rate=2, sequence_stride=3`, 87 `shuffle=False`, the dataset will yield batches of sequences 88 composed of the following indices: 89 90 ``` 91 First sequence: [0 2 4 6 8 10 12 14 16 18] 92 Second sequence: [3 5 7 9 11 13 15 17 19 21] 93 Third sequence: [6 8 10 12 14 16 18 20 22 24] 94 ... 95 Last sequence: [78 80 82 84 86 88 90 92 94 96] 96 ``` 97 98 In this case the last 3 data points are discarded since no full sequence 99 can be generated to include them (the next sequence would have started 100 at index 81, and thus its last step would have gone over 99). 101 102 Example 2: temporal regression. 103 Consider an array `data` of scalar values, of shape `(steps,)`. 104 To generate a dataset that uses the past 10 105 timesteps to predict the next timestep, you would use: 106 107 ```python 108 input_data = data[:-10] 109 targets = data[10:] 110 dataset = tf.keras.preprocessing.timeseries_dataset_from_array( 111 input_data, targets, sequence_length=10) 112 for batch in dataset: 113 inputs, targets = batch 114 assert np.array_equal(inputs[0], data[:10]) # First sequence: steps [0-9] 115 assert np.array_equal(targets[0], data[10]) # Corresponding target: step 10 116 break 117 ``` 118 119 Example 3: temporal regression for many-to-many architectures. 120 Consider two arrays of scalar values `X` and `Y`, 121 both of shape `(100,)`. The resulting dataset should consist samples with 122 20 timestamps each. The samples should not overlap. 123 To generate a dataset that uses the current timestamp 124 to predict the corresponding target timestep, you would use: 125 126 ```python 127 X = np.arange(100) 128 Y = X*2 129 130 sample_length = 20 131 input_dataset = tf.keras.preprocessing.timeseries_dataset_from_array( 132 X, None, sequence_length=sample_length, sequence_stride=sample_length) 133 target_dataset = tf.keras.preprocessing.timeseries_dataset_from_array( 134 Y, None, sequence_length=sample_length, sequence_stride=sample_length) 135 136 for batch in zip(input_dataset, target_dataset): 137 inputs, targets = batch 138 assert np.array_equal(inputs[0], X[:sample_length]) 139 140 # second sample equals output timestamps 20-40 141 assert np.array_equal(targets[1], Y[sample_length:2*sample_length]) 142 break 143 ``` 144 """ 145 if start_index and (start_index < 0 or start_index >= len(data)): 146 raise ValueError('start_index must be higher than 0 and lower than the ' 147 'length of the data. Got: start_index=%s ' 148 'for data of length %s.' % (start_index, len(data))) 149 if end_index: 150 if start_index and end_index <= start_index: 151 raise ValueError('end_index must be higher than start_index. Got: ' 152 'start_index=%s, end_index=%s.' % 153 (start_index, end_index)) 154 if end_index >= len(data): 155 raise ValueError('end_index must be lower than the length of the data. ' 156 'Got: end_index=%s' % (end_index,)) 157 if end_index <= 0: 158 raise ValueError('end_index must be higher than 0. ' 159 'Got: end_index=%s' % (end_index,)) 160 161 # Validate strides 162 if sampling_rate <= 0 or sampling_rate >= len(data): 163 raise ValueError( 164 'sampling_rate must be higher than 0 and lower than ' 165 'the length of the data. Got: ' 166 'sampling_rate=%s for data of length %s.' % (sampling_rate, len(data))) 167 if sequence_stride <= 0 or sequence_stride >= len(data): 168 raise ValueError( 169 'sequence_stride must be higher than 0 and lower than ' 170 'the length of the data. Got: sequence_stride=%s ' 171 'for data of length %s.' % (sequence_stride, len(data))) 172 173 if start_index is None: 174 start_index = 0 175 if end_index is None: 176 end_index = len(data) 177 178 # Determine the lowest dtype to store start positions (to lower memory usage). 179 num_seqs = end_index - start_index - (sequence_length * sampling_rate) + 1 180 if targets is not None: 181 num_seqs = min(num_seqs, len(targets)) 182 if num_seqs < 2147483647: 183 index_dtype = 'int32' 184 else: 185 index_dtype = 'int64' 186 187 # Generate start positions 188 start_positions = np.arange(0, num_seqs, sequence_stride, dtype=index_dtype) 189 if shuffle: 190 if seed is None: 191 seed = np.random.randint(1e6) 192 rng = np.random.RandomState(seed) 193 rng.shuffle(start_positions) 194 195 sequence_length = math_ops.cast(sequence_length, dtype=index_dtype) 196 sampling_rate = math_ops.cast(sampling_rate, dtype=index_dtype) 197 198 positions_ds = dataset_ops.Dataset.from_tensors(start_positions).repeat() 199 200 # For each initial window position, generates indices of the window elements 201 indices = dataset_ops.Dataset.zip( 202 (dataset_ops.Dataset.range(len(start_positions)), positions_ds)).map( 203 lambda i, positions: math_ops.range( # pylint: disable=g-long-lambda 204 positions[i], 205 positions[i] + sequence_length * sampling_rate, 206 sampling_rate), 207 num_parallel_calls=dataset_ops.AUTOTUNE) 208 209 dataset = sequences_from_indices(data, indices, start_index, end_index) 210 if targets is not None: 211 indices = dataset_ops.Dataset.zip( 212 (dataset_ops.Dataset.range(len(start_positions)), positions_ds)).map( 213 lambda i, positions: positions[i], 214 num_parallel_calls=dataset_ops.AUTOTUNE) 215 target_ds = sequences_from_indices( 216 targets, indices, start_index, end_index) 217 dataset = dataset_ops.Dataset.zip((dataset, target_ds)) 218 if shuffle: 219 # Shuffle locally at each iteration 220 dataset = dataset.shuffle(buffer_size=batch_size * 8, seed=seed) 221 dataset = dataset.batch(batch_size) 222 return dataset 223 224 225def sequences_from_indices(array, indices_ds, start_index, end_index): 226 dataset = dataset_ops.Dataset.from_tensors(array[start_index : end_index]) 227 dataset = dataset_ops.Dataset.zip((dataset.repeat(), indices_ds)).map( 228 lambda steps, inds: array_ops.gather(steps, inds), # pylint: disable=unnecessary-lambda 229 num_parallel_calls=dataset_ops.AUTOTUNE) 230 return dataset 231