1# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Writes events to disk in a logdir."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import os.path
22import threading
23import time
24
25import six
26
27from tensorflow.core.util import event_pb2
28from tensorflow.python import pywrap_tensorflow
29from tensorflow.python.platform import gfile
30from tensorflow.python.util import compat
31
32
33class EventFileWriter(object):
34  """Writes `Event` protocol buffers to an event file.
35
36  The `EventFileWriter` class creates an event file in the specified directory,
37  and asynchronously writes Event protocol buffers to the file. The Event file
38  is encoded using the tfrecord format, which is similar to RecordIO.
39  """
40
41  def __init__(self, logdir, max_queue=10, flush_secs=120,
42               filename_suffix=None):
43    """Creates a `EventFileWriter` and an event file to write to.
44
45    On construction the summary writer creates a new event file in `logdir`.
46    This event file will contain `Event` protocol buffers, which are written to
47    disk via the add_event method.
48
49    The other arguments to the constructor control the asynchronous writes to
50    the event file:
51
52    *  `flush_secs`: How often, in seconds, to flush the added summaries
53       and events to disk.
54    *  `max_queue`: Maximum number of summaries or events pending to be
55       written to disk before one of the 'add' calls block.
56
57    Args:
58      logdir: A string. Directory where event file will be written.
59      max_queue: Integer. Size of the queue for pending events and summaries.
60      flush_secs: Number. How often, in seconds, to flush the
61        pending events and summaries to disk.
62      filename_suffix: A string. Every event file's name is suffixed with
63        `filename_suffix`.
64    """
65    self._logdir = str(logdir)
66    if not gfile.IsDirectory(self._logdir):
67      gfile.MakeDirs(self._logdir)
68    self._event_queue = six.moves.queue.Queue(max_queue)
69    self._ev_writer = pywrap_tensorflow.EventsWriter(
70        compat.as_bytes(os.path.join(self._logdir, "events")))
71    self._flush_secs = flush_secs
72    self._sentinel_event = self._get_sentinel_event()
73    if filename_suffix:
74      self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix))
75    self._closed = False
76    self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
77                                      self._flush_secs, self._sentinel_event)
78
79    self._worker.start()
80
81  def _get_sentinel_event(self):
82    """Generate a sentinel event for terminating worker."""
83    return event_pb2.Event()
84
85  def get_logdir(self):
86    """Returns the directory where event file will be written."""
87    return self._logdir
88
89  def reopen(self):
90    """Reopens the EventFileWriter.
91
92    Can be called after `close()` to add more events in the same directory.
93    The events will go into a new events file.
94
95    Does nothing if the EventFileWriter was not closed.
96    """
97    if self._closed:
98      self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
99                                        self._flush_secs, self._sentinel_event)
100      self._worker.start()
101      self._closed = False
102
103  def add_event(self, event):
104    """Adds an event to the event file.
105
106    Args:
107      event: An `Event` protocol buffer.
108    """
109    if not self._closed:
110      self._event_queue.put(event)
111
112  def flush(self):
113    """Flushes the event file to disk.
114
115    Call this method to make sure that all pending events have been written to
116    disk.
117    """
118    self._event_queue.join()
119    self._ev_writer.Flush()
120
121  def close(self):
122    """Flushes the event file to disk and close the file.
123
124    Call this method when you do not need the summary writer anymore.
125    """
126    self.add_event(self._sentinel_event)
127    self.flush()
128    self._worker.join()
129    self._ev_writer.Close()
130    self._closed = True
131
132
133class _EventLoggerThread(threading.Thread):
134  """Thread that logs events."""
135
136  def __init__(self, queue, ev_writer, flush_secs, sentinel_event):
137    """Creates an _EventLoggerThread.
138
139    Args:
140      queue: A Queue from which to dequeue events.
141      ev_writer: An event writer. Used to log brain events for
142       the visualizer.
143      flush_secs: How often, in seconds, to flush the
144        pending file to disk.
145      sentinel_event: A sentinel element in queue that tells this thread to
146        terminate.
147    """
148    threading.Thread.__init__(self)
149    self.daemon = True
150    self._queue = queue
151    self._ev_writer = ev_writer
152    self._flush_secs = flush_secs
153    # The first event will be flushed immediately.
154    self._next_event_flush_time = 0
155    self._sentinel_event = sentinel_event
156
157  def run(self):
158    while True:
159      event = self._queue.get()
160      if event is self._sentinel_event:
161        self._queue.task_done()
162        break
163      try:
164        self._ev_writer.WriteEvent(event)
165        # Flush the event writer every so often.
166        now = time.time()
167        if now > self._next_event_flush_time:
168          self._ev_writer.Flush()
169          # Do it again in two minutes.
170          self._next_event_flush_time = now + self._flush_secs
171      finally:
172        self._queue.task_done()
173