1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Experimental API for controlling threading in `tf.data` pipelines."""
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import threading
21
22from tensorflow.python.data.ops import dataset_ops
23from tensorflow.python.eager import context
24from tensorflow.python.ops import gen_experimental_dataset_ops as ged_ops
25from tensorflow.python.ops import resource_variable_ops
26
27_uid_counter = 0
28_uid_lock = threading.Lock()
29
30
31def _generate_shared_name(prefix):
32  with _uid_lock:
33    global _uid_counter
34    uid = _uid_counter
35    _uid_counter += 1
36  return "{}{}".format(prefix, uid)
37
38
39# TODO(b/73383364): Properly export in the `tf.data.experimental` API when
40# stable or make private / remove.
41class PrivateThreadPool(object):
42  """A stateful resource that represents a private thread pool."""
43
44  def __init__(self, num_threads, display_name=None,
45               max_intra_op_parallelism=1):
46    """Creates a `PrivateThreadPool` with the given number of threads."""
47    if context.executing_eagerly():
48      shared_name = _generate_shared_name("privatethreadpool")
49      self._resource = ged_ops.experimental_thread_pool_handle(
50          num_threads=num_threads,
51          max_intra_op_parallelism=max_intra_op_parallelism,
52          display_name=display_name,
53          shared_name=shared_name)
54      self._resource_deleter = resource_variable_ops.EagerResourceDeleter(
55          handle=self._resource, handle_device=context.context().device_name)
56    else:
57      self._resource = ged_ops.experimental_thread_pool_handle(
58          num_threads=num_threads,
59          max_intra_op_parallelism=max_intra_op_parallelism,
60          display_name=display_name)
61
62
63class _ThreadPoolDataset(dataset_ops.UnaryUnchangedStructureDataset):
64  """A `Dataset` that acts as an identity, and sets a custom threadpool."""
65
66  def __init__(self, input_dataset, thread_pool):
67    self._input_dataset = input_dataset
68    self._thread_pool = thread_pool
69    variant_tensor = ged_ops.experimental_thread_pool_dataset(
70        self._input_dataset._variant_tensor,  # pylint: disable=protected-access
71        self._thread_pool._resource,  # pylint: disable=protected-access
72        **dataset_ops.flat_structure(self))
73    super(_ThreadPoolDataset, self).__init__(input_dataset, variant_tensor)
74
75
76# TODO(b/73383364): Properly export in the `tf.data.experimental` API when
77# stable or make private / remove.
78def override_threadpool(dataset, thread_pool):
79  """Returns a new dataset that uses the given thread pool for its operations.
80
81  Args:
82    dataset: A `tf.data.Dataset` object.
83    thread_pool: A `PrivateThreadPool` object.
84
85  Returns:
86    A dataset containing the same values as `dataset`, but which uses
87    `thread_pool` to compute any of its parallel operations (such as
88    `tf.data.Dataset.map`).
89  """
90  return _ThreadPoolDataset(dataset, thread_pool)
91