1# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Methods to read data in the graph (deprecated).
16
17This module and all its submodules are deprecated. See
18[contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md)
19for migration instructions.
20"""
21
22from __future__ import absolute_import
23from __future__ import division
24from __future__ import print_function
25
26from tensorflow.contrib.input_pipeline.python.ops import input_pipeline_ops
27from tensorflow.python.framework import constant_op
28from tensorflow.python.framework import dtypes
29from tensorflow.python.framework import errors
30from tensorflow.python.framework import ops
31from tensorflow.python.framework import sparse_tensor
32from tensorflow.python.layers import utils
33from tensorflow.python.ops import array_ops
34from tensorflow.python.ops import data_flow_ops
35from tensorflow.python.ops import io_ops
36from tensorflow.python.ops import math_ops
37from tensorflow.python.ops import parsing_ops
38from tensorflow.python.platform import gfile
39from tensorflow.python.summary import summary
40from tensorflow.python.training import input as input_ops
41from tensorflow.python.training import queue_runner
42from tensorflow.python.util.deprecation import deprecated
43
# Default name for key in the feature dict. `_get_examples` stores the keys
# tensor under this name when `parse_fn` returns a dict, and
# `_read_keyed_batch_examples_helper` pops it back out after batching.
KEY_FEATURE_NAME = '__key__'
46
47
@deprecated(None, 'Use tf.data.')
def read_batch_examples(file_pattern,
                        batch_size,
                        reader,
                        randomize_input=True,
                        num_epochs=None,
                        queue_capacity=10000,
                        num_threads=1,
                        read_batch_size=1,
                        parse_fn=None,
                        name=None,
                        seed=None):
  """Builds ops that read, queue and batch `Example` protos.

  Thin wrapper around `read_keyed_batch_examples`: every argument is forwarded
  unchanged and the keys half of the `(keys, examples)` result is dropped.

  A file name queue is set up for the given file pattern (or list of files),
  records are read with the supplied `reader`, and a batching queue groups
  them into batches of size `batch_size`.

  Queue runners are registered in the queue runners collection and may be
  started via `start_queue_runners`. All ops go into the default graph.

  Supply `parse_fn` to parse / process individual examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` giving the size of each batch.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer number of passes over the dataset. `None` means the
      dataset is cycled through forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: How many threads enqueue examples. Use 1 when a predictable
      and repeatable read/enqueue order is required, e.g. in prediction and
      evaluation mode.
    read_batch_size: An int or scalar `Tensor` giving how many records are
      read at once.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    The batched examples: a string `Tensor` of `Example` protos, or the
    batched output of `parse_fn` when one is supplied.

  Raises:
    ValueError: for invalid inputs.
  """
  forwarded_args = dict(
      file_pattern=file_pattern,
      batch_size=batch_size,
      reader=reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      num_threads=num_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      name=name,
      seed=seed)
  # Only the examples are of interest here; the keys are discarded.
  unused_keys, batched_examples = read_keyed_batch_examples(**forwarded_args)
  return batched_examples
114
115
@deprecated(None, 'Use tf.data.')
def read_keyed_batch_examples(file_pattern,
                              batch_size,
                              reader,
                              randomize_input=True,
                              num_epochs=None,
                              queue_capacity=10000,
                              num_threads=1,
                              read_batch_size=1,
                              parse_fn=None,
                              name=None,
                              seed=None):
  """Builds ops that read, queue and batch `Example` protos with their keys.

  A file name queue is set up for the given file pattern (or list of files),
  records are read with the supplied `reader`, and a batching queue groups
  them into batches of size `batch_size`. The work is delegated to
  `_read_keyed_batch_examples_helper` with `setup_shared_queue=False`.

  Queue runners are registered in the queue runners collection and may be
  started via `start_queue_runners`. All ops go into the default graph.

  Supply `parse_fn` to parse / process individual examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` giving the size of each batch.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer number of passes over the dataset. `None` means the
      dataset is cycled through forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: How many threads enqueue examples. Use 1 when a predictable
      and repeatable read/enqueue order is required, e.g. in prediction and
      evaluation mode.
    read_batch_size: An int or scalar `Tensor` giving how many records are
      read at once.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - String `Tensor` of batched `Example` proto.

  Raises:
    ValueError: for invalid inputs.
  """
  # Non-shared variant: the file name queue is private to this graph.
  keyed_batch = _read_keyed_batch_examples_helper(
      file_pattern=file_pattern,
      batch_size=batch_size,
      reader=reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      num_threads=num_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      setup_shared_queue=False,
      name=name,
      seed=seed)
  return keyed_batch
184
185
@deprecated(None, 'Use tf.data.')
def read_keyed_batch_examples_shared_queue(file_pattern,
                                           batch_size,
                                           reader,
                                           randomize_input=True,
                                           num_epochs=None,
                                           queue_capacity=10000,
                                           num_threads=1,
                                           read_batch_size=1,
                                           parse_fn=None,
                                           name=None,
                                           seed=None):
  """Builds keyed `Example` batching ops fed from a shared file name queue.

  For the given file pattern (or list of files) a shared queue of file names
  is set up together with a worker queue that pulls from it. `Example` protos
  are read with the supplied `reader` and grouped into batches of size
  `batch_size` by a batching queue. The work is delegated to
  `_read_keyed_batch_examples_helper` with `setup_shared_queue=True`.

  This provides at most once visit guarantees. Note that this only holds if
  the parameter servers are not pre-empted or restarted and the session is
  not restored from a checkpoint: the state of a queue is not checkpointed,
  so reading would restart from the entire list of files.

  Queue runners are registered in the queue runners collection and may be
  started via `start_queue_runners`. All ops go into the default graph.

  Supply `parse_fn` to parse / process individual examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` giving the size of each batch.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer number of passes over the dataset. `None` means the
      dataset is cycled through forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: The number of threads enqueuing examples.
    read_batch_size: An int or scalar `Tensor` giving how many records are
      read at once.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - String `Tensor` of batched `Example` proto.

  Raises:
    ValueError: for invalid inputs.
  """
  # Shared variant: file names are handed out through a shared queue.
  keyed_batch = _read_keyed_batch_examples_helper(
      file_pattern=file_pattern,
      batch_size=batch_size,
      reader=reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      num_threads=num_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      setup_shared_queue=True,
      name=name,
      seed=seed)
  return keyed_batch
257
258
def _get_file_names(file_pattern, randomize_input):
  """Expands glob pattern(s) into a concrete list of file names.

  Args:
    file_pattern: A single file glob pattern, or a list / tuple of glob
      patterns. See `tf.gfile.Glob` for pattern rules.
    randomize_input: Whether the caller is going to shuffle the file names.
      When false, the names are returned sorted.

  Returns:
    List of file names matching `file_pattern`.

  Raises:
    ValueError: If `file_pattern` is an empty list / tuple, or if the
      pattern(s) match no files.
  """
  # Tuples are treated exactly like lists, so that a tuple of patterns is
  # expanded entry by entry rather than handed to `Glob` as one pattern.
  if isinstance(file_pattern, (list, tuple)):
    if not file_pattern:
      raise ValueError('No files given to dequeue_examples.')
    file_names = []
    for entry in file_pattern:
      file_names.extend(gfile.Glob(entry))
  else:
    file_names = list(gfile.Glob(file_pattern))

  if not file_names:
    # Wrap in a 1-tuple so a tuple `file_pattern` is formatted as a whole
    # instead of being unpacked as multiple `%` arguments.
    raise ValueError('No files match %s.' % (file_pattern,))

  # Sort files so it will be deterministic for unit tests. They'll be shuffled
  # in `string_input_producer` if `randomize_input` is enabled.
  if not randomize_input:
    file_names = sorted(file_names)
  return file_names
289
290
def _get_examples(file_name_queue, reader, num_threads, read_batch_size,
                  filter_fn, parse_fn):
  """Creates one read (plus optional filter / parse) pipeline per thread.

  Args:
    file_name_queue: A queue implementation that dequeues elements in
      first-in first-out order.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor). It is also expected to offer
      `read_up_to`, which is used when `read_batch_size > 1`.
    num_threads: The number of threads enqueuing examples; one reader
      pipeline is built per thread.
    read_batch_size: An int or scalar `Tensor` giving how many records are
      read at once.
    filter_fn: Filtering function taking the keys and the `Example` Tensors
      and returning a boolean mask of the same shape as the input Tensors,
      which is applied to both. If `None`, no filtering is done.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.

  Returns:
    A list with one entry per thread. An entry is the dict returned by
    `parse_fn` with the keys added under `KEY_FEATURE_NAME`, or otherwise a
    `(keys, examples)` tuple where `examples` is the raw or parsed examples.
  """

  def _read_many():
    return reader().read_up_to(file_name_queue, read_batch_size)

  def _read_one():
    return reader().read(file_name_queue)

  per_thread_outputs = []
  with ops.name_scope('read'):
    for _ in range(num_threads):
      record_keys, records = utils.smart_cond(
          read_batch_size > 1, _read_many, _read_one)

      if filter_fn:
        keep_mask = filter_fn(record_keys, records)
        record_keys = array_ops.boolean_mask(record_keys, keep_mask)
        records = array_ops.boolean_mask(records, keep_mask)

      if not parse_fn:
        per_thread_outputs.append((record_keys, records))
        continue

      parsed = parse_fn(records)
      if isinstance(parsed, dict):
        # Map keys into example map because batch_join doesn't support
        # tuple of Tensor + dict.
        parsed[KEY_FEATURE_NAME] = record_keys
        per_thread_outputs.append(parsed)
      else:
        per_thread_outputs.append((record_keys, parsed))
  return per_thread_outputs
336
337
def _read_keyed_batch_examples_helper(file_pattern,
                                      batch_size,
                                      reader,
                                      randomize_input=True,
                                      num_epochs=None,
                                      queue_capacity=10000,
                                      num_threads=1,
                                      read_batch_size=1,
                                      filter_fn=None,
                                      parse_fn=None,
                                      setup_shared_queue=False,
                                      name=None,
                                      seed=None):
  """Builds the file name queue, readers and batching queue for examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` giving the size of each batch. An
      int must be positive and smaller than `queue_capacity`.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer number of passes over the dataset. `None` means the
      dataset is cycled through forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: The number of threads enqueuing examples.
    read_batch_size: An int or scalar `Tensor` giving how many records are
      read at once.
    filter_fn: Filtering function taking the keys and the `Example` Tensors
      and returning a boolean mask of the same shape as the input Tensors,
      which is applied to both. If `None`, no filtering is done.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.
    setup_shared_queue: Whether to set up a shared queue for file names.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    When `parse_fn` is given and the batched result is a dict, a tuple of the
    batched keys and that dict with the `KEY_FEATURE_NAME` entry removed.
    Otherwise the result of the batching op, a pair of:
    - `Tensor` of string keys.
    - String `Tensor` of batched `Example` proto (or parsed examples).

  Raises:
    ValueError: for invalid inputs.
  """
  # Resolve the files to read before anything else.
  file_names = _get_file_names(file_pattern, randomize_input)

  # Sanity-check the arguments. Tensor-valued sizes cannot be range-checked
  # at graph construction time, so only Python values are compared.
  if not queue_capacity or queue_capacity <= 0:
    raise ValueError('Invalid queue_capacity %s.' % queue_capacity)
  batch_size_is_static = not isinstance(batch_size, ops.Tensor)
  if batch_size is None or (
      batch_size_is_static and
      (batch_size <= 0 or batch_size >= queue_capacity)):
    raise ValueError('Invalid batch_size %s, with queue_capacity %s.' %
                     (batch_size, queue_capacity))
  read_batch_size_is_static = not isinstance(read_batch_size, ops.Tensor)
  if read_batch_size is None or (
      read_batch_size_is_static and read_batch_size <= 0):
    raise ValueError('Invalid read_batch_size %s.' % read_batch_size)
  if not num_threads or num_threads <= 0:
    raise ValueError('Invalid num_threads %s.' % num_threads)
  if num_epochs is not None and num_epochs <= 0:
    raise ValueError('Invalid num_epochs %s.' % num_epochs)

  with ops.name_scope(name, 'read_batch_examples', [file_pattern]) as scope:
    with ops.name_scope('file_name_queue') as file_name_queue_scope:
      if not setup_shared_queue:
        file_name_queue = input_ops.string_input_producer(
            constant_op.constant(file_names, name='input'),
            shuffle=randomize_input,
            num_epochs=num_epochs,
            name=file_name_queue_scope,
            seed=seed)
      else:
        # Single-slot worker queue that is refilled from `seek_next`.
        file_name_queue = data_flow_ops.FIFOQueue(
            capacity=1, dtypes=[dtypes.string], shapes=[[]])
        next_file_name = input_pipeline_ops.seek_next(
            file_names,
            shuffle=randomize_input,
            num_epochs=num_epochs,
            seed=seed)
        enqueue_op = file_name_queue.enqueue(next_file_name)
        queue_runner.add_queue_runner(
            queue_runner.QueueRunner(file_name_queue, [enqueue_op]))

    example_list = _get_examples(file_name_queue, reader, num_threads,
                                 read_batch_size, filter_fn, parse_fn)

    # Arguments shared by both batching flavours. A smaller final batch is
    # only allowed when the number of epochs is bounded.
    batching_kwargs = dict(
        capacity=queue_capacity,
        enqueue_many=read_batch_size > 1,
        name=scope,
        allow_smaller_final_batch=num_epochs is not None)

    # Setup batching queue given list of read example tensors.
    if randomize_input:
      if isinstance(batch_size, ops.Tensor):
        min_after_dequeue = int(queue_capacity * 0.4)
      else:
        min_after_dequeue = max(queue_capacity - (3 * batch_size), batch_size)
      batched = input_ops.shuffle_batch_join(
          example_list,
          batch_size,
          min_after_dequeue=min_after_dequeue,
          seed=seed,
          **batching_kwargs)
    else:
      batched = input_ops.batch_join(
          example_list, batch_size, **batching_kwargs)

    if parse_fn and isinstance(batched, dict):
      # The keys travelled inside the feature dict; split them out again.
      batched_keys = batched.pop(KEY_FEATURE_NAME)
      return batched_keys, batched
    return batched
462
463
@deprecated(None, 'Use tf.data.')
def read_keyed_batch_features(file_pattern,
                              batch_size,
                              features,
                              reader,
                              randomize_input=True,
                              num_epochs=None,
                              queue_capacity=10000,
                              reader_num_threads=1,
                              feature_queue_capacity=100,
                              num_enqueue_threads=2,
                              parse_fn=None,
                              name=None,
                              read_batch_size=None):
  """Builds ops that read, queue, batch and parse `Example` protos.

  A file name queue is set up for the given file pattern (or list of files),
  `Example` protos are read with the supplied `reader` and batched to
  `batch_size` via `read_keyed_batch_examples`. The batch is then parsed
  according to the `features` specification and pushed through a parsed
  features queue by `queue_parsed_features`.

  Queue runners are registered in the queue runners collection and may be
  started via `start_queue_runners`. All ops go into the default graph.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` giving the size of each batch.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer number of passes over the dataset. If None, the
      dataset is cycled through forever. NOTE - If specified, creates a
      variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    reader_num_threads: The number of threads to read examples. Use 1 when a
      predictable and repeatable read/enqueue order is required, e.g. in
      prediction and evaluation mode.
    feature_queue_capacity: Capacity of the parsed features queue.
    num_enqueue_threads: Number of threads to enqueue the parsed example
      queue. Several threads help keep the queue full when the downstream
      computations are overall cheaper than parsing. Use 1 when a predictable
      and repeatable read/enqueue order is required, e.g. in prediction and
      evaluation mode.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    read_batch_size: An int or scalar `Tensor` giving how many records are
      read at once. If `None`, defaults to `batch_size`.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """
  with ops.name_scope(name, 'read_batch_features', [file_pattern]) as scope:
    # Fall back to reading a whole batch worth of records at a time.
    records_per_read = (
        batch_size if read_batch_size is None else read_batch_size)
    keys, examples = read_keyed_batch_examples(
        file_pattern=file_pattern,
        batch_size=batch_size,
        reader=reader,
        randomize_input=randomize_input,
        num_epochs=num_epochs,
        queue_capacity=queue_capacity,
        num_threads=reader_num_threads,
        read_batch_size=records_per_read,
        parse_fn=parse_fn,
        name=scope)
    # Parse the batched examples, then hand the result to the async queue.
    parsed_features = parsing_ops.parse_example(examples, features)
    return queue_parsed_features(
        parsed_features,
        keys=keys,
        feature_queue_capacity=feature_queue_capacity,
        num_enqueue_threads=num_enqueue_threads,
        name=scope)
551
552
@deprecated(None, 'Use tf.data.')
def read_keyed_batch_features_shared_queue(file_pattern,
                                           batch_size,
                                           features,
                                           reader,
                                           randomize_input=True,
                                           num_epochs=None,
                                           queue_capacity=10000,
                                           reader_num_threads=1,
                                           feature_queue_capacity=100,
                                           num_queue_runners=2,
                                           parse_fn=None,
                                           name=None):
  """Builds read/batch/parse ops for `Example`s using a shared file queue.

  For the given file pattern (or list of files) a shared queue of file names
  is set up together with a worker queue that gets file names from it.
  `Example` protos are read with the supplied `reader` and batched to
  `batch_size` via `read_keyed_batch_examples_shared_queue`, reading
  `batch_size` records at a time. The batch is then parsed according to the
  `features` specification and pushed through a parsed features queue by
  `queue_parsed_features`.

  Queue runners are registered in the queue runners collection and may be
  started via `start_queue_runners`. All ops go into the default graph.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` giving the size of each batch.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    reader: A function or class returning an object with a `read` method,
      (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer number of passes over the dataset. If None, the
      dataset is cycled through forever. NOTE - If specified, creates a
      variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    reader_num_threads: The number of threads to read examples.
    feature_queue_capacity: Capacity of the parsed features queue.
    num_queue_runners: Number of threads to enqueue the parsed example queue.
      Several threads help keep the queue full when the downstream
      computations are overall cheaper than parsing.
    parse_fn: Function mapping an `Example` Tensor to its parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """
  with ops.name_scope(name, 'read_batch_features', [file_pattern]) as scope:
    keys, examples = read_keyed_batch_examples_shared_queue(
        file_pattern=file_pattern,
        batch_size=batch_size,
        reader=reader,
        randomize_input=randomize_input,
        num_epochs=num_epochs,
        queue_capacity=queue_capacity,
        num_threads=reader_num_threads,
        read_batch_size=batch_size,
        parse_fn=parse_fn,
        name=scope)
    # Parse the batched examples, then hand the result to the async queue.
    parsed_features = parsing_ops.parse_example(examples, features)
    return queue_parsed_features(
        parsed_features,
        keys=keys,
        feature_queue_capacity=feature_queue_capacity,
        num_enqueue_threads=num_queue_runners,
        name=scope)
632
633
@deprecated(None, 'Use tf.data.')
def queue_parsed_features(parsed_features,
                          keys=None,
                          feature_queue_capacity=100,
                          num_enqueue_threads=2,
                          name=None):
  """Decouples parsing from training by routing features through a queue.

  The tensors in `parsed_features` (and `keys`, when given) are enqueued into
  a FIFO queue and dequeued again, so that parsing (or any other expensive op
  upstream of this call) runs asynchronously wrt the rest of the training
  graph. This improves read latency and speeds up training, since the data is
  already parsed and ready when each training step needs it.

  Queue runners are registered in the queue runners collection and may be
  started via `start_queue_runners`. All ops go into the default graph.

  Args:
    parsed_features: A dict of string key to `Tensor` or `SparseTensor` objects.
    keys: `Tensor` of string keys.
    feature_queue_capacity: Capacity of the parsed features queue.
    num_enqueue_threads: Number of threads to enqueue the parsed example
      queue. Several threads help keep the queue full when the downstream
      computations are overall cheaper than parsing. Use 1 when a predictable
      and repeatable read/enqueue order is required, e.g. in prediction and
      evaluation mode.
    name: Name of resulting op.

  Returns:
    Returns tuple of:
    - `Tensor` corresponding to `keys` if provided, otherwise `None`.
    -  A dict of string key to `Tensor` or `SparseTensor` objects corresponding
       to `parsed_features`.

  Raises:
    ValueError: for invalid inputs. NOTE(review): this function performs no
      explicit validation itself; presumably raised by the ops it calls.
  """
  scope_values = list(parsed_features.values())
  if keys is not None:
    scope_values.append(keys)

  with ops.name_scope(name, 'queue_parsed_features', scope_values):
    # Flat list of everything that goes into one queue element.
    flat_tensors = []
    # One `(feature key, is_sparse)` pair per feature, in enqueue order, so
    # the dict can be rebuilt after dequeueing.
    layout = []
    # TODO(sibyl-Aix6ihai): Most of the functionality here is about pushing sparse
    # tensors into a queue. This could be taken care in somewhere else so others
    # can reuse it. Also, QueueBase maybe extended to handle sparse tensors
    # directly.
    for feature_key in sorted(parsed_features):
      feature = parsed_features[feature_key]
      is_sparse = isinstance(feature, sparse_tensor.SparseTensor)
      layout.append((feature_key, is_sparse))
      if is_sparse:
        # A sparse tensor travels as its three component tensors.
        flat_tensors.extend(
            [feature.indices, feature.values, feature.dense_shape])
      else:
        flat_tensors.append(feature)

    # The keys, when present, always occupy the last slot.
    if keys is not None:
      flat_tensors.append(keys)

    input_queue = data_flow_ops.FIFOQueue(
        feature_queue_capacity, [tensor.dtype for tensor in flat_tensors])

    # Add a summary op to debug if our feature queue is full or not.
    summary_name = ('queue/parsed_features/%s/fraction_of_%d_full' %
                    (input_queue.name, feature_queue_capacity))
    fraction_full = (
        math_ops.cast(input_queue.size(), dtypes.float32) *
        (1. / feature_queue_capacity))
    summary.scalar(summary_name, fraction_full)

    # Use a single QueueRunner with multiple threads to enqueue so the queue is
    # always full. The threads are coordinated so the last batch will not be
    # lost.
    enqueue_ops = []
    for _ in range(num_enqueue_threads):
      enqueue_ops.append(input_queue.enqueue(flat_tensors))
    runner = queue_runner.QueueRunner(
        input_queue,
        enqueue_ops,
        queue_closed_exception_types=(errors.OutOfRangeError,
                                      errors.CancelledError))
    queue_runner.add_queue_runner(runner)

    dequeued = input_queue.dequeue()
    if not isinstance(dequeued, list):
      # input_queue.dequeue() returns a single tensor instead of a list of
      # tensors if there is only one tensor to dequeue, which breaks the
      # assumption of a list below.
      dequeued = [dequeued]

    # Reset shapes on dequeued tensors.
    for out_tensor, in_tensor in zip(dequeued, flat_tensors):
      out_tensor.set_shape(in_tensor.get_shape())

    # Recreate feature mapping according to the original dictionary by
    # consuming the dequeued tensors in the order they were enqueued.
    remaining = iter(dequeued)
    dequeued_parsed_features = {}
    for feature_key, is_sparse in layout:
      if is_sparse:
        # Three tensors are (indices, values, shape).
        indices = next(remaining)
        values = next(remaining)
        dense_shape = next(remaining)
        dequeued_parsed_features[feature_key] = sparse_tensor.SparseTensor(
            indices, values, dense_shape)
      else:
        dequeued_parsed_features[feature_key] = next(remaining)

    dequeued_keys = dequeued[-1] if keys is not None else None

    return dequeued_keys, dequeued_parsed_features
755
756
@deprecated(None, 'Use tf.data.')
def read_batch_features(file_pattern,
                        batch_size,
                        features,
                        reader,
                        randomize_input=True,
                        num_epochs=None,
                        queue_capacity=10000,
                        feature_queue_capacity=100,
                        reader_num_threads=1,
                        num_enqueue_threads=2,
                        parse_fn=None,
                        name=None,
                        read_batch_size=None):
  """Builds ops that read, queue, batch and parse `Example` protos.

  Starting from a file pattern (or a list of files), a queue of file names is
  set up, `Example` protos are read with the supplied `reader`, a batch queue
  groups them into batches of `batch_size`, and the examples are parsed
  according to the `features` specification.

  This is the key-less variant of `read_keyed_batch_features`: it delegates to
  that function and returns only the parsed features.

  Every queue runner is registered in the queue runners collection and can be
  launched with `start_queue_runners`. All ops go into the default graph.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If None, cycles through the dataset forever. NOTE - If specified,
      creates a variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    feature_queue_capacity: Capacity of the parsed features queue. Set this
      value to a small number, for example 5 if the parsed features are large.
    reader_num_threads: The number of threads to read examples. For a
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `reader_num_threads` should be 1.
    num_enqueue_threads: Number of threads to enqueue the parsed example queue.
      Several enqueue threads help keep the parsed example queue full when the
      subsequent computations overall are cheaper than parsing. For a
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `num_enqueue_threads` should be 1.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once. If `None`, defaults to `batch_size`.

  Returns:
    A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """
  # Everything past the four leading positional arguments is forwarded by
  # keyword, unchanged, to the keyed implementation.
  forwarded_options = {
      'randomize_input': randomize_input,
      'num_epochs': num_epochs,
      'queue_capacity': queue_capacity,
      'reader_num_threads': reader_num_threads,
      'feature_queue_capacity': feature_queue_capacity,
      'num_enqueue_threads': num_enqueue_threads,
      'read_batch_size': read_batch_size,
      'parse_fn': parse_fn,
      'name': name,
  }
  # The keyed variant yields a (keys, features) pair; the keys are not part of
  # this function's contract and are dropped.
  unused_keys, parsed_batch = read_keyed_batch_features(
      file_pattern, batch_size, features, reader, **forwarded_options)
  return parsed_batch
835
836
@deprecated(None, 'Use tf.data.')
def read_batch_record_features(file_pattern,
                               batch_size,
                               features,
                               randomize_input=True,
                               num_epochs=None,
                               queue_capacity=10000,
                               reader_num_threads=1,
                               name='dequeue_record_examples'):
  """Reads TFRecord files, then queues, batches and parses `Example` protos.

  Convenience wrapper around `read_batch_features` that always reads with
  `io_ops.TFRecordReader`. See more detailed description in `read_examples`.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If None, cycles through the dataset forever. NOTE - If specified,
      creates a variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    reader_num_threads: The number of threads to read examples. For a
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `reader_num_threads` should be 1.
    name: Name of resulting op.

  Returns:
    A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """
  # The reader is fixed to TFRecordReader; the remaining options are passed
  # through untouched, and the other parameters of `read_batch_features` keep
  # their defaults.
  parsed_batch = read_batch_features(
      file_pattern,
      batch_size,
      features,
      io_ops.TFRecordReader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      reader_num_threads=reader_num_threads,
      name=name)
  return parsed_batch
883