# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Experimental API for controlling threading in `tf.data` pipelines."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import threading

from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import context
from tensorflow.python.ops import gen_experimental_dataset_ops as ged_ops
from tensorflow.python.ops import resource_variable_ops

_uid_counter = 0
_uid_lock = threading.Lock()


def _generate_shared_name(prefix):
  with _uid_lock:
    global _uid_counter
    uid = _uid_counter
    _uid_counter += 1
  return "{}{}".format(prefix, uid)


# TODO(b/73383364): Properly export in the `tf.data.experimental` API when
# stable or make private / remove.
class PrivateThreadPool(object):
  """A stateful resource that represents a private thread pool."""

  def __init__(self, num_threads, display_name=None,
               max_intra_op_parallelism=1):
    """Creates a `PrivateThreadPool` with the given number of threads."""
    if context.executing_eagerly():
      shared_name = _generate_shared_name("privatethreadpool")
      self._resource = ged_ops.experimental_thread_pool_handle(
          num_threads=num_threads,
          max_intra_op_parallelism=max_intra_op_parallelism,
          display_name=display_name,
          shared_name=shared_name)
      self._resource_deleter = resource_variable_ops.EagerResourceDeleter(
          handle=self._resource, handle_device=context.context().device_name)
    else:
      self._resource = ged_ops.experimental_thread_pool_handle(
          num_threads=num_threads,
          max_intra_op_parallelism=max_intra_op_parallelism,
          display_name=display_name)


class _ThreadPoolDataset(dataset_ops.UnaryUnchangedStructureDataset):
  """A `Dataset` that acts as an identity, and sets a custom threadpool."""

  def __init__(self, input_dataset, thread_pool):
    self._input_dataset = input_dataset
    self._thread_pool = thread_pool
    variant_tensor = ged_ops.experimental_thread_pool_dataset(
        self._input_dataset._variant_tensor,  # pylint: disable=protected-access
        self._thread_pool._resource,  # pylint: disable=protected-access
        **dataset_ops.flat_structure(self))
    super(_ThreadPoolDataset, self).__init__(input_dataset, variant_tensor)


# TODO(b/73383364): Properly export in the `tf.data.experimental` API when
# stable or make private / remove.
def override_threadpool(dataset, thread_pool):
  """Returns a new dataset that uses the given thread pool for its operations.

  Args:
    dataset: A `tf.data.Dataset` object.
    thread_pool: A `PrivateThreadPool` object.

  Returns:
    A dataset containing the same values as `dataset`, but which uses
    `thread_pool` to compute any of its parallel operations (such as
    `tf.data.Dataset.map`).
  """
  return _ThreadPoolDataset(dataset, thread_pool)