1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include <fcntl.h>
17 #include <poll.h>
18 #include <signal.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/types.h>
22 #include <sys/wait.h>
23 #include <memory>
24 #include <vector>
25
26 #include "tensorflow/core/platform/logging.h"
27 #include "tensorflow/core/platform/subprocess.h"
28
// 1) FYI from m3b@ about fork():
// A danger of calling fork() (as opposed to clone() or vfork()) is that if
// many people have used pthread_atfork() to acquire locks, fork() can deadlock,
// because it's unlikely that the locking order will be correct in a large
// program where different layers are unaware of one another and using
// pthread_atfork() independently.
//
// The danger of not calling fork() is that if libc managed to use
// pthread_atfork() correctly (for example, to lock the environment), you'd
// miss out on that protection. (But as far as I can see most libcs don't get
// that right; certainly glibc doesn't seem to.)
//
// clone() or vfork() are also frustrating because clone() exists only on Linux,
// and both clone(...CLONE_VM...) and vfork() have interesting issues around
// signals being delivered after the fork and before the exec. It may be
// possible to work around the latter by blocking all signals before the fork
// and unblocking them afterwards.
//
// Fortunately, most people haven't heard of pthread_atfork().
//
//
// 2) FYI from m3b@ about execv():
// The execv() call implicitly uses the libc global variable environ, which was
// copied by fork(), and that copy could have raced with a setenv() call in
// another thread, since libc implementations are usually not very careful about
// this. (glibc isn't careful, for example.)
//
// If this were inside libc, we could use locks or memory barriers to avoid the
// race, but as it is, I see nothing you can do. Even if you tried to copy the
// environment before the fork(), the copying could race with other threads
// calling setenv(). The good news is that few people call setenv().
//
// Amusingly, the standard says of fork(): "...to avoid errors, the child
// process may only execute async-signal-safe operations until such time as one
// of the exec functions is called." Notice that execve() is listed as
// async-signal-safe, but execv() is not, and the difference is just the
// handling of the environment.
66
67 namespace tensorflow {
68
SubProcess(int nfds)69 SubProcess::SubProcess(int nfds)
70 : running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) {
71 // The input 'nfds' parameter is currently ignored and the internal constant
72 // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
73 for (int i = 0; i < kNFds; i++) {
74 action_[i] = ACTION_CLOSE;
75 parent_pipe_[i] = -1;
76 child_pipe_[i] = -1;
77 }
78 }
79
~SubProcess()80 SubProcess::~SubProcess() {
81 mutex_lock procLock(proc_mu_);
82 mutex_lock dataLock(data_mu_);
83 pid_ = -1;
84 running_ = false;
85 FreeArgs();
86 ClosePipes();
87 }
88
FreeArgs()89 void SubProcess::FreeArgs() {
90 free(exec_path_);
91 exec_path_ = nullptr;
92
93 if (exec_argv_) {
94 for (char** p = exec_argv_; *p != nullptr; p++) {
95 free(*p);
96 }
97 delete[] exec_argv_;
98 exec_argv_ = nullptr;
99 }
100 }
101
ClosePipes()102 void SubProcess::ClosePipes() {
103 for (int i = 0; i < kNFds; i++) {
104 if (parent_pipe_[i] >= 0) {
105 close(parent_pipe_[i]);
106 parent_pipe_[i] = -1;
107 }
108 if (child_pipe_[i] >= 0) {
109 close(child_pipe_[i]);
110 child_pipe_[i] = -1;
111 }
112 }
113 }
114
SetProgram(const string & file,const std::vector<string> & argv)115 void SubProcess::SetProgram(const string& file,
116 const std::vector<string>& argv) {
117 mutex_lock procLock(proc_mu_);
118 mutex_lock dataLock(data_mu_);
119 if (running_) {
120 LOG(FATAL) << "SetProgram called after the process was started.";
121 return;
122 }
123
124 FreeArgs();
125 exec_path_ = strdup(file.c_str());
126 if (exec_path_ == nullptr) {
127 LOG(FATAL) << "SetProgram failed to allocate file string.";
128 return;
129 }
130
131 int argc = argv.size();
132 exec_argv_ = new char*[argc + 1];
133 for (int i = 0; i < argc; i++) {
134 exec_argv_[i] = strdup(argv[i].c_str());
135 if (exec_argv_[i] == nullptr) {
136 LOG(FATAL) << "SetProgram failed to allocate command argument.";
137 return;
138 }
139 }
140 exec_argv_[argc] = nullptr;
141 }
142
SetChannelAction(Channel chan,ChannelAction action)143 void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
144 mutex_lock procLock(proc_mu_);
145 mutex_lock dataLock(data_mu_);
146 if (running_) {
147 LOG(FATAL) << "SetChannelAction called after the process was started.";
148 } else if (!chan_valid(chan)) {
149 LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
150 } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
151 (action != ACTION_DUPPARENT)) {
152 LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
153 } else {
154 action_[chan] = action;
155 }
156 }
157
Start()158 bool SubProcess::Start() {
159 mutex_lock procLock(proc_mu_);
160 mutex_lock dataLock(data_mu_);
161 if (running_) {
162 LOG(ERROR) << "Start called after the process was started.";
163 return false;
164 }
165 if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
166 LOG(ERROR) << "Start called without setting a program.";
167 return false;
168 }
169
170 // Create parent/child pipes for the specified channels and make the
171 // parent-side of the pipes non-blocking.
172 for (int i = 0; i < kNFds; i++) {
173 if (action_[i] == ACTION_PIPE) {
174 int pipe_fds[2];
175 if (pipe(pipe_fds) < 0) {
176 LOG(ERROR) << "Start cannot create pipe: " << strerror(errno);
177 ClosePipes();
178 return false;
179 }
180 // Handle the direction of the pipe (stdin vs stdout/err).
181 if (i == 0) {
182 parent_pipe_[i] = pipe_fds[1];
183 child_pipe_[i] = pipe_fds[0];
184 } else {
185 parent_pipe_[i] = pipe_fds[0];
186 child_pipe_[i] = pipe_fds[1];
187 }
188
189 if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) {
190 LOG(ERROR) << "Start cannot make pipe non-blocking: "
191 << strerror(errno);
192 ClosePipes();
193 return false;
194 }
195 if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) {
196 LOG(ERROR) << "Start cannot make pipe close-on-exec: "
197 << strerror(errno);
198 ClosePipes();
199 return false;
200 }
201 }
202 }
203
204 // Start the child process and setup the file descriptors of both processes.
205 // See comment (1) in the header about issues with the use of fork().
206 pid_ = fork();
207 if (pid_ < 0) {
208 LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno);
209 ClosePipes();
210 return false;
211 }
212
213 if (pid_ > 0) {
214 // Parent process: close the child-side pipes and return.
215 running_ = true;
216 for (int i = 0; i < kNFds; i++) {
217 if (child_pipe_[i] >= 0) {
218 close(child_pipe_[i]);
219 child_pipe_[i] = -1;
220 }
221 }
222 return true;
223 }
224
225 // Child process: close parent-side pipes and channels marked for closing.
226 // For pipe channels, replace their file descriptors with the pipes.
227 int devnull_fd = -1;
228 for (int i = 0; i < kNFds; i++) {
229 if (parent_pipe_[i] >= 0) {
230 close(parent_pipe_[i]);
231 parent_pipe_[i] = -1;
232 }
233
234 switch (action_[i]) {
235 case ACTION_DUPPARENT:
236 // Nothing to do, fork() took care of it.
237 break;
238
239 case ACTION_PIPE:
240 while (dup2(child_pipe_[i], i) < 0) {
241 if (!retry(errno)) {
242 _exit(1);
243 }
244 }
245 close(child_pipe_[i]);
246 child_pipe_[i] = -1;
247 break;
248
249 case ACTION_CLOSE:
250 default:
251 // Do not close stdin/out/err, instead redirect them to /dev/null so
252 // their file descriptors remain unavailable for reuse by open(), etc.
253 if (i <= CHAN_STDERR) {
254 if (devnull_fd < 0) {
255 while ((devnull_fd = open("/dev/null", O_RDWR, 0)) < 0) {
256 if (!retry(errno)) {
257 _exit(1);
258 }
259 }
260 }
261 while (dup2(devnull_fd, i) < 0) {
262 if (!retry(errno)) {
263 _exit(1);
264 }
265 }
266 } else {
267 close(i);
268 }
269 break;
270 }
271 }
272
273 if (devnull_fd >= 0) {
274 close(devnull_fd);
275 }
276
277 // Execute the child program.
278 // See comment (2) in the header about issues with the use of execv().
279 execv(exec_path_, exec_argv_);
280 _exit(1);
281 }
282
Wait()283 bool SubProcess::Wait() {
284 int status;
285 return WaitInternal(&status);
286 }
287
WaitInternal(int * status)288 bool SubProcess::WaitInternal(int* status) {
289 // The waiter must release proc_mu_ while waiting in order for Kill() to work.
290 proc_mu_.lock();
291 bool running = running_;
292 pid_t pid = pid_;
293 proc_mu_.unlock();
294
295 bool ret = false;
296 if (running && (pid > 1)) {
297 pid_t cpid;
298 int cstat;
299 bool done = false;
300 while (!done) {
301 cpid = waitpid(pid, &cstat, 0);
302 if ((cpid < 0) && !retry(errno)) {
303 done = true;
304 } else if ((cpid == pid) && (WIFEXITED(cstat) || WIFSIGNALED(cstat))) {
305 *status = cstat;
306 ret = true;
307 done = true;
308 }
309 }
310 }
311
312 proc_mu_.lock();
313 if ((running_ == running) && (pid_ == pid)) {
314 running_ = false;
315 pid_ = -1;
316 }
317 proc_mu_.unlock();
318 return ret;
319 }
320
Kill(int signal)321 bool SubProcess::Kill(int signal) {
322 proc_mu_.lock();
323 bool running = running_;
324 pid_t pid = pid_;
325 proc_mu_.unlock();
326
327 bool ret = false;
328 if (running && (pid > 1)) {
329 ret = (kill(pid, signal) == 0);
330 }
331 return ret;
332 }
333
Communicate(const string * stdin_input,string * stdout_output,string * stderr_output)334 int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
335 string* stderr_output) {
336 struct pollfd fds[kNFds];
337 size_t nbytes[kNFds];
338 string* iobufs[kNFds];
339 int fd_count = 0;
340
341 proc_mu_.lock();
342 bool running = running_;
343 proc_mu_.unlock();
344 if (!running) {
345 LOG(ERROR) << "Communicate called without a running process.";
346 return 1;
347 }
348
349 // If SIGPIPE handling is the default action, change it to ignore SIGPIPE and
350 // keep it ignored, don't change it back. This is needed while communicating
351 // with the child process so the parent process can survive the death of the
352 // child process while it is writing to its stdin. If the application has
353 // registered a SIGPIPE handler, then let it deal with any signals generated
354 // by the premature death of the child process, don't overwrite its handler.
355 struct sigaction act;
356 if (sigaction(SIGPIPE, nullptr, &act) < 0) {
357 LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno);
358 return 1;
359 }
360 if (act.sa_handler == SIG_DFL) {
361 memset(&act, 0, sizeof(act));
362 act.sa_handler = SIG_IGN;
363 sigemptyset(&act.sa_mask);
364 if (sigaction(SIGPIPE, &act, nullptr) < 0) {
365 LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno);
366 return 1;
367 }
368 }
369
370 // Lock data_mu_ but not proc_mu_ while communicating with the child process
371 // in order for Kill() to be able to terminate the child from another thread.
372 data_mu_.lock();
373
374 // Initialize the poll() structures and buffer tracking.
375 for (int i = 0; i < kNFds; i++) {
376 if (action_[i] == ACTION_PIPE) {
377 switch (i) {
378 case CHAN_STDIN:
379 // Special case: if no data is given to send to the child process,
380 // close the pipe to unblock the child, and skip the file descriptor.
381 if (stdin_input == nullptr) {
382 close(parent_pipe_[i]);
383 parent_pipe_[i] = -1;
384 continue;
385 }
386 iobufs[fd_count] = const_cast<string*>(stdin_input);
387 break;
388 case CHAN_STDOUT:
389 iobufs[fd_count] = stdout_output;
390 break;
391 case CHAN_STDERR:
392 iobufs[fd_count] = stderr_output;
393 break;
394 default:
395 iobufs[fd_count] = nullptr;
396 break;
397 }
398 nbytes[fd_count] = 0;
399 fds[fd_count].fd = parent_pipe_[i];
400 fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT;
401 fds[fd_count].revents = 0;
402 fd_count++;
403 }
404 }
405
406 // Loop communicating with the child process.
407 int fd_remain = fd_count;
408 char buf[4096];
409 while (fd_remain > 0) {
410 int n = poll(fds, fd_count, -1);
411 if ((n < 0) && !retry(errno)) {
412 LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno);
413 fd_remain = 0;
414 } else if (n == 0) {
415 LOG(ERROR) << "Communicate cannot poll(): timeout not possible";
416 fd_remain = 0;
417 } else if (n > 0) {
418 // Handle the pipes ready for I/O.
419 for (int i = 0; i < fd_count; i++) {
420 if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) {
421 // Read from one of the child's outputs.
422 ssize_t n = read(fds[i].fd, buf, sizeof(buf));
423 if (n > 0) {
424 if (iobufs[i] != nullptr) {
425 iobufs[i]->append(buf, n);
426 nbytes[i] += n;
427 }
428 } else if ((n == 0) || !retry(errno)) {
429 fds[i].fd = -1;
430 fd_remain--;
431 }
432 } else if ((fds[i].revents & POLLOUT) != 0) {
433 // Write to the child's stdin.
434 ssize_t n = iobufs[i]->size() - nbytes[i];
435 if (n > 0) {
436 n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n);
437 }
438 if (n >= 0) {
439 nbytes[i] += n;
440 if (nbytes[i] >= iobufs[i]->size()) {
441 fds[i].fd = -1;
442 fd_remain--;
443 // Close the child's stdin pipe to unblock the process.
444 close(parent_pipe_[CHAN_STDIN]);
445 parent_pipe_[CHAN_STDIN] = -1;
446 }
447 } else if (!retry(errno)) {
448 fds[i].fd = -1;
449 fd_remain--;
450 }
451 } else if ((fds[i].revents & (POLLERR | POLLNVAL)) != 0) {
452 fds[i].fd = -1;
453 fd_remain--;
454 }
455 }
456 }
457 }
458
459 data_mu_.unlock();
460
461 // Wait for the child process to exit and return its status.
462 int status;
463 return WaitInternal(&status) ? status : -1;
464 }
465
CreateSubProcess(const std::vector<string> & argv)466 std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
467 std::unique_ptr<SubProcess> proc(new SubProcess());
468 proc->SetProgram(argv[0], argv);
469 proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
470 proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
471 return proc;
472 }
473
474 } // namespace tensorflow
475