1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
18 
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/stringpiece.h"
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/notification.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
33 
34 namespace tensorflow {
35 
36 /// \brief An LRU block cache of file contents, keyed by {filename, offset}.
37 ///
38 /// This class should be shared by read-only random access files on a remote
39 /// filesystem (e.g. GCS).
40 class RamFileBlockCache : public FileBlockCache {
41  public:
42   /// The callback executed when a block is not found in the cache, and needs to
43   /// be fetched from the backing filesystem. This callback is provided when the
44   /// cache is constructed. The returned Status should be OK as long as the
45   /// read from the remote filesystem succeeded (similar to the semantics of the
46   /// read(2) system call).
47   typedef std::function<Status(const string& filename, size_t offset,
48                                size_t buffer_size, char* buffer,
49                                size_t* bytes_transferred)>
50       BlockFetcher;
51 
52   RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness,
53                     BlockFetcher block_fetcher, Env* env = Env::Default())
block_size_(block_size)54       : block_size_(block_size),
55         max_bytes_(max_bytes),
56         max_staleness_(max_staleness),
57         block_fetcher_(block_fetcher),
58         env_(env) {
59     if (max_staleness_ > 0) {
60       pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
61                                               [this] { Prune(); }));
62     }
63     VLOG(1) << "GCS file block cache is "
64             << (IsCacheEnabled() ? "enabled" : "disabled");
65   }
66 
~RamFileBlockCache()67   ~RamFileBlockCache() override {
68     if (pruning_thread_) {
69       stop_pruning_thread_.Notify();
70       // Destroying pruning_thread_ will block until Prune() receives the above
71       // notification and returns.
72       pruning_thread_.reset();
73     }
74   }
75 
76   /// Read `n` bytes from `filename` starting at `offset` into `out`. This
77   /// method will return:
78   ///
79   /// 1) The error from the remote filesystem, if the read from the remote
80   ///    filesystem failed.
81   /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded,
82   ///    but the read returned a partial block, and the LRU cache contained a
83   ///    block at a higher offset (indicating that the partial block should have
84   ///    been a full block).
85   /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but
86   ///    the file contents do not extend past `offset` and thus nothing was
87   ///    placed in `out`.
88   /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed
89   ///    in `out`).
90   Status Read(const string& filename, size_t offset, size_t n, char* buffer,
91               size_t* bytes_transferred) override;
92 
93   // Validate the given file signature with the existing file signature in the
94   // cache. Returns true if the signature doesn't change or the file doesn't
95   // exist before. If the signature changes, update the existing signature with
96   // the new one and remove the file from cache.
97   bool ValidateAndUpdateFileSignature(const string& filename,
98                                       int64 file_signature) override
99       LOCKS_EXCLUDED(mu_);
100 
101   /// Remove all cached blocks for `filename`.
102   void RemoveFile(const string& filename) override LOCKS_EXCLUDED(mu_);
103 
104   /// Remove all cached data.
105   void Flush() override LOCKS_EXCLUDED(mu_);
106 
107   /// Accessors for cache parameters.
block_size()108   size_t block_size() const override { return block_size_; }
max_bytes()109   size_t max_bytes() const override { return max_bytes_; }
max_staleness()110   uint64 max_staleness() const override { return max_staleness_; }
111 
112   /// The current size (in bytes) of the cache.
113   size_t CacheSize() const override LOCKS_EXCLUDED(mu_);
114 
115   // Returns true if the cache is enabled. If false, the BlockFetcher callback
116   // is always executed during Read.
IsCacheEnabled()117   bool IsCacheEnabled() const override {
118     return block_size_ > 0 && max_bytes_ > 0;
119   }
120 
121  private:
122   /// The size of the blocks stored in the LRU cache, as well as the size of the
123   /// reads from the underlying filesystem.
124   const size_t block_size_;
125   /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache.
126   const size_t max_bytes_;
127   /// The maximum staleness of any block in the LRU cache, in seconds.
128   const uint64 max_staleness_;
129   /// The callback to read a block from the underlying filesystem.
130   const BlockFetcher block_fetcher_;
131   /// The Env from which we read timestamps.
132   Env* const env_;  // not owned
133 
134   /// \brief The key type for the file block cache.
135   ///
136   /// The file block cache key is a {filename, offset} pair.
137   typedef std::pair<string, size_t> Key;
138 
139   /// \brief The state of a block.
140   ///
141   /// A block begins in the CREATED stage. The first thread will attempt to read
142   /// the block from the filesystem, transitioning the state of the block to
143   /// FETCHING. After completing, if the read was successful the state should
144   /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can
145   /// re-fetch the block if the state is ERROR.
146   enum class FetchState {
147     CREATED,
148     FETCHING,
149     FINISHED,
150     ERROR,
151   };
152 
153   /// \brief A block of a file.
154   ///
155   /// A file block consists of the block data, the block's current position in
156   /// the LRU cache, the timestamp (seconds since epoch) at which the block
157   /// was cached, a coordination lock, and state & condition variables.
158   ///
159   /// Thread safety:
160   /// The iterator and timestamp fields should only be accessed while holding
161   /// the block-cache-wide mu_ instance variable. The state variable should only
162   /// be accessed while holding the Block's mu lock. The data vector should only
163   /// be accessed after state == FINISHED, and it should never be modified.
164   ///
165   /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock
166   /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking
167   /// mu_.
168   struct Block {
169     /// The block data.
170     std::vector<char> data;
171     /// A list iterator pointing to the block's position in the LRU list.
172     std::list<Key>::iterator lru_iterator;
173     /// A list iterator pointing to the block's position in the LRA list.
174     std::list<Key>::iterator lra_iterator;
175     /// The timestamp (seconds since epoch) at which the block was cached.
176     uint64 timestamp;
177     /// Mutex to guard state variable
178     mutex mu;
179     /// The state of the block.
180     FetchState state GUARDED_BY(mu) = FetchState::CREATED;
181     /// Wait on cond_var if state is FETCHING.
182     condition_variable cond_var;
183   };
184 
185   /// \brief The block map type for the file block cache.
186   ///
187   /// The block map is an ordered map from Key to Block.
188   typedef std::map<Key, std::shared_ptr<Block>> BlockMap;
189 
190   /// Prune the cache by removing files with expired blocks.
191   void Prune() LOCKS_EXCLUDED(mu_);
192 
193   bool BlockNotStale(const std::shared_ptr<Block>& block)
194       EXCLUSIVE_LOCKS_REQUIRED(mu_);
195 
196   /// Look up a Key in the block cache.
197   std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_);
198 
199   Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block)
200       LOCKS_EXCLUDED(mu_);
201 
202   /// Trim the block cache to make room for another entry.
203   void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_);
204 
205   /// Update the LRU iterator for the block at `key`.
206   Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block)
207       LOCKS_EXCLUDED(mu_);
208 
209   /// Remove all blocks of a file, with mu_ already held.
210   void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_);
211 
212   /// Remove the block `entry` from the block map and LRU list, and update the
213   /// cache size accordingly.
214   void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_);
215 
216   /// The cache pruning thread that removes files with expired blocks.
217   std::unique_ptr<Thread> pruning_thread_;
218 
219   /// Notification for stopping the cache pruning thread.
220   Notification stop_pruning_thread_;
221 
222   /// Guards access to the block map, LRU list, and cached byte count.
223   mutable mutex mu_;
224 
225   /// The block map (map from Key to Block).
226   BlockMap block_map_ GUARDED_BY(mu_);
227 
228   /// The LRU list of block keys. The front of the list identifies the most
229   /// recently accessed block.
230   std::list<Key> lru_list_ GUARDED_BY(mu_);
231 
232   /// The LRA (least recently added) list of block keys. The front of the list
233   /// identifies the most recently added block.
234   ///
235   /// Note: blocks are added to lra_list_ only after they have successfully been
236   /// fetched from the underlying block store.
237   std::list<Key> lra_list_ GUARDED_BY(mu_);
238 
239   /// The combined number of bytes in all of the cached blocks.
240   size_t cache_size_ GUARDED_BY(mu_) = 0;
241 
242   // A filename->file_signature map.
243   std::map<string, int64> file_signature_map_ GUARDED_BY(mu_);
244 };
245 
246 }  // namespace tensorflow
247 
248 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
249