1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include <fcntl.h>
17 #include <poll.h>
18 #include <signal.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/types.h>
22 #include <sys/wait.h>
23 #include <memory>
24 #include <vector>
25 
26 #include "tensorflow/core/platform/logging.h"
27 #include "tensorflow/core/platform/subprocess.h"
28 
29 // 1) FYI from m3b@ about fork():
30 // A danger of calling fork() (as opposed to clone() or vfork()) is that if
31 // many people have used pthread_atfork() to acquire locks, fork() can deadlock,
32 // because it's unlikely that the locking order will be correct in a large
33 // program where different layers are unaware of one another and using
34 // pthread_atfork() independently.
35 //
36 // The danger of not calling fork() is that if libc managed to use
37 // pthread_atfork() correctly (for example, to lock the environment), you'd
38 // miss out on that protection.  (But as far as I can see most libc's don't get
39 // that right; certainly glibc doesn't seem to.)
40 //
41 // clone() or vfork() are also frustrating because clone() exists only on Linux,
42 // and both clone(...CLONE_VM...) and vfork() have interesting issues around
43 // signals being delivered after the fork and before the exec.  It may be
44 // possible to work around the latter by blocking all signals before the fork
45 // and unblocking them afterwards.
46 //
47 // Fortunately, most people haven't heard of pthread_atfork().
48 //
49 //
50 // 2) FYI from m3b@ about execv():
51 // The execv() call implicitly uses the libc global variable environ, which was
52 // copied by fork(), and that copy could have raced with a setenv() call in
53 // another thread, since libc implementations are usually not very careful about
54 // this. (glibc isn't careful, for example.)
55 //
56 // If this were inside libc, we could use locks or memory barriers to avoid the
57 // race, but as it is, I see nothing you can do.  Even if you tried to copy the
58 // environment before the fork(), the copying could race with other threads
59 // calling setenv().  The good news is that few people call setenv().
60 //
61 // Amusingly, the standard says of fork(): "...to avoid errors, the child
62 // process may only execute async-signal-safe operations until such time as one
63 // of the exec functions is called."  Notice that execve() is listed as
64 // async-signal-safe, but execv() is not, and the difference is just the
65 // handling of the environment.
66 
67 namespace tensorflow {
68 
SubProcess(int nfds)69 SubProcess::SubProcess(int nfds)
70     : running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) {
71   // The input 'nfds' parameter is currently ignored and the internal constant
72   // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
73   for (int i = 0; i < kNFds; i++) {
74     action_[i] = ACTION_CLOSE;
75     parent_pipe_[i] = -1;
76     child_pipe_[i] = -1;
77   }
78 }
79 
~SubProcess()80 SubProcess::~SubProcess() {
81   mutex_lock procLock(proc_mu_);
82   mutex_lock dataLock(data_mu_);
83   pid_ = -1;
84   running_ = false;
85   FreeArgs();
86   ClosePipes();
87 }
88 
FreeArgs()89 void SubProcess::FreeArgs() {
90   free(exec_path_);
91   exec_path_ = nullptr;
92 
93   if (exec_argv_) {
94     for (char** p = exec_argv_; *p != nullptr; p++) {
95       free(*p);
96     }
97     delete[] exec_argv_;
98     exec_argv_ = nullptr;
99   }
100 }
101 
ClosePipes()102 void SubProcess::ClosePipes() {
103   for (int i = 0; i < kNFds; i++) {
104     if (parent_pipe_[i] >= 0) {
105       close(parent_pipe_[i]);
106       parent_pipe_[i] = -1;
107     }
108     if (child_pipe_[i] >= 0) {
109       close(child_pipe_[i]);
110       child_pipe_[i] = -1;
111     }
112   }
113 }
114 
SetProgram(const string & file,const std::vector<string> & argv)115 void SubProcess::SetProgram(const string& file,
116                             const std::vector<string>& argv) {
117   mutex_lock procLock(proc_mu_);
118   mutex_lock dataLock(data_mu_);
119   if (running_) {
120     LOG(FATAL) << "SetProgram called after the process was started.";
121     return;
122   }
123 
124   FreeArgs();
125   exec_path_ = strdup(file.c_str());
126   if (exec_path_ == nullptr) {
127     LOG(FATAL) << "SetProgram failed to allocate file string.";
128     return;
129   }
130 
131   int argc = argv.size();
132   exec_argv_ = new char*[argc + 1];
133   for (int i = 0; i < argc; i++) {
134     exec_argv_[i] = strdup(argv[i].c_str());
135     if (exec_argv_[i] == nullptr) {
136       LOG(FATAL) << "SetProgram failed to allocate command argument.";
137       return;
138     }
139   }
140   exec_argv_[argc] = nullptr;
141 }
142 
SetChannelAction(Channel chan,ChannelAction action)143 void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
144   mutex_lock procLock(proc_mu_);
145   mutex_lock dataLock(data_mu_);
146   if (running_) {
147     LOG(FATAL) << "SetChannelAction called after the process was started.";
148   } else if (!chan_valid(chan)) {
149     LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
150   } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
151              (action != ACTION_DUPPARENT)) {
152     LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
153   } else {
154     action_[chan] = action;
155   }
156 }
157 
Start()158 bool SubProcess::Start() {
159   mutex_lock procLock(proc_mu_);
160   mutex_lock dataLock(data_mu_);
161   if (running_) {
162     LOG(ERROR) << "Start called after the process was started.";
163     return false;
164   }
165   if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
166     LOG(ERROR) << "Start called without setting a program.";
167     return false;
168   }
169 
170   // Create parent/child pipes for the specified channels and make the
171   // parent-side of the pipes non-blocking.
172   for (int i = 0; i < kNFds; i++) {
173     if (action_[i] == ACTION_PIPE) {
174       int pipe_fds[2];
175       if (pipe(pipe_fds) < 0) {
176         LOG(ERROR) << "Start cannot create pipe: " << strerror(errno);
177         ClosePipes();
178         return false;
179       }
180       // Handle the direction of the pipe (stdin vs stdout/err).
181       if (i == 0) {
182         parent_pipe_[i] = pipe_fds[1];
183         child_pipe_[i] = pipe_fds[0];
184       } else {
185         parent_pipe_[i] = pipe_fds[0];
186         child_pipe_[i] = pipe_fds[1];
187       }
188 
189       if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) {
190         LOG(ERROR) << "Start cannot make pipe non-blocking: "
191                    << strerror(errno);
192         ClosePipes();
193         return false;
194       }
195       if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) {
196         LOG(ERROR) << "Start cannot make pipe close-on-exec: "
197                    << strerror(errno);
198         ClosePipes();
199         return false;
200       }
201     }
202   }
203 
204   // Start the child process and setup the file descriptors of both processes.
205   // See comment (1) in the header about issues with the use of fork().
206   pid_ = fork();
207   if (pid_ < 0) {
208     LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno);
209     ClosePipes();
210     return false;
211   }
212 
213   if (pid_ > 0) {
214     // Parent process: close the child-side pipes and return.
215     running_ = true;
216     for (int i = 0; i < kNFds; i++) {
217       if (child_pipe_[i] >= 0) {
218         close(child_pipe_[i]);
219         child_pipe_[i] = -1;
220       }
221     }
222     return true;
223   }
224 
225   // Child process: close parent-side pipes and channels marked for closing.
226   // For pipe channels, replace their file descriptors with the pipes.
227   int devnull_fd = -1;
228   for (int i = 0; i < kNFds; i++) {
229     if (parent_pipe_[i] >= 0) {
230       close(parent_pipe_[i]);
231       parent_pipe_[i] = -1;
232     }
233 
234     switch (action_[i]) {
235       case ACTION_DUPPARENT:
236         // Nothing to do, fork() took care of it.
237         break;
238 
239       case ACTION_PIPE:
240         while (dup2(child_pipe_[i], i) < 0) {
241           if (!retry(errno)) {
242             _exit(1);
243           }
244         }
245         close(child_pipe_[i]);
246         child_pipe_[i] = -1;
247         break;
248 
249       case ACTION_CLOSE:
250       default:
251         // Do not close stdin/out/err, instead redirect them to /dev/null so
252         // their file descriptors remain unavailable for reuse by open(), etc.
253         if (i <= CHAN_STDERR) {
254           if (devnull_fd < 0) {
255             while ((devnull_fd = open("/dev/null", O_RDWR, 0)) < 0) {
256               if (!retry(errno)) {
257                 _exit(1);
258               }
259             }
260           }
261           while (dup2(devnull_fd, i) < 0) {
262             if (!retry(errno)) {
263               _exit(1);
264             }
265           }
266         } else {
267           close(i);
268         }
269         break;
270     }
271   }
272 
273   if (devnull_fd >= 0) {
274     close(devnull_fd);
275   }
276 
277   // Execute the child program.
278   // See comment (2) in the header about issues with the use of execv().
279   execv(exec_path_, exec_argv_);
280   _exit(1);
281 }
282 
Wait()283 bool SubProcess::Wait() {
284   int status;
285   return WaitInternal(&status);
286 }
287 
WaitInternal(int * status)288 bool SubProcess::WaitInternal(int* status) {
289   // The waiter must release proc_mu_ while waiting in order for Kill() to work.
290   proc_mu_.lock();
291   bool running = running_;
292   pid_t pid = pid_;
293   proc_mu_.unlock();
294 
295   bool ret = false;
296   if (running && (pid > 1)) {
297     pid_t cpid;
298     int cstat;
299     bool done = false;
300     while (!done) {
301       cpid = waitpid(pid, &cstat, 0);
302       if ((cpid < 0) && !retry(errno)) {
303         done = true;
304       } else if ((cpid == pid) && (WIFEXITED(cstat) || WIFSIGNALED(cstat))) {
305         *status = cstat;
306         ret = true;
307         done = true;
308       }
309     }
310   }
311 
312   proc_mu_.lock();
313   if ((running_ == running) && (pid_ == pid)) {
314     running_ = false;
315     pid_ = -1;
316   }
317   proc_mu_.unlock();
318   return ret;
319 }
320 
Kill(int signal)321 bool SubProcess::Kill(int signal) {
322   proc_mu_.lock();
323   bool running = running_;
324   pid_t pid = pid_;
325   proc_mu_.unlock();
326 
327   bool ret = false;
328   if (running && (pid > 1)) {
329     ret = (kill(pid, signal) == 0);
330   }
331   return ret;
332 }
333 
// Exchanges data with the running child over the channels configured as
// ACTION_PIPE, then waits for the child to terminate.
//
//   stdin_input:   bytes to write to the child's stdin; if nullptr the stdin
//                  pipe is closed immediately without writing anything.
//   stdout_output: if non-null, data read from the child's stdout is appended.
//   stderr_output: if non-null, data read from the child's stderr is appended.
//
// Returns 1 if no process is running or the SIGPIPE disposition cannot be
// queried/changed.  Otherwise returns the raw wait status obtained by
// WaitInternal(), or -1 if waiting failed.
int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
                            string* stderr_output) {
  // Parallel arrays indexed by poll slot (not by channel): the descriptor to
  // poll, the number of bytes transferred so far, and the associated buffer.
  struct pollfd fds[kNFds];
  size_t nbytes[kNFds];
  string* iobufs[kNFds];
  int fd_count = 0;

  proc_mu_.lock();
  bool running = running_;
  proc_mu_.unlock();
  if (!running) {
    LOG(ERROR) << "Communicate called without a running process.";
    return 1;
  }

  // If SIGPIPE handling is the default action, change it to ignore SIGPIPE and
  // keep it ignored, don't change it back.  This is needed while communicating
  // with the child process so the parent process can survive the death of the
  // child process while it is writing to its stdin.  If the application has
  // registered a SIGPIPE handler, then let it deal with any signals generated
  // by the premature death of the child process, don't overwrite its handler.
  struct sigaction act;
  if (sigaction(SIGPIPE, nullptr, &act) < 0) {
    LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno);
    return 1;
  }
  if (act.sa_handler == SIG_DFL) {
    memset(&act, 0, sizeof(act));
    act.sa_handler = SIG_IGN;
    sigemptyset(&act.sa_mask);
    if (sigaction(SIGPIPE, &act, nullptr) < 0) {
      LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno);
      return 1;
    }
  }

  // Lock data_mu_ but not proc_mu_ while communicating with the child process
  // in order for Kill() to be able to terminate the child from another thread.
  data_mu_.lock();

  // Initialize the poll() structures and buffer tracking.
  for (int i = 0; i < kNFds; i++) {
    if (action_[i] == ACTION_PIPE) {
      switch (i) {
        case CHAN_STDIN:
          // Special case: if no data is given to send to the child process,
          // close the pipe to unblock the child, and skip the file descriptor.
          if (stdin_input == nullptr) {
            close(parent_pipe_[i]);
            parent_pipe_[i] = -1;
            continue;
          }
          // The const_cast only lets the input share the iobufs array; the
          // stdin slot is polled for POLLOUT and its buffer is only read.
          iobufs[fd_count] = const_cast<string*>(stdin_input);
          break;
        case CHAN_STDOUT:
          iobufs[fd_count] = stdout_output;
          break;
        case CHAN_STDERR:
          iobufs[fd_count] = stderr_output;
          break;
        default:
          iobufs[fd_count] = nullptr;
          break;
      }
      nbytes[fd_count] = 0;
      fds[fd_count].fd = parent_pipe_[i];
      // Channel 0 (stdin) is written by the parent; the others are read.
      fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT;
      fds[fd_count].revents = 0;
      fd_count++;
    }
  }

  // Loop communicating with the child process.
  // fd_remain counts the poll slots that are still active; a slot is retired
  // by setting its fd to -1, which poll() ignores.
  int fd_remain = fd_count;
  char buf[4096];
  while (fd_remain > 0) {
    // Block without a timeout until at least one descriptor is ready.
    int n = poll(fds, fd_count, -1);
    if ((n < 0) && !retry(errno)) {
      LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno);
      fd_remain = 0;
    } else if (n == 0) {
      LOG(ERROR) << "Communicate cannot poll(): timeout not possible";
      fd_remain = 0;
    } else if (n > 0) {
      // Handle the pipes ready for I/O.
      for (int i = 0; i < fd_count; i++) {
        if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) {
          // Read from one of the child's outputs.
          ssize_t n = read(fds[i].fd, buf, sizeof(buf));
          if (n > 0) {
            // Data read for a channel without a caller buffer is discarded.
            if (iobufs[i] != nullptr) {
              iobufs[i]->append(buf, n);
              nbytes[i] += n;
            }
          } else if ((n == 0) || !retry(errno)) {
            // End of file or an error retry() does not accept: retire slot.
            fds[i].fd = -1;
            fd_remain--;
          }
        } else if ((fds[i].revents & POLLOUT) != 0) {
          // Write to the child's stdin.
          // n starts as the number of bytes still to send, then becomes the
          // result of write().
          ssize_t n = iobufs[i]->size() - nbytes[i];
          if (n > 0) {
            n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n);
          }
          if (n >= 0) {
            nbytes[i] += n;
            if (nbytes[i] >= iobufs[i]->size()) {
              // Everything has been sent: retire the slot.
              fds[i].fd = -1;
              fd_remain--;
              // Close the child's stdin pipe to unblock the process.
              close(parent_pipe_[CHAN_STDIN]);
              parent_pipe_[CHAN_STDIN] = -1;
            }
          } else if (!retry(errno)) {
            // Write failed with an error retry() does not accept: stop
            // sending.  The pipe descriptor itself is left open here.
            fds[i].fd = -1;
            fd_remain--;
          }
        } else if ((fds[i].revents & (POLLERR | POLLNVAL)) != 0) {
          // Descriptor is in an error state or invalid: retire the slot.
          fds[i].fd = -1;
          fd_remain--;
        }
      }
    }
  }

  data_mu_.unlock();

  // Wait for the child process to exit and return its status.
  int status;
  return WaitInternal(&status) ? status : -1;
}
465 
CreateSubProcess(const std::vector<string> & argv)466 std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
467   std::unique_ptr<SubProcess> proc(new SubProcess());
468   proc->SetProgram(argv[0], argv);
469   proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
470   proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
471   return proc;
472 }
473 
474 }  // namespace tensorflow
475