1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
17 #include <cstring>
18 #include <memory>
19 #include "tensorflow/core/lib/gtl/cleanup.h"
20 #include "tensorflow/core/platform/env.h"
21 
22 namespace tensorflow {
23 
BlockNotStale(const std::shared_ptr<Block> & block)24 bool RamFileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
25   mutex_lock l(block->mu);
26   if (block->state != FetchState::FINISHED) {
27     return true;  // No need to check for staleness.
28   }
29   if (max_staleness_ == 0) return true;  // Not enforcing staleness.
30   return env_->NowSeconds() - block->timestamp <= max_staleness_;
31 }
32 
Lookup(const Key & key)33 std::shared_ptr<RamFileBlockCache::Block> RamFileBlockCache::Lookup(
34     const Key& key) {
35   mutex_lock lock(mu_);
36   auto entry = block_map_.find(key);
37   if (entry != block_map_.end()) {
38     if (BlockNotStale(entry->second)) {
39       return entry->second;
40     } else {
41       // Remove the stale block and continue.
42       RemoveFile_Locked(key.first);
43     }
44   }
45 
46   // Insert a new empty block, setting the bookkeeping to sentinel values
47   // in order to update them as appropriate.
48   auto new_entry = std::make_shared<Block>();
49   lru_list_.push_front(key);
50   lra_list_.push_front(key);
51   new_entry->lru_iterator = lru_list_.begin();
52   new_entry->lra_iterator = lra_list_.begin();
53   new_entry->timestamp = env_->NowSeconds();
54   block_map_.emplace(std::make_pair(key, new_entry));
55   return new_entry;
56 }
57 
58 // Remove blocks from the cache until we do not exceed our maximum size.
Trim()59 void RamFileBlockCache::Trim() {
60   while (!lru_list_.empty() && cache_size_ > max_bytes_) {
61     RemoveBlock(block_map_.find(lru_list_.back()));
62   }
63 }
64 
65 /// Move the block to the front of the LRU list if it isn't already there.
UpdateLRU(const Key & key,const std::shared_ptr<Block> & block)66 Status RamFileBlockCache::UpdateLRU(const Key& key,
67                                     const std::shared_ptr<Block>& block) {
68   mutex_lock lock(mu_);
69   if (block->timestamp == 0) {
70     // The block was evicted from another thread. Allow it to remain evicted.
71     return Status::OK();
72   }
73   if (block->lru_iterator != lru_list_.begin()) {
74     lru_list_.erase(block->lru_iterator);
75     lru_list_.push_front(key);
76     block->lru_iterator = lru_list_.begin();
77   }
78 
79   // Check for inconsistent state. If there is a block later in the same file
80   // in the cache, and our current block is not block size, this likely means
81   // we have inconsistent state within the cache. Note: it's possible some
82   // incomplete reads may still go undetected.
83   if (block->data.size() < block_size_) {
84     Key fmax = std::make_pair(key.first, std::numeric_limits<size_t>::max());
85     auto fcmp = block_map_.upper_bound(fmax);
86     if (fcmp != block_map_.begin() && key < (--fcmp)->first) {
87       return errors::Internal("Block cache contents are inconsistent.");
88     }
89   }
90 
91   Trim();
92 
93   return Status::OK();
94 }
95 
MaybeFetch(const Key & key,const std::shared_ptr<Block> & block)96 Status RamFileBlockCache::MaybeFetch(const Key& key,
97                                      const std::shared_ptr<Block>& block) {
98   bool downloaded_block = false;
99   auto reconcile_state =
100       gtl::MakeCleanup([this, &downloaded_block, &key, &block] {
101         // Perform this action in a cleanup callback to avoid locking mu_ after
102         // locking block->mu.
103         if (downloaded_block) {
104           mutex_lock l(mu_);
105           // Do not update state if the block is already to be evicted.
106           if (block->timestamp != 0) {
107             cache_size_ += block->data.size();
108             // Put to beginning of LRA list.
109             lra_list_.erase(block->lra_iterator);
110             lra_list_.push_front(key);
111             block->lra_iterator = lra_list_.begin();
112             block->timestamp = env_->NowSeconds();
113           }
114         }
115       });
116   // Loop until either block content is successfully fetched, or our request
117   // encounters an error.
118   mutex_lock l(block->mu);
119   Status status = Status::OK();
120   while (true) {
121     switch (block->state) {
122       case FetchState::ERROR:
123         TF_FALLTHROUGH_INTENDED;
124       case FetchState::CREATED:
125         block->state = FetchState::FETCHING;
126         block->mu.unlock();  // Release the lock while making the API call.
127         block->data.clear();
128         block->data.resize(block_size_, 0);
129         size_t bytes_transferred;
130         status.Update(block_fetcher_(key.first, key.second, block_size_,
131                                      block->data.data(), &bytes_transferred));
132         block->mu.lock();  // Reacquire the lock immediately afterwards
133         if (status.ok()) {
134           block->data.resize(bytes_transferred, 0);
135           block->data.shrink_to_fit();
136           downloaded_block = true;
137           block->state = FetchState::FINISHED;
138         } else {
139           block->state = FetchState::ERROR;
140         }
141         block->cond_var.notify_all();
142         return status;
143       case FetchState::FETCHING:
144         block->cond_var.wait_for(l, std::chrono::seconds(60));
145         if (block->state == FetchState::FINISHED) {
146           return Status::OK();
147         }
148         // Re-loop in case of errors.
149         break;
150       case FetchState::FINISHED:
151         return Status::OK();
152     }
153   }
154   return errors::Internal(
155       "Control flow should never reach the end of RamFileBlockCache::Fetch.");
156 }
157 
Read(const string & filename,size_t offset,size_t n,char * buffer,size_t * bytes_transferred)158 Status RamFileBlockCache::Read(const string& filename, size_t offset, size_t n,
159                                char* buffer, size_t* bytes_transferred) {
160   *bytes_transferred = 0;
161   if (n == 0) {
162     return Status::OK();
163   }
164   if (!IsCacheEnabled()) {
165     // The cache is effectively disabled, so we pass the read through to the
166     // fetcher without breaking it up into blocks.
167     return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
168   }
169   // Calculate the block-aligned start and end of the read.
170   size_t start = block_size_ * (offset / block_size_);
171   size_t finish = block_size_ * ((offset + n) / block_size_);
172   if (finish < offset + n) {
173     finish += block_size_;
174   }
175   size_t total_bytes_transferred = 0;
176   // Now iterate through the blocks, reading them one at a time.
177   for (size_t pos = start; pos < finish; pos += block_size_) {
178     Key key = std::make_pair(filename, pos);
179     // Look up the block, fetching and inserting it if necessary, and update the
180     // LRU iterator for the key and block.
181     std::shared_ptr<Block> block = Lookup(key);
182     DCHECK(block) << "No block for key " << key.first << "@" << key.second;
183     TF_RETURN_IF_ERROR(MaybeFetch(key, block));
184     TF_RETURN_IF_ERROR(UpdateLRU(key, block));
185     // Copy the relevant portion of the block into the result buffer.
186     const auto& data = block->data;
187     if (offset >= pos + data.size()) {
188       // The requested offset is at or beyond the end of the file. This can
189       // happen if `offset` is not block-aligned, and the read returns the last
190       // block in the file, which does not extend all the way out to `offset`.
191       *bytes_transferred = total_bytes_transferred;
192       return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
193                                 " at position ", pos, "with data size ",
194                                 data.size());
195     }
196     auto begin = data.begin();
197     if (offset > pos) {
198       // The block begins before the slice we're reading.
199       begin += offset - pos;
200     }
201     auto end = data.end();
202     if (pos + data.size() > offset + n) {
203       // The block extends past the end of the slice we're reading.
204       end -= (pos + data.size()) - (offset + n);
205     }
206     if (begin < end) {
207       size_t bytes_to_copy = end - begin;
208       memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy);
209       total_bytes_transferred += bytes_to_copy;
210     }
211     if (data.size() < block_size_) {
212       // The block was a partial block and thus signals EOF at its upper bound.
213       break;
214     }
215   }
216   *bytes_transferred = total_bytes_transferred;
217   return Status::OK();
218 }
219 
ValidateAndUpdateFileSignature(const string & filename,int64 file_signature)220 bool RamFileBlockCache::ValidateAndUpdateFileSignature(const string& filename,
221                                                        int64 file_signature) {
222   mutex_lock lock(mu_);
223   auto it = file_signature_map_.find(filename);
224   if (it != file_signature_map_.end()) {
225     if (it->second == file_signature) {
226       return true;
227     }
228     // Remove the file from cache if the signatures don't match.
229     RemoveFile_Locked(filename);
230     it->second = file_signature;
231     return false;
232   }
233   file_signature_map_[filename] = file_signature;
234   return true;
235 }
236 
CacheSize() const237 size_t RamFileBlockCache::CacheSize() const {
238   mutex_lock lock(mu_);
239   return cache_size_;
240 }
241 
Prune()242 void RamFileBlockCache::Prune() {
243   while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
244     mutex_lock lock(mu_);
245     uint64 now = env_->NowSeconds();
246     while (!lra_list_.empty()) {
247       auto it = block_map_.find(lra_list_.back());
248       if (now - it->second->timestamp <= max_staleness_) {
249         // The oldest block is not yet expired. Come back later.
250         break;
251       }
252       // We need to make a copy of the filename here, since it could otherwise
253       // be used within RemoveFile_Locked after `it` is deleted.
254       RemoveFile_Locked(std::string(it->first.first));
255     }
256   }
257 }
258 
Flush()259 void RamFileBlockCache::Flush() {
260   mutex_lock lock(mu_);
261   block_map_.clear();
262   lru_list_.clear();
263   lra_list_.clear();
264   cache_size_ = 0;
265 }
266 
RemoveFile(const string & filename)267 void RamFileBlockCache::RemoveFile(const string& filename) {
268   mutex_lock lock(mu_);
269   RemoveFile_Locked(filename);
270 }
271 
RemoveFile_Locked(const string & filename)272 void RamFileBlockCache::RemoveFile_Locked(const string& filename) {
273   Key begin = std::make_pair(filename, 0);
274   auto it = block_map_.lower_bound(begin);
275   while (it != block_map_.end() && it->first.first == filename) {
276     auto next = std::next(it);
277     RemoveBlock(it);
278     it = next;
279   }
280 }
281 
RemoveBlock(BlockMap::iterator entry)282 void RamFileBlockCache::RemoveBlock(BlockMap::iterator entry) {
283   // This signals that the block is removed, and should not be inadvertently
284   // reinserted into the cache in UpdateLRU.
285   entry->second->timestamp = 0;
286   lru_list_.erase(entry->second->lru_iterator);
287   lra_list_.erase(entry->second->lra_iterator);
288   cache_size_ -= entry->second->data.size();
289   block_map_.erase(entry);
290 }
291 
292 }  // namespace tensorflow
293