1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/profiler/rpc/profiler_service_impl.h"
17
18 #include <memory>
19
20 #include "grpcpp/support/status.h"
21 #include "absl/container/flat_hash_map.h"
22 #include "absl/memory/memory.h"
23 #include "absl/strings/str_replace.h"
24 #include "tensorflow/core/platform/env.h"
25 #include "tensorflow/core/platform/env_time.h"
26 #include "tensorflow/core/platform/errors.h"
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/platform/mutex.h"
30 #include "tensorflow/core/platform/status.h"
31 #include "tensorflow/core/profiler/lib/profiler_session.h"
32 #include "tensorflow/core/profiler/profiler_service.grpc.pb.h"
33 #include "tensorflow/core/profiler/profiler_service.pb.h"
34 #include "tensorflow/core/profiler/protobuf/xplane.pb.h"
35 #include "tensorflow/core/profiler/utils/file_system_utils.h"
36 #include "tensorflow/core/profiler/utils/xplane_utils.h"
37
38 namespace tensorflow {
39 namespace profiler {
40 namespace {
41
42 const absl::string_view kXPlanePb = "xplane.pb";
43
44 // Collects data in XSpace format. The data is saved to a repository
45 // unconditionally.
CollectDataToRepository(const ProfileRequest & request,ProfilerSession * profiler,ProfileResponse * response)46 Status CollectDataToRepository(const ProfileRequest& request,
47 ProfilerSession* profiler,
48 ProfileResponse* response) {
49 response->set_empty_trace(true);
50 // Read the profile data into xspace.
51 XSpace xspace;
52 TF_RETURN_IF_ERROR(profiler->CollectData(&xspace));
53 xspace.add_hostnames(request.host_name());
54 VLOG(3) << "Collected XSpace to repository.";
55 response->set_empty_trace(IsEmpty(xspace));
56
57 std::string log_dir_path =
58 ProfilerJoinPath(request.repository_root(), request.session_id());
59 VLOG(1) << "Creating " << log_dir_path;
60 TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(log_dir_path));
61
62 std::string file_name = absl::StrCat(request.host_name(), ".", kXPlanePb);
63 // Windows file names do not support colons.
64 absl::StrReplaceAll({{":", "_"}}, &file_name);
65 // Dumps profile data to <repository_root>/<run>/<host>_<port>.<kXPlanePb>
66 std::string out_path = ProfilerJoinPath(log_dir_path, file_name);
67 LOG(INFO) << "Collecting XSpace to repository: " << out_path;
68
69 return WriteBinaryProto(Env::Default(), out_path, xspace);
70 }
71
72 class ProfilerServiceImpl : public grpc::ProfilerService::Service {
73 public:
Monitor(::grpc::ServerContext * ctx,const MonitorRequest * req,MonitorResponse * response)74 ::grpc::Status Monitor(::grpc::ServerContext* ctx, const MonitorRequest* req,
75 MonitorResponse* response) override {
76 return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "unimplemented.");
77 }
78
Profile(::grpc::ServerContext * ctx,const ProfileRequest * req,ProfileResponse * response)79 ::grpc::Status Profile(::grpc::ServerContext* ctx, const ProfileRequest* req,
80 ProfileResponse* response) override {
81 VLOG(1) << "Received a profile request: " << req->DebugString();
82 std::unique_ptr<ProfilerSession> profiler =
83 ProfilerSession::Create(req->opts());
84 Status status = profiler->Status();
85 if (!status.ok()) {
86 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
87 status.error_message());
88 }
89
90 Env* env = Env::Default();
91 for (uint64 i = 0; i < req->opts().duration_ms(); ++i) {
92 env->SleepForMicroseconds(EnvTime::kMillisToMicros);
93 if (ctx->IsCancelled()) {
94 return ::grpc::Status::CANCELLED;
95 }
96 if (TF_PREDICT_FALSE(IsStopped(req->session_id()))) {
97 mutex_lock lock(mutex_);
98 stop_signals_per_session_.erase(req->session_id());
99 break;
100 }
101 }
102
103 status = CollectDataToRepository(*req, profiler.get(), response);
104 if (!status.ok()) {
105 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
106 status.error_message());
107 }
108
109 return ::grpc::Status::OK;
110 }
111
Terminate(::grpc::ServerContext * ctx,const TerminateRequest * req,TerminateResponse * response)112 ::grpc::Status Terminate(::grpc::ServerContext* ctx,
113 const TerminateRequest* req,
114 TerminateResponse* response) override {
115 mutex_lock lock(mutex_);
116 stop_signals_per_session_[req->session_id()] = true;
117 return ::grpc::Status::OK;
118 }
119
120 private:
IsStopped(const std::string & session_id)121 bool IsStopped(const std::string& session_id) {
122 mutex_lock lock(mutex_);
123 auto it = stop_signals_per_session_.find(session_id);
124 return it != stop_signals_per_session_.end() && it->second;
125 }
126
127 mutex mutex_;
128 absl::flat_hash_map<std::string, bool> stop_signals_per_session_
129 ABSL_GUARDED_BY(mutex_);
130 };
131
132 } // namespace
133
CreateProfilerService()134 std::unique_ptr<grpc::ProfilerService::Service> CreateProfilerService() {
135 return absl::make_unique<ProfilerServiceImpl>();
136 }
137
138 } // namespace profiler
139 } // namespace tensorflow
140