1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_GRAPPLER_CLUSTERS_SINGLE_MACHINE_H_
17 #define TENSORFLOW_CORE_GRAPPLER_CLUSTERS_SINGLE_MACHINE_H_
18 
19 #include "tensorflow/cc/training/coordinator.h"
20 #include "tensorflow/core/framework/allocator.h"
21 #include "tensorflow/core/grappler/clusters/cluster.h"
22 #include "tensorflow/core/lib/core/threadpool.h"
23 #include "tensorflow/core/platform/mutex.h"
24 #include "tensorflow/core/public/session.h"
25 
26 namespace tensorflow {
27 namespace grappler {
28 
29 // Create a simple cluster that makes available to grappler a subset of the
30 // nodes available on a single local computer.
31 class SingleMachine : public Cluster {
32  public:
33   SingleMachine(int timeout_s, int num_cpu_cores, int num_gpus);
34   ~SingleMachine() override;
35 
type()36   string type() const override { return "single_machine"; }
37 
38   Status Provision() override;
39   Status Shutdown() override;
40 
41   Status Initialize(const GrapplerItem& item) override;
42   Status Run(const GraphDef& item,
43              const std::vector<std::pair<string, Tensor>>& feed,
44              const std::vector<string>& fetch, RunMetadata* metadata) override;
45 
GetDeviceSet()46   const DeviceSet* GetDeviceSet() const override { return device_set_.get(); }
47 
48   Status EnablePeakMemoryStats() override;
49 
50   // It requires EnableAllocatorStats(true) be called before Provision().
51   Status GetPeakMemoryUsage(
52       std::unordered_map<string, uint64>* device_peak_memory) const override;
53 
54  private:
55   Status RunWithTimeout(const std::vector<std::pair<string, Tensor>>& feed,
56                         const std::vector<string>& fetch,
57                         RunMetadata* run_metadata);
58   Status RunWithTimeout(const std::vector<std::pair<string, Tensor>>& feed,
59                         const std::vector<string>& fetch,
60                         RunMetadata* run_metadata, int64 timeout_s);
61   Status ResetSession();
62   Status CloseSession(bool use_timeout);
63   Status ShutdownSession();
64   void MergeCosts(CostGraphDef* graph_costs, const CostGraphDef& init_costs,
65                   const CostGraphDef& queue_costs);
66 
67   Status ClearAllocatorStats() const;
68 
69   std::unique_ptr<Session> session_;
70   std::vector<QueueRunnerDef> queue_runner_defs_;
71   string last_graph_id_;
72   mutex last_graph_mu_;
73   const GraphDef* last_graph_ TF_GUARDED_BY(last_graph_mu_) = nullptr;
74   std::vector<string> init_ops_;
75   int64 expected_init_time_s_;
76   std::unique_ptr<Coordinator> coordinator_;
77   std::unique_ptr<thread::ThreadPool> thread_pool_;
78   std::unique_ptr<DeviceSet> device_set_;
79 
80   RunMetadata init_metadata_;
81 
82   mutex close_mu_;
83   bool closing_ TF_GUARDED_BY(close_mu_);
84 
85   bool cpu_allocator_stats_enabled_ = false;
86 };
87 
88 }  // end namespace grappler
89 }  // end namespace tensorflow
90 
91 #endif  // TENSORFLOW_CORE_GRAPPLER_CLUSTERS_SINGLE_MACHINE_H_
92