1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/subprocess.h"
17 
18 #include <io.h>
19 #include <signal.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/types.h>
23 #include <windows.h>
24 
25 #include <vector>
26 
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/strcat.h"
29 
30 #define PIPE_BUF_SIZE 4096
31 
32 namespace tensorflow {
33 
34 namespace {
35 
IsProcessFinished(HANDLE h)36 static bool IsProcessFinished(HANDLE h) {
37   DWORD process_return_code = STILL_ACTIVE;
38   // TODO handle failure
39   GetExitCodeProcess(h, &process_return_code);
40   return process_return_code != STILL_ACTIVE;
41 }
42 
43 struct ThreadData {
44   string* iobuf;
45   HANDLE iohandle;
46 };
47 
InputThreadFunction(LPVOID param)48 DWORD WINAPI InputThreadFunction(LPVOID param) {
49   ThreadData* args = reinterpret_cast<ThreadData*>(param);
50   string* input = args->iobuf;
51   HANDLE in_handle = args->iohandle;
52   size_t buffer_pointer = 0;
53 
54   size_t total_bytes_written = 0;
55   bool ok = true;
56   while (ok && total_bytes_written < input->size()) {
57     DWORD bytes_written_this_time;
58     ok = WriteFile(in_handle, input->data() + total_bytes_written,
59                    input->size() - total_bytes_written,
60                    &bytes_written_this_time, nullptr);
61     total_bytes_written += bytes_written_this_time;
62   }
63   CloseHandle(in_handle);
64 
65   if (!ok) {
66     return GetLastError();
67   } else {
68     return 0;
69   }
70 }
71 
OutputThreadFunction(LPVOID param)72 DWORD WINAPI OutputThreadFunction(LPVOID param) {
73   ThreadData* args = reinterpret_cast<ThreadData*>(param);
74   string* output = args->iobuf;
75   HANDLE out_handle = args->iohandle;
76 
77   char buf[PIPE_BUF_SIZE];
78   DWORD bytes_read;
79 
80   bool wait_result = WaitForSingleObject(out_handle, INFINITE);
81   if (wait_result != WAIT_OBJECT_0) {
82     LOG(FATAL) << "WaitForSingleObject on child process output failed. "
83                   "Error code: "
84                << wait_result;
85   }
86   while (ReadFile(out_handle, buf, sizeof(buf), &bytes_read, nullptr) &&
87          bytes_read > 0) {
88     output->append(buf, bytes_read);
89   }
90   CloseHandle(out_handle);
91   return 0;
92 }
93 
94 }  // namespace
95 
SubProcess(int nfds)96 SubProcess::SubProcess(int nfds)
97     : running_(false),
98       exec_path_(nullptr),
99       exec_argv_(nullptr),
100       win_pi_(nullptr) {
101   // The input 'nfds' parameter is currently ignored and the internal constant
102   // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
103   for (int i = 0; i < kNFds; i++) {
104     action_[i] = ACTION_CLOSE;
105     parent_pipe_[i] = nullptr;
106   }
107 }
108 
~SubProcess()109 SubProcess::~SubProcess() {
110   mutex_lock procLock(proc_mu_);
111   mutex_lock dataLock(data_mu_);
112   if (win_pi_) {
113     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
114     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
115     delete win_pi_;
116   }
117   running_ = false;
118   FreeArgs();
119   ClosePipes();
120 }
121 
FreeArgs()122 void SubProcess::FreeArgs() {
123   free(exec_path_);
124   exec_path_ = nullptr;
125 
126   if (exec_argv_) {
127     for (int i = 0; exec_argv_[i]; i++) {
128       free(exec_argv_[i]);
129     }
130     delete[] exec_argv_;
131     exec_argv_ = nullptr;
132   }
133 }
134 
ClosePipes()135 void SubProcess::ClosePipes() {
136   for (int i = 0; i < kNFds; i++) {
137     if (parent_pipe_[i] != nullptr) {
138       CloseHandle(parent_pipe_[i]);
139       parent_pipe_[i] = nullptr;
140     }
141   }
142 }
143 
SetProgram(const string & file,const std::vector<string> & argv)144 void SubProcess::SetProgram(const string& file,
145                             const std::vector<string>& argv) {
146   mutex_lock procLock(proc_mu_);
147   mutex_lock dataLock(data_mu_);
148   if (running_) {
149     LOG(FATAL) << "SetProgram called after the process was started.";
150     return;
151   }
152 
153   FreeArgs();
154   exec_path_ = _strdup(file.c_str());
155   if (exec_path_ == nullptr) {
156     LOG(FATAL) << "SetProgram failed to allocate file string.";
157     return;
158   }
159 
160   int argc = argv.size();
161   exec_argv_ = new char*[argc + 1];
162   for (int i = 0; i < argc; i++) {
163     exec_argv_[i] = _strdup(argv[i].c_str());
164     if (exec_argv_[i] == nullptr) {
165       LOG(FATAL) << "SetProgram failed to allocate command argument.";
166       return;
167     }
168   }
169   exec_argv_[argc] = nullptr;
170 }
171 
SetChannelAction(Channel chan,ChannelAction action)172 void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
173   mutex_lock procLock(proc_mu_);
174   mutex_lock dataLock(data_mu_);
175   if (running_) {
176     LOG(FATAL) << "SetChannelAction called after the process was started.";
177   } else if (!chan_valid(chan)) {
178     LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
179   } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
180              (action != ACTION_DUPPARENT)) {
181     LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
182   } else {
183     action_[chan] = action;
184   }
185 }
186 
Start()187 bool SubProcess::Start() {
188   mutex_lock procLock(proc_mu_);
189   mutex_lock dataLock(data_mu_);
190   if (running_) {
191     LOG(ERROR) << "Start called after the process was started.";
192     return false;
193   }
194   if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
195     LOG(ERROR) << "Start called without setting a program.";
196     return false;
197   }
198 
199   // SecurityAttributes to use in winapi calls below.
200   SECURITY_ATTRIBUTES attrs;
201   attrs.nLength = sizeof(SECURITY_ATTRIBUTES);
202   attrs.bInheritHandle = TRUE;
203   attrs.lpSecurityDescriptor = nullptr;
204 
205   // No need to store subprocess end of the pipes, they will be closed before
206   // this function terminates.
207   HANDLE child_pipe_[kNFds] TF_GUARDED_BY(data_mu_);
208 
209   // Create parent/child pipes for the specified channels and make the
210   // parent-side of the pipes non-blocking.
211   for (int i = 0; i < kNFds; i++) {
212     if (action_[i] == ACTION_PIPE) {
213       if (!CreatePipe(i == CHAN_STDIN ? child_pipe_ + i : parent_pipe_ + i,
214                       i == CHAN_STDIN ? parent_pipe_ + i : child_pipe_ + i,
215                       &attrs, PIPE_BUF_SIZE)) {
216         LOG(ERROR) << "Cannot create pipe. Error code: " << GetLastError();
217         ClosePipes();
218         return false;
219       }
220 
221       // Parent pipes should not be inherited by the child process
222       if (!SetHandleInformation(parent_pipe_[i], HANDLE_FLAG_INHERIT, 0)) {
223         LOG(ERROR) << "Cannot set pipe handle attributes.";
224         ClosePipes();
225         return false;
226       }
227     } else if (action_[i] == ACTION_DUPPARENT) {
228       if (i == CHAN_STDIN) {
229         child_pipe_[i] = GetStdHandle(STD_INPUT_HANDLE);
230       } else if (i == CHAN_STDOUT) {
231         child_pipe_[i] = GetStdHandle(STD_OUTPUT_HANDLE);
232       } else {
233         child_pipe_[i] = GetStdHandle(STD_ERROR_HANDLE);
234       }
235     } else {  // ACTION_CLOSE
236       parent_pipe_[i] = nullptr;
237       child_pipe_[i] = nullptr;
238     }
239   }
240 
241   // Concatanate argv, because winapi wants it so.
242   string command_line = strings::StrCat("\"", exec_path_, "\"");
243   for (int i = 1; exec_argv_[i]; i++) {
244     command_line.append(strings::StrCat(" \"", exec_argv_[i], "\""));
245   }
246 
247   // Set up the STARTUPINFO struct with information about the pipe handles.
248   STARTUPINFOA si;
249   ZeroMemory(&si, sizeof(STARTUPINFO));
250   si.cb = sizeof(STARTUPINFO);
251   si.dwFlags |= STARTF_USESTDHANDLES;
252 
253   // Handle the pipes for the child process.
254   if (child_pipe_[CHAN_STDIN]) {
255     si.hStdInput = child_pipe_[CHAN_STDIN];
256   }
257   if (child_pipe_[CHAN_STDOUT]) {
258     si.hStdOutput = child_pipe_[CHAN_STDOUT];
259   }
260   if (child_pipe_[CHAN_STDERR]) {
261     si.hStdError = child_pipe_[CHAN_STDERR];
262   }
263 
264   // Allocate the POROCESS_INFORMATION struct.
265   win_pi_ = new PROCESS_INFORMATION;
266 
267   // Execute the child program.
268   bool bSuccess =
269       CreateProcessA(nullptr, const_cast<char*>(command_line.c_str()), nullptr,
270                      nullptr, TRUE, 0, nullptr, nullptr, &si,
271                      reinterpret_cast<PROCESS_INFORMATION*>(win_pi_));
272 
273   if (bSuccess) {
274     for (int i = 0; i < kNFds; i++) {
275       if (child_pipe_[i] != nullptr) {
276         CloseHandle(child_pipe_[i]);
277         child_pipe_[i] = nullptr;
278       }
279     }
280     running_ = true;
281     return true;
282   } else {
283     LOG(ERROR) << "Call to CreateProcess failed. Error code: "
284                << GetLastError();
285     ClosePipes();
286     return false;
287   }
288 }
289 
Wait()290 bool SubProcess::Wait() {
291   int status;
292   return WaitInternal(&status);
293 }
294 
WaitInternal(int * status)295 bool SubProcess::WaitInternal(int* status) {
296   // The waiter must release proc_mu_ while waiting in order for Kill() to work.
297   proc_mu_.lock();
298   bool running = running_;
299   PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
300   proc_mu_.unlock();
301 
302   bool ret = false;
303   if (running && pi_.hProcess) {
304     DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE);
305     if (wait_status == WAIT_OBJECT_0) {
306       DWORD process_exit_code = 0;
307       if (GetExitCodeProcess(pi_.hProcess, &process_exit_code)) {
308         *status = static_cast<int>(process_exit_code);
309       } else {
310         LOG(FATAL) << "Wait failed with code: " << GetLastError();
311       }
312     } else {
313       LOG(FATAL) << "WaitForSingleObject call on the process handle failed. "
314                     "Error code: "
315                  << wait_status;
316     }
317   }
318 
319   proc_mu_.lock();
320   if ((running_ == running) &&
321       (pi_.hProcess ==
322        reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess)) {
323     running_ = false;
324     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
325     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
326     reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess = nullptr;
327     reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread = nullptr;
328   }
329   proc_mu_.unlock();
330   return *status == 0;
331 }
332 
Kill(int unused_signal)333 bool SubProcess::Kill(int unused_signal) {
334   proc_mu_.lock();
335   bool running = running_;
336   PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
337   proc_mu_.unlock();
338 
339   bool ret = false;
340   if (running && pi_.hProcess) {
341     ret = TerminateProcess(pi_.hProcess, 0);
342   }
343   return ret;
344 }
345 
Communicate(const string * stdin_input,string * stdout_output,string * stderr_output)346 int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
347                             string* stderr_output) {
348   proc_mu_.lock();
349   bool running = running_;
350   proc_mu_.unlock();
351   if (!running) {
352     LOG(ERROR) << "Communicate called without a running process.";
353     return 1;
354   }
355 
356   HANDLE thread_handles[kNFds];
357   int thread_count = 0;
358   ThreadData thread_params[kNFds];
359 
360   // Lock data_mu_ but not proc_mu_ while communicating with the child process
361   // in order for Kill() to be able to terminate the child from another thread.
362   data_mu_.lock();
363   if (!IsProcessFinished(
364           reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess) ||
365       (parent_pipe_[CHAN_STDOUT] != nullptr) ||
366       (parent_pipe_[CHAN_STDERR] != nullptr)) {
367     if (parent_pipe_[CHAN_STDIN] != nullptr) {
368       if (stdin_input) {
369         thread_params[thread_count].iobuf = const_cast<string*>(stdin_input);
370         thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDIN];
371         parent_pipe_[CHAN_STDIN] = nullptr;
372         thread_handles[thread_count] =
373             CreateThread(NULL, 0, InputThreadFunction,
374                          thread_params + thread_count, 0, NULL);
375         thread_count++;
376       }
377     } else {
378       CloseHandle(parent_pipe_[CHAN_STDIN]);
379       parent_pipe_[CHAN_STDIN] = NULL;
380     }
381 
382     if (parent_pipe_[CHAN_STDOUT] != nullptr) {
383       if (stdout_output != nullptr) {
384         thread_params[thread_count].iobuf = stdout_output;
385         thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDOUT];
386         parent_pipe_[CHAN_STDOUT] = NULL;
387         thread_handles[thread_count] =
388             CreateThread(NULL, 0, OutputThreadFunction,
389                          thread_params + thread_count, 0, NULL);
390         thread_count++;
391       } else {
392         CloseHandle(parent_pipe_[CHAN_STDOUT]);
393         parent_pipe_[CHAN_STDOUT] = nullptr;
394       }
395     }
396 
397     if (parent_pipe_[CHAN_STDERR] != nullptr) {
398       if (stderr_output != nullptr) {
399         thread_params[thread_count].iobuf = stderr_output;
400         thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDERR];
401         parent_pipe_[CHAN_STDERR] = NULL;
402         thread_handles[thread_count] =
403             CreateThread(NULL, 0, OutputThreadFunction,
404                          thread_params + thread_count, 0, NULL);
405         thread_count++;
406       } else {
407         CloseHandle(parent_pipe_[CHAN_STDERR]);
408         parent_pipe_[CHAN_STDERR] = nullptr;
409       }
410     }
411   }
412 
413   // Wait for all IO threads to exit.
414   if (thread_count > 0) {
415     DWORD wait_result = WaitForMultipleObjects(thread_count, thread_handles,
416                                                true,  // wait all threads
417                                                INFINITE);
418     if (wait_result != WAIT_OBJECT_0) {
419       LOG(ERROR) << "Waiting on the io threads failed! result: " << wait_result
420                  << std::endl;
421       return -1;
422     }
423 
424     for (int i = 0; i < thread_count; i++) {
425       DWORD exit_code;
426       if (GetExitCodeThread(thread_handles[i], &exit_code)) {
427         if (exit_code) {
428           LOG(ERROR) << "One of the IO threads failed with code: " << exit_code;
429         }
430       } else {
431         LOG(ERROR) << "Error checking io thread exit statuses. Error Code: "
432                    << GetLastError();
433       }
434     }
435   }
436 
437   data_mu_.unlock();
438 
439   // Wait for the child process to exit and return its status.
440   int status;
441   return WaitInternal(&status) ? status : -1;
442 }
443 
CreateSubProcess(const std::vector<string> & argv)444 std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
445   std::unique_ptr<SubProcess> proc(new SubProcess());
446   proc->SetProgram(argv[0], argv);
447   proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
448   proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
449   return proc;
450 }
451 
452 }  // namespace tensorflow
453