1# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Writes events to disk in a logdir.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import os.path 22import threading 23import time 24 25import six 26 27from tensorflow.core.util import event_pb2 28from tensorflow.python import pywrap_tensorflow 29from tensorflow.python.platform import gfile 30from tensorflow.python.util import compat 31 32 33class EventFileWriter(object): 34 """Writes `Event` protocol buffers to an event file. 35 36 The `EventFileWriter` class creates an event file in the specified directory, 37 and asynchronously writes Event protocol buffers to the file. The Event file 38 is encoded using the tfrecord format, which is similar to RecordIO. 39 """ 40 41 def __init__(self, logdir, max_queue=10, flush_secs=120, 42 filename_suffix=None): 43 """Creates a `EventFileWriter` and an event file to write to. 44 45 On construction the summary writer creates a new event file in `logdir`. 46 This event file will contain `Event` protocol buffers, which are written to 47 disk via the add_event method. 48 49 The other arguments to the constructor control the asynchronous writes to 50 the event file: 51 52 * `flush_secs`: How often, in seconds, to flush the added summaries 53 and events to disk. 54 * `max_queue`: Maximum number of summaries or events pending to be 55 written to disk before one of the 'add' calls block. 56 57 Args: 58 logdir: A string. Directory where event file will be written. 59 max_queue: Integer. Size of the queue for pending events and summaries. 60 flush_secs: Number. How often, in seconds, to flush the 61 pending events and summaries to disk. 62 filename_suffix: A string. Every event file's name is suffixed with 63 `filename_suffix`. 64 """ 65 self._logdir = str(logdir) 66 if not gfile.IsDirectory(self._logdir): 67 gfile.MakeDirs(self._logdir) 68 self._event_queue = six.moves.queue.Queue(max_queue) 69 self._ev_writer = pywrap_tensorflow.EventsWriter( 70 compat.as_bytes(os.path.join(self._logdir, "events"))) 71 self._flush_secs = flush_secs 72 self._sentinel_event = self._get_sentinel_event() 73 if filename_suffix: 74 self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix)) 75 self._closed = False 76 self._worker = _EventLoggerThread(self._event_queue, self._ev_writer, 77 self._flush_secs, self._sentinel_event) 78 79 self._worker.start() 80 81 def _get_sentinel_event(self): 82 """Generate a sentinel event for terminating worker.""" 83 return event_pb2.Event() 84 85 def get_logdir(self): 86 """Returns the directory where event file will be written.""" 87 return self._logdir 88 89 def reopen(self): 90 """Reopens the EventFileWriter. 91 92 Can be called after `close()` to add more events in the same directory. 93 The events will go into a new events file. 94 95 Does nothing if the EventFileWriter was not closed. 96 """ 97 if self._closed: 98 self._worker = _EventLoggerThread(self._event_queue, self._ev_writer, 99 self._flush_secs, self._sentinel_event) 100 self._worker.start() 101 self._closed = False 102 103 def add_event(self, event): 104 """Adds an event to the event file. 105 106 Args: 107 event: An `Event` protocol buffer. 108 """ 109 if not self._closed: 110 self._event_queue.put(event) 111 112 def flush(self): 113 """Flushes the event file to disk. 114 115 Call this method to make sure that all pending events have been written to 116 disk. 117 """ 118 self._event_queue.join() 119 self._ev_writer.Flush() 120 121 def close(self): 122 """Flushes the event file to disk and close the file. 123 124 Call this method when you do not need the summary writer anymore. 125 """ 126 self.add_event(self._sentinel_event) 127 self.flush() 128 self._worker.join() 129 self._ev_writer.Close() 130 self._closed = True 131 132 133class _EventLoggerThread(threading.Thread): 134 """Thread that logs events.""" 135 136 def __init__(self, queue, ev_writer, flush_secs, sentinel_event): 137 """Creates an _EventLoggerThread. 138 139 Args: 140 queue: A Queue from which to dequeue events. 141 ev_writer: An event writer. Used to log brain events for 142 the visualizer. 143 flush_secs: How often, in seconds, to flush the 144 pending file to disk. 145 sentinel_event: A sentinel element in queue that tells this thread to 146 terminate. 147 """ 148 threading.Thread.__init__(self) 149 self.daemon = True 150 self._queue = queue 151 self._ev_writer = ev_writer 152 self._flush_secs = flush_secs 153 # The first event will be flushed immediately. 154 self._next_event_flush_time = 0 155 self._sentinel_event = sentinel_event 156 157 def run(self): 158 while True: 159 event = self._queue.get() 160 if event is self._sentinel_event: 161 self._queue.task_done() 162 break 163 try: 164 self._ev_writer.WriteEvent(event) 165 # Flush the event writer every so often. 166 now = time.time() 167 if now > self._next_event_flush_time: 168 self._ev_writer.Flush() 169 # Do it again in two minutes. 170 self._next_event_flush_time = now + self._flush_secs 171 finally: 172 self._queue.task_done() 173