1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/profiler/rpc/profiler_service_impl.h"
17 
18 #include <memory>
19 
20 #include "grpcpp/support/status.h"
21 #include "absl/container/flat_hash_map.h"
22 #include "absl/memory/memory.h"
23 #include "absl/strings/str_replace.h"
24 #include "tensorflow/core/platform/env.h"
25 #include "tensorflow/core/platform/env_time.h"
26 #include "tensorflow/core/platform/errors.h"
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/platform/mutex.h"
30 #include "tensorflow/core/platform/status.h"
31 #include "tensorflow/core/profiler/lib/profiler_session.h"
32 #include "tensorflow/core/profiler/profiler_service.grpc.pb.h"
33 #include "tensorflow/core/profiler/profiler_service.pb.h"
34 #include "tensorflow/core/profiler/protobuf/xplane.pb.h"
35 #include "tensorflow/core/profiler/utils/file_system_utils.h"
36 #include "tensorflow/core/profiler/utils/xplane_utils.h"
37 
38 namespace tensorflow {
39 namespace profiler {
40 namespace {
41 
42 const absl::string_view kXPlanePb = "xplane.pb";
43 
44 // Collects data in XSpace format. The data is saved to a repository
45 // unconditionally.
CollectDataToRepository(const ProfileRequest & request,ProfilerSession * profiler,ProfileResponse * response)46 Status CollectDataToRepository(const ProfileRequest& request,
47                                ProfilerSession* profiler,
48                                ProfileResponse* response) {
49   response->set_empty_trace(true);
50   // Read the profile data into xspace.
51   XSpace xspace;
52   TF_RETURN_IF_ERROR(profiler->CollectData(&xspace));
53   xspace.add_hostnames(request.host_name());
54   VLOG(3) << "Collected XSpace to repository.";
55   response->set_empty_trace(IsEmpty(xspace));
56 
57   std::string log_dir_path =
58       ProfilerJoinPath(request.repository_root(), request.session_id());
59   VLOG(1) << "Creating " << log_dir_path;
60   TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(log_dir_path));
61 
62   std::string file_name = absl::StrCat(request.host_name(), ".", kXPlanePb);
63   // Windows file names do not support colons.
64   absl::StrReplaceAll({{":", "_"}}, &file_name);
65   // Dumps profile data to <repository_root>/<run>/<host>_<port>.<kXPlanePb>
66   std::string out_path = ProfilerJoinPath(log_dir_path, file_name);
67   LOG(INFO) << "Collecting XSpace to repository: " << out_path;
68 
69   return WriteBinaryProto(Env::Default(), out_path, xspace);
70 }
71 
72 class ProfilerServiceImpl : public grpc::ProfilerService::Service {
73  public:
Monitor(::grpc::ServerContext * ctx,const MonitorRequest * req,MonitorResponse * response)74   ::grpc::Status Monitor(::grpc::ServerContext* ctx, const MonitorRequest* req,
75                          MonitorResponse* response) override {
76     return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "unimplemented.");
77   }
78 
Profile(::grpc::ServerContext * ctx,const ProfileRequest * req,ProfileResponse * response)79   ::grpc::Status Profile(::grpc::ServerContext* ctx, const ProfileRequest* req,
80                          ProfileResponse* response) override {
81     VLOG(1) << "Received a profile request: " << req->DebugString();
82     std::unique_ptr<ProfilerSession> profiler =
83         ProfilerSession::Create(req->opts());
84     Status status = profiler->Status();
85     if (!status.ok()) {
86       return ::grpc::Status(::grpc::StatusCode::INTERNAL,
87                             status.error_message());
88     }
89 
90     Env* env = Env::Default();
91     for (uint64 i = 0; i < req->opts().duration_ms(); ++i) {
92       env->SleepForMicroseconds(EnvTime::kMillisToMicros);
93       if (ctx->IsCancelled()) {
94         return ::grpc::Status::CANCELLED;
95       }
96       if (TF_PREDICT_FALSE(IsStopped(req->session_id()))) {
97         mutex_lock lock(mutex_);
98         stop_signals_per_session_.erase(req->session_id());
99         break;
100       }
101     }
102 
103     status = CollectDataToRepository(*req, profiler.get(), response);
104     if (!status.ok()) {
105       return ::grpc::Status(::grpc::StatusCode::INTERNAL,
106                             status.error_message());
107     }
108 
109     return ::grpc::Status::OK;
110   }
111 
Terminate(::grpc::ServerContext * ctx,const TerminateRequest * req,TerminateResponse * response)112   ::grpc::Status Terminate(::grpc::ServerContext* ctx,
113                            const TerminateRequest* req,
114                            TerminateResponse* response) override {
115     mutex_lock lock(mutex_);
116     stop_signals_per_session_[req->session_id()] = true;
117     return ::grpc::Status::OK;
118   }
119 
120  private:
IsStopped(const std::string & session_id)121   bool IsStopped(const std::string& session_id) {
122     mutex_lock lock(mutex_);
123     auto it = stop_signals_per_session_.find(session_id);
124     return it != stop_signals_per_session_.end() && it->second;
125   }
126 
127   mutex mutex_;
128   absl::flat_hash_map<std::string, bool> stop_signals_per_session_
129       ABSL_GUARDED_BY(mutex_);
130 };
131 
132 }  // namespace
133 
CreateProfilerService()134 std::unique_ptr<grpc::ProfilerService::Service> CreateProfilerService() {
135   return absl::make_unique<ProfilerServiceImpl>();
136 }
137 
138 }  // namespace profiler
139 }  // namespace tensorflow
140