1#!/usr/bin/python 2""" 3A class and functions used for running and controlling child processes. 4 5@copyright: 2008-2009 Red Hat Inc. 6""" 7 8import os, sys, pty, select, termios, fcntl 9 10 11# The following helper functions are shared by the server and the client. 12 13def _lock(filename): 14 if not os.path.exists(filename): 15 open(filename, "w").close() 16 fd = os.open(filename, os.O_RDWR) 17 fcntl.lockf(fd, fcntl.LOCK_EX) 18 return fd 19 20 21def _unlock(fd): 22 fcntl.lockf(fd, fcntl.LOCK_UN) 23 os.close(fd) 24 25 26def _locked(filename): 27 try: 28 fd = os.open(filename, os.O_RDWR) 29 except: 30 return False 31 try: 32 fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 33 except: 34 os.close(fd) 35 return True 36 fcntl.lockf(fd, fcntl.LOCK_UN) 37 os.close(fd) 38 return False 39 40 41def _wait(filename): 42 fd = _lock(filename) 43 _unlock(fd) 44 45 46def _get_filenames(base_dir, id): 47 return [os.path.join(base_dir, s + id) for s in 48 "shell-pid-", "status-", "output-", "inpipe-", 49 "lock-server-running-", "lock-client-starting-"] 50 51 52def _get_reader_filename(base_dir, id, reader): 53 return os.path.join(base_dir, "outpipe-%s-%s" % (reader, id)) 54 55 56# The following is the server part of the module. 57 58if __name__ == "__main__": 59 id = sys.stdin.readline().strip() 60 echo = sys.stdin.readline().strip() == "True" 61 readers = sys.stdin.readline().strip().split(",") 62 command = sys.stdin.readline().strip() + " && echo %s > /dev/null" % id 63 64 # Define filenames to be used for communication 65 base_dir = "/tmp/kvm_spawn" 66 (shell_pid_filename, 67 status_filename, 68 output_filename, 69 inpipe_filename, 70 lock_server_running_filename, 71 lock_client_starting_filename) = _get_filenames(base_dir, id) 72 73 # Populate the reader filenames list 74 reader_filenames = [_get_reader_filename(base_dir, id, reader) 75 for reader in readers] 76 77 # Set $TERM = dumb 78 os.putenv("TERM", "dumb") 79 80 (shell_pid, shell_fd) = pty.fork() 81 if shell_pid == 0: 82 # Child process: run the command in a subshell 83 os.execv("/bin/sh", ["/bin/sh", "-c", command]) 84 else: 85 # Parent process 86 lock_server_running = _lock(lock_server_running_filename) 87 88 # Set terminal echo on/off and disable pre- and post-processing 89 attr = termios.tcgetattr(shell_fd) 90 attr[0] &= ~termios.INLCR 91 attr[0] &= ~termios.ICRNL 92 attr[0] &= ~termios.IGNCR 93 attr[1] &= ~termios.OPOST 94 if echo: 95 attr[3] |= termios.ECHO 96 else: 97 attr[3] &= ~termios.ECHO 98 termios.tcsetattr(shell_fd, termios.TCSANOW, attr) 99 100 # Open output file 101 output_file = open(output_filename, "w") 102 # Open input pipe 103 os.mkfifo(inpipe_filename) 104 inpipe_fd = os.open(inpipe_filename, os.O_RDWR) 105 # Open output pipes (readers) 106 reader_fds = [] 107 for filename in reader_filenames: 108 os.mkfifo(filename) 109 reader_fds.append(os.open(filename, os.O_RDWR)) 110 111 # Write shell PID to file 112 file = open(shell_pid_filename, "w") 113 file.write(str(shell_pid)) 114 file.close() 115 116 # Print something to stdout so the client can start working 117 print "Server %s ready" % id 118 sys.stdout.flush() 119 120 # Initialize buffers 121 buffers = ["" for reader in readers] 122 123 # Read from child and write to files/pipes 124 while True: 125 check_termination = False 126 # Make a list of reader pipes whose buffers are not empty 127 fds = [fd for (i, fd) in enumerate(reader_fds) if buffers[i]] 128 # Wait until there's something to do 129 r, w, x = select.select([shell_fd, inpipe_fd], fds, [], 0.5) 130 # If a reader pipe is ready for writing -- 131 for (i, fd) in enumerate(reader_fds): 132 if fd in w: 133 bytes_written = os.write(fd, buffers[i]) 134 buffers[i] = buffers[i][bytes_written:] 135 # If there's data to read from the child process -- 136 if shell_fd in r: 137 try: 138 data = os.read(shell_fd, 16384) 139 except OSError: 140 data = "" 141 if not data: 142 check_termination = True 143 # Remove carriage returns from the data -- they often cause 144 # trouble and are normally not needed 145 data = data.replace("\r", "") 146 output_file.write(data) 147 output_file.flush() 148 for i in range(len(readers)): 149 buffers[i] += data 150 # If os.read() raised an exception or there was nothing to read -- 151 if check_termination or shell_fd not in r: 152 pid, status = os.waitpid(shell_pid, os.WNOHANG) 153 if pid: 154 status = os.WEXITSTATUS(status) 155 break 156 # If there's data to read from the client -- 157 if inpipe_fd in r: 158 data = os.read(inpipe_fd, 1024) 159 os.write(shell_fd, data) 160 161 # Write the exit status to a file 162 file = open(status_filename, "w") 163 file.write(str(status)) 164 file.close() 165 166 # Wait for the client to finish initializing 167 _wait(lock_client_starting_filename) 168 169 # Delete FIFOs 170 for filename in reader_filenames + [inpipe_filename]: 171 try: 172 os.unlink(filename) 173 except OSError: 174 pass 175 176 # Close all files and pipes 177 output_file.close() 178 os.close(inpipe_fd) 179 for fd in reader_fds: 180 os.close(fd) 181 182 _unlock(lock_server_running) 183 exit(0) 184 185 186# The following is the client part of the module. 187 188import subprocess, time, signal, re, threading, logging 189import virt_utils 190 191 192class ExpectError(Exception): 193 def __init__(self, patterns, output): 194 Exception.__init__(self, patterns, output) 195 self.patterns = patterns 196 self.output = output 197 198 def _pattern_str(self): 199 if len(self.patterns) == 1: 200 return "pattern %r" % self.patterns[0] 201 else: 202 return "patterns %r" % self.patterns 203 204 def __str__(self): 205 return ("Unknown error occurred while looking for %s (output: %r)" % 206 (self._pattern_str(), self.output)) 207 208 209class ExpectTimeoutError(ExpectError): 210 def __str__(self): 211 return ("Timeout expired while looking for %s (output: %r)" % 212 (self._pattern_str(), self.output)) 213 214 215class ExpectProcessTerminatedError(ExpectError): 216 def __init__(self, patterns, status, output): 217 ExpectError.__init__(self, patterns, output) 218 self.status = status 219 220 def __str__(self): 221 return ("Process terminated while looking for %s " 222 "(status: %s, output: %r)" % (self._pattern_str(), 223 self.status, self.output)) 224 225 226class ShellError(Exception): 227 def __init__(self, cmd, output): 228 Exception.__init__(self, cmd, output) 229 self.cmd = cmd 230 self.output = output 231 232 def __str__(self): 233 return ("Could not execute shell command %r (output: %r)" % 234 (self.cmd, self.output)) 235 236 237class ShellTimeoutError(ShellError): 238 def __str__(self): 239 return ("Timeout expired while waiting for shell command to " 240 "complete: %r (output: %r)" % (self.cmd, self.output)) 241 242 243class ShellProcessTerminatedError(ShellError): 244 # Raised when the shell process itself (e.g. ssh, netcat, telnet) 245 # terminates unexpectedly 246 def __init__(self, cmd, status, output): 247 ShellError.__init__(self, cmd, output) 248 self.status = status 249 250 def __str__(self): 251 return ("Shell process terminated while waiting for command to " 252 "complete: %r (status: %s, output: %r)" % 253 (self.cmd, self.status, self.output)) 254 255 256class ShellCmdError(ShellError): 257 # Raised when a command executed in a shell terminates with a nonzero 258 # exit code (status) 259 def __init__(self, cmd, status, output): 260 ShellError.__init__(self, cmd, output) 261 self.status = status 262 263 def __str__(self): 264 return ("Shell command failed: %r (status: %s, output: %r)" % 265 (self.cmd, self.status, self.output)) 266 267 268class ShellStatusError(ShellError): 269 # Raised when the command's exit status cannot be obtained 270 def __str__(self): 271 return ("Could not get exit status of command: %r (output: %r)" % 272 (self.cmd, self.output)) 273 274 275def run_bg(command, termination_func=None, output_func=None, output_prefix="", 276 timeout=1.0): 277 """ 278 Run command as a subprocess. Call output_func with each line of output 279 from the subprocess (prefixed by output_prefix). Call termination_func 280 when the subprocess terminates. Return when timeout expires or when the 281 subprocess exits -- whichever occurs first. 282 283 @brief: Run a subprocess in the background and collect its output and 284 exit status. 285 286 @param command: The shell command to execute 287 @param termination_func: A function to call when the process terminates 288 (should take an integer exit status parameter) 289 @param output_func: A function to call with each line of output from 290 the subprocess (should take a string parameter) 291 @param output_prefix: A string to pre-pend to each line of the output, 292 before passing it to stdout_func 293 @param timeout: Time duration (in seconds) to wait for the subprocess to 294 terminate before returning 295 296 @return: A Tail object. 297 """ 298 process = Tail(command=command, 299 termination_func=termination_func, 300 output_func=output_func, 301 output_prefix=output_prefix) 302 303 end_time = time.time() + timeout 304 while time.time() < end_time and process.is_alive(): 305 time.sleep(0.1) 306 307 return process 308 309 310def run_fg(command, output_func=None, output_prefix="", timeout=1.0): 311 """ 312 Run command as a subprocess. Call output_func with each line of output 313 from the subprocess (prefixed by prefix). Return when timeout expires or 314 when the subprocess exits -- whichever occurs first. If timeout expires 315 and the subprocess is still running, kill it before returning. 316 317 @brief: Run a subprocess in the foreground and collect its output and 318 exit status. 319 320 @param command: The shell command to execute 321 @param output_func: A function to call with each line of output from 322 the subprocess (should take a string parameter) 323 @param output_prefix: A string to pre-pend to each line of the output, 324 before passing it to stdout_func 325 @param timeout: Time duration (in seconds) to wait for the subprocess to 326 terminate before killing it and returning 327 328 @return: A 2-tuple containing the exit status of the process and its 329 STDOUT/STDERR output. If timeout expires before the process 330 terminates, the returned status is None. 331 """ 332 process = run_bg(command, None, output_func, output_prefix, timeout) 333 output = process.get_output() 334 if process.is_alive(): 335 status = None 336 else: 337 status = process.get_status() 338 process.close() 339 return (status, output) 340 341 342class Spawn: 343 """ 344 This class is used for spawning and controlling a child process. 345 346 A new instance of this class can either run a new server (a small Python 347 program that reads output from the child process and reports it to the 348 client and to a text file) or attach to an already running server. 349 When a server is started it runs the child process. 350 The server writes output from the child's STDOUT and STDERR to a text file. 351 The text file can be accessed at any time using get_output(). 352 In addition, the server opens as many pipes as requested by the client and 353 writes the output to them. 354 The pipes are requested and accessed by classes derived from Spawn. 355 These pipes are referred to as "readers". 356 The server also receives input from the client and sends it to the child 357 process. 358 An instance of this class can be pickled. Every derived class is 359 responsible for restoring its own state by properly defining 360 __getinitargs__(). 361 362 The first named pipe is used by _tail(), a function that runs in the 363 background and reports new output from the child as it is produced. 364 The second named pipe is used by a set of functions that read and parse 365 output as requested by the user in an interactive manner, similar to 366 pexpect. 367 When unpickled it automatically 368 resumes _tail() if needed. 369 """ 370 371 def __init__(self, command=None, id=None, auto_close=False, echo=False, 372 linesep="\n"): 373 """ 374 Initialize the class and run command as a child process. 375 376 @param command: Command to run, or None if accessing an already running 377 server. 378 @param id: ID of an already running server, if accessing a running 379 server, or None if starting a new one. 380 @param auto_close: If True, close() the instance automatically when its 381 reference count drops to zero (default False). 382 @param echo: Boolean indicating whether echo should be initially 383 enabled for the pseudo terminal running the subprocess. This 384 parameter has an effect only when starting a new server. 385 @param linesep: Line separator to be appended to strings sent to the 386 child process by sendline(). 387 """ 388 self.id = id or virt_utils.generate_random_string(8) 389 390 # Define filenames for communication with server 391 base_dir = "/tmp/kvm_spawn" 392 try: 393 os.makedirs(base_dir) 394 except: 395 pass 396 (self.shell_pid_filename, 397 self.status_filename, 398 self.output_filename, 399 self.inpipe_filename, 400 self.lock_server_running_filename, 401 self.lock_client_starting_filename) = _get_filenames(base_dir, 402 self.id) 403 404 # Remember some attributes 405 self.auto_close = auto_close 406 self.echo = echo 407 self.linesep = linesep 408 409 # Make sure the 'readers' and 'close_hooks' attributes exist 410 if not hasattr(self, "readers"): 411 self.readers = [] 412 if not hasattr(self, "close_hooks"): 413 self.close_hooks = [] 414 415 # Define the reader filenames 416 self.reader_filenames = dict( 417 (reader, _get_reader_filename(base_dir, self.id, reader)) 418 for reader in self.readers) 419 420 # Let the server know a client intends to open some pipes; 421 # if the executed command terminates quickly, the server will wait for 422 # the client to release the lock before exiting 423 lock_client_starting = _lock(self.lock_client_starting_filename) 424 425 # Start the server (which runs the command) 426 if command: 427 sub = subprocess.Popen("%s %s" % (sys.executable, __file__), 428 shell=True, 429 stdin=subprocess.PIPE, 430 stdout=subprocess.PIPE, 431 stderr=subprocess.STDOUT) 432 # Send parameters to the server 433 sub.stdin.write("%s\n" % self.id) 434 sub.stdin.write("%s\n" % echo) 435 sub.stdin.write("%s\n" % ",".join(self.readers)) 436 sub.stdin.write("%s\n" % command) 437 # Wait for the server to complete its initialization 438 while not "Server %s ready" % self.id in sub.stdout.readline(): 439 pass 440 441 # Open the reading pipes 442 self.reader_fds = {} 443 try: 444 assert(_locked(self.lock_server_running_filename)) 445 for reader, filename in self.reader_filenames.items(): 446 self.reader_fds[reader] = os.open(filename, os.O_RDONLY) 447 except: 448 pass 449 450 # Allow the server to continue 451 _unlock(lock_client_starting) 452 453 454 # The following two functions are defined to make sure the state is set 455 # exclusively by the constructor call as specified in __getinitargs__(). 456 457 def __getstate__(self): 458 pass 459 460 461 def __setstate__(self, state): 462 pass 463 464 465 def __getinitargs__(self): 466 # Save some information when pickling -- will be passed to the 467 # constructor upon unpickling 468 return (None, self.id, self.auto_close, self.echo, self.linesep) 469 470 471 def __del__(self): 472 if self.auto_close: 473 self.close() 474 475 476 def _add_reader(self, reader): 477 """ 478 Add a reader whose file descriptor can be obtained with _get_fd(). 479 Should be called before __init__(). Intended for use by derived 480 classes. 481 482 @param reader: The name of the reader. 483 """ 484 if not hasattr(self, "readers"): 485 self.readers = [] 486 self.readers.append(reader) 487 488 489 def _add_close_hook(self, hook): 490 """ 491 Add a close hook function to be called when close() is called. 492 The function will be called after the process terminates but before 493 final cleanup. Intended for use by derived classes. 494 495 @param hook: The hook function. 496 """ 497 if not hasattr(self, "close_hooks"): 498 self.close_hooks = [] 499 self.close_hooks.append(hook) 500 501 502 def _get_fd(self, reader): 503 """ 504 Return an open file descriptor corresponding to the specified reader 505 pipe. If no such reader exists, or the pipe could not be opened, 506 return None. Intended for use by derived classes. 507 508 @param reader: The name of the reader. 509 """ 510 return self.reader_fds.get(reader) 511 512 513 def get_id(self): 514 """ 515 Return the instance's id attribute, which may be used to access the 516 process in the future. 517 """ 518 return self.id 519 520 521 def get_pid(self): 522 """ 523 Return the PID of the process. 524 525 Note: this may be the PID of the shell process running the user given 526 command. 527 """ 528 try: 529 file = open(self.shell_pid_filename, "r") 530 pid = int(file.read()) 531 file.close() 532 return pid 533 except: 534 return None 535 536 537 def get_status(self): 538 """ 539 Wait for the process to exit and return its exit status, or None 540 if the exit status is not available. 541 """ 542 _wait(self.lock_server_running_filename) 543 try: 544 file = open(self.status_filename, "r") 545 status = int(file.read()) 546 file.close() 547 return status 548 except: 549 return None 550 551 552 def get_output(self): 553 """ 554 Return the STDOUT and STDERR output of the process so far. 555 """ 556 try: 557 file = open(self.output_filename, "r") 558 output = file.read() 559 file.close() 560 return output 561 except: 562 return "" 563 564 565 def is_alive(self): 566 """ 567 Return True if the process is running. 568 """ 569 return _locked(self.lock_server_running_filename) 570 571 572 def close(self, sig=signal.SIGKILL): 573 """ 574 Kill the child process if it's alive and remove temporary files. 575 576 @param sig: The signal to send the process when attempting to kill it. 577 """ 578 # Kill it if it's alive 579 if self.is_alive(): 580 virt_utils.kill_process_tree(self.get_pid(), sig) 581 # Wait for the server to exit 582 _wait(self.lock_server_running_filename) 583 # Call all cleanup routines 584 for hook in self.close_hooks: 585 hook(self) 586 # Close reader file descriptors 587 for fd in self.reader_fds.values(): 588 try: 589 os.close(fd) 590 except: 591 pass 592 self.reader_fds = {} 593 # Remove all used files 594 for filename in (_get_filenames("/tmp/kvm_spawn", self.id) + 595 self.reader_filenames.values()): 596 try: 597 os.unlink(filename) 598 except OSError: 599 pass 600 601 602 def set_linesep(self, linesep): 603 """ 604 Sets the line separator string (usually "\\n"). 605 606 @param linesep: Line separator string. 607 """ 608 self.linesep = linesep 609 610 611 def send(self, str=""): 612 """ 613 Send a string to the child process. 614 615 @param str: String to send to the child process. 616 """ 617 try: 618 fd = os.open(self.inpipe_filename, os.O_RDWR) 619 os.write(fd, str) 620 os.close(fd) 621 except: 622 pass 623 624 625 def sendline(self, str=""): 626 """ 627 Send a string followed by a line separator to the child process. 628 629 @param str: String to send to the child process. 630 """ 631 self.send(str + self.linesep) 632 633 634_thread_kill_requested = False 635 636def kill_tail_threads(): 637 """ 638 Kill all Tail threads. 639 640 After calling this function no new threads should be started. 641 """ 642 global _thread_kill_requested 643 _thread_kill_requested = True 644 for t in threading.enumerate(): 645 if hasattr(t, "name") and t.name.startswith("tail_thread"): 646 t.join(10) 647 _thread_kill_requested = False 648 649 650class Tail(Spawn): 651 """ 652 This class runs a child process in the background and sends its output in 653 real time, line-by-line, to a callback function. 654 655 See Spawn's docstring. 656 657 This class uses a single pipe reader to read data in real time from the 658 child process and report it to a given callback function. 659 When the child process exits, its exit status is reported to an additional 660 callback function. 661 662 When this class is unpickled, it automatically resumes reporting output. 663 """ 664 665 def __init__(self, command=None, id=None, auto_close=False, echo=False, 666 linesep="\n", termination_func=None, termination_params=(), 667 output_func=None, output_params=(), output_prefix=""): 668 """ 669 Initialize the class and run command as a child process. 670 671 @param command: Command to run, or None if accessing an already running 672 server. 673 @param id: ID of an already running server, if accessing a running 674 server, or None if starting a new one. 675 @param auto_close: If True, close() the instance automatically when its 676 reference count drops to zero (default False). 677 @param echo: Boolean indicating whether echo should be initially 678 enabled for the pseudo terminal running the subprocess. This 679 parameter has an effect only when starting a new server. 680 @param linesep: Line separator to be appended to strings sent to the 681 child process by sendline(). 682 @param termination_func: Function to call when the process exits. The 683 function must accept a single exit status parameter. 684 @param termination_params: Parameters to send to termination_func 685 before the exit status. 686 @param output_func: Function to call whenever a line of output is 687 available from the STDOUT or STDERR streams of the process. 688 The function must accept a single string parameter. The string 689 does not include the final newline. 690 @param output_params: Parameters to send to output_func before the 691 output line. 692 @param output_prefix: String to prepend to lines sent to output_func. 693 """ 694 # Add a reader and a close hook 695 self._add_reader("tail") 696 self._add_close_hook(Tail._join_thread) 697 698 # Init the superclass 699 Spawn.__init__(self, command, id, auto_close, echo, linesep) 700 701 # Remember some attributes 702 self.termination_func = termination_func 703 self.termination_params = termination_params 704 self.output_func = output_func 705 self.output_params = output_params 706 self.output_prefix = output_prefix 707 708 # Start the thread in the background 709 self.tail_thread = None 710 if termination_func or output_func: 711 self._start_thread() 712 713 714 def __getinitargs__(self): 715 return Spawn.__getinitargs__(self) + (self.termination_func, 716 self.termination_params, 717 self.output_func, 718 self.output_params, 719 self.output_prefix) 720 721 722 def set_termination_func(self, termination_func): 723 """ 724 Set the termination_func attribute. See __init__() for details. 725 726 @param termination_func: Function to call when the process terminates. 727 Must take a single parameter -- the exit status. 728 """ 729 self.termination_func = termination_func 730 if termination_func and not self.tail_thread: 731 self._start_thread() 732 733 734 def set_termination_params(self, termination_params): 735 """ 736 Set the termination_params attribute. See __init__() for details. 737 738 @param termination_params: Parameters to send to termination_func 739 before the exit status. 740 """ 741 self.termination_params = termination_params 742 743 744 def set_output_func(self, output_func): 745 """ 746 Set the output_func attribute. See __init__() for details. 747 748 @param output_func: Function to call for each line of STDOUT/STDERR 749 output from the process. Must take a single string parameter. 750 """ 751 self.output_func = output_func 752 if output_func and not self.tail_thread: 753 self._start_thread() 754 755 756 def set_output_params(self, output_params): 757 """ 758 Set the output_params attribute. See __init__() for details. 759 760 @param output_params: Parameters to send to output_func before the 761 output line. 762 """ 763 self.output_params = output_params 764 765 766 def set_output_prefix(self, output_prefix): 767 """ 768 Set the output_prefix attribute. See __init__() for details. 769 770 @param output_prefix: String to pre-pend to each line sent to 771 output_func (see set_output_callback()). 772 """ 773 self.output_prefix = output_prefix 774 775 776 def _tail(self): 777 def print_line(text): 778 # Pre-pend prefix and remove trailing whitespace 779 text = self.output_prefix + text.rstrip() 780 # Pass text to output_func 781 try: 782 params = self.output_params + (text,) 783 self.output_func(*params) 784 except TypeError: 785 pass 786 787 try: 788 fd = self._get_fd("tail") 789 buffer = "" 790 while True: 791 global _thread_kill_requested 792 if _thread_kill_requested: 793 return 794 try: 795 # See if there's any data to read from the pipe 796 r, w, x = select.select([fd], [], [], 0.05) 797 except: 798 break 799 if fd in r: 800 # Some data is available; read it 801 new_data = os.read(fd, 1024) 802 if not new_data: 803 break 804 buffer += new_data 805 # Send the output to output_func line by line 806 # (except for the last line) 807 if self.output_func: 808 lines = buffer.split("\n") 809 for line in lines[:-1]: 810 print_line(line) 811 # Leave only the last line 812 last_newline_index = buffer.rfind("\n") 813 buffer = buffer[last_newline_index+1:] 814 else: 815 # No output is available right now; flush the buffer 816 if buffer: 817 print_line(buffer) 818 buffer = "" 819 # The process terminated; print any remaining output 820 if buffer: 821 print_line(buffer) 822 # Get the exit status, print it and send it to termination_func 823 status = self.get_status() 824 if status is None: 825 return 826 print_line("(Process terminated with status %s)" % status) 827 try: 828 params = self.termination_params + (status,) 829 self.termination_func(*params) 830 except TypeError: 831 pass 832 finally: 833 self.tail_thread = None 834 835 836 def _start_thread(self): 837 self.tail_thread = threading.Thread(target=self._tail, 838 name="tail_thread_%s" % self.id) 839 self.tail_thread.start() 840 841 842 def _join_thread(self): 843 # Wait for the tail thread to exit 844 # (it's done this way because self.tail_thread may become None at any 845 # time) 846 t = self.tail_thread 847 if t: 848 t.join() 849 850 851class Expect(Tail): 852 """ 853 This class runs a child process in the background and provides expect-like 854 services. 855 856 It also provides all of Tail's functionality. 857 """ 858 859 def __init__(self, command=None, id=None, auto_close=True, echo=False, 860 linesep="\n", termination_func=None, termination_params=(), 861 output_func=None, output_params=(), output_prefix=""): 862 """ 863 Initialize the class and run command as a child process. 864 865 @param command: Command to run, or None if accessing an already running 866 server. 867 @param id: ID of an already running server, if accessing a running 868 server, or None if starting a new one. 869 @param auto_close: If True, close() the instance automatically when its 870 reference count drops to zero (default False). 871 @param echo: Boolean indicating whether echo should be initially 872 enabled for the pseudo terminal running the subprocess. This 873 parameter has an effect only when starting a new server. 874 @param linesep: Line separator to be appended to strings sent to the 875 child process by sendline(). 876 @param termination_func: Function to call when the process exits. The 877 function must accept a single exit status parameter. 878 @param termination_params: Parameters to send to termination_func 879 before the exit status. 880 @param output_func: Function to call whenever a line of output is 881 available from the STDOUT or STDERR streams of the process. 882 The function must accept a single string parameter. The string 883 does not include the final newline. 884 @param output_params: Parameters to send to output_func before the 885 output line. 886 @param output_prefix: String to prepend to lines sent to output_func. 887 """ 888 # Add a reader 889 self._add_reader("expect") 890 891 # Init the superclass 892 Tail.__init__(self, command, id, auto_close, echo, linesep, 893 termination_func, termination_params, 894 output_func, output_params, output_prefix) 895 896 897 def __getinitargs__(self): 898 return Tail.__getinitargs__(self) 899 900 901 def read_nonblocking(self, timeout=None): 902 """ 903 Read from child until there is nothing to read for timeout seconds. 904 905 @param timeout: Time (seconds) to wait before we give up reading from 906 the child process, or None to use the default value. 907 """ 908 if timeout is None: 909 timeout = 0.1 910 fd = self._get_fd("expect") 911 data = "" 912 while True: 913 try: 914 r, w, x = select.select([fd], [], [], timeout) 915 except: 916 return data 917 if fd in r: 918 new_data = os.read(fd, 1024) 919 if not new_data: 920 return data 921 data += new_data 922 else: 923 return data 924 925 926 def match_patterns(self, str, patterns): 927 """ 928 Match str against a list of patterns. 929 930 Return the index of the first pattern that matches a substring of str. 931 None and empty strings in patterns are ignored. 932 If no match is found, return None. 933 934 @param patterns: List of strings (regular expression patterns). 935 """ 936 for i in range(len(patterns)): 937 if not patterns[i]: 938 continue 939 if re.search(patterns[i], str): 940 return i 941 942 943 def read_until_output_matches(self, patterns, filter=lambda x: x, 944 timeout=60, internal_timeout=None, 945 print_func=None): 946 """ 947 Read using read_nonblocking until a match is found using match_patterns, 948 or until timeout expires. Before attempting to search for a match, the 949 data is filtered using the filter function provided. 950 951 @brief: Read from child using read_nonblocking until a pattern 952 matches. 953 @param patterns: List of strings (regular expression patterns) 954 @param filter: Function to apply to the data read from the child before 955 attempting to match it against the patterns (should take and 956 return a string) 957 @param timeout: The duration (in seconds) to wait until a match is 958 found 959 @param internal_timeout: The timeout to pass to read_nonblocking 960 @param print_func: A function to be used to print the data being read 961 (should take a string parameter) 962 @return: Tuple containing the match index and the data read so far 963 @raise ExpectTimeoutError: Raised if timeout expires 964 @raise ExpectProcessTerminatedError: Raised if the child process 965 terminates while waiting for output 966 @raise ExpectError: Raised if an unknown error occurs 967 """ 968 fd = self._get_fd("expect") 969 o = "" 970 end_time = time.time() + timeout 971 while True: 972 try: 973 r, w, x = select.select([fd], [], [], 974 max(0, end_time - time.time())) 975 except (select.error, TypeError): 976 break 977 if not r: 978 raise ExpectTimeoutError(patterns, o) 979 # Read data from child 980 data = self.read_nonblocking(internal_timeout) 981 if not data: 982 break 983 # Print it if necessary 984 if print_func: 985 for line in data.splitlines(): 986 print_func(line) 987 # Look for patterns 988 o += data 989 match = self.match_patterns(filter(o), patterns) 990 if match is not None: 991 return match, o 992 993 # Check if the child has terminated 994 if virt_utils.wait_for(lambda: not self.is_alive(), 5, 0, 0.1): 995 raise ExpectProcessTerminatedError(patterns, self.get_status(), o) 996 else: 997 # This shouldn't happen 998 raise ExpectError(patterns, o) 999 1000 1001 def read_until_last_word_matches(self, patterns, timeout=60, 1002 internal_timeout=None, print_func=None): 1003 """ 1004 Read using read_nonblocking until the last word of the output matches 1005 one of the patterns (using match_patterns), or until timeout expires. 1006 1007 @param patterns: A list of strings (regular expression patterns) 1008 @param timeout: The duration (in seconds) to wait until a match is 1009 found 1010 @param internal_timeout: The timeout to pass to read_nonblocking 1011 @param print_func: A function to be used to print the data being read 1012 (should take a string parameter) 1013 @return: A tuple containing the match index and the data read so far 1014 @raise ExpectTimeoutError: Raised if timeout expires 1015 @raise ExpectProcessTerminatedError: Raised if the child process 1016 terminates while waiting for output 1017 @raise ExpectError: Raised if an unknown error occurs 1018 """ 1019 def get_last_word(str): 1020 if str: 1021 return str.split()[-1] 1022 else: 1023 return "" 1024 1025 return self.read_until_output_matches(patterns, get_last_word, 1026 timeout, internal_timeout, 1027 print_func) 1028 1029 1030 def read_until_last_line_matches(self, patterns, timeout=60, 1031 internal_timeout=None, print_func=None): 1032 """ 1033 Read using read_nonblocking until the last non-empty line of the output 1034 matches one of the patterns (using match_patterns), or until timeout 1035 expires. Return a tuple containing the match index (or None if no match 1036 was found) and the data read so far. 1037 1038 @brief: Read using read_nonblocking until the last non-empty line 1039 matches a pattern. 1040 1041 @param patterns: A list of strings (regular expression patterns) 1042 @param timeout: The duration (in seconds) to wait until a match is 1043 found 1044 @param internal_timeout: The timeout to pass to read_nonblocking 1045 @param print_func: A function to be used to print the data being read 1046 (should take a string parameter) 1047 @return: A tuple containing the match index and the data read so far 1048 @raise ExpectTimeoutError: Raised if timeout expires 1049 @raise ExpectProcessTerminatedError: Raised if the child process 1050 terminates while waiting for output 1051 @raise ExpectError: Raised if an unknown error occurs 1052 """ 1053 def get_last_nonempty_line(str): 1054 nonempty_lines = [l for l in str.splitlines() if l.strip()] 1055 if nonempty_lines: 1056 return nonempty_lines[-1] 1057 else: 1058 return "" 1059 1060 return self.read_until_output_matches(patterns, get_last_nonempty_line, 1061 timeout, internal_timeout, 1062 print_func) 1063 1064 1065class ShellSession(Expect): 1066 """ 1067 This class runs a child process in the background. It it suited for 1068 processes that provide an interactive shell, such as SSH and Telnet. 1069 1070 It provides all services of Expect and Tail. In addition, it 1071 provides command running services, and a utility function to test the 1072 process for responsiveness. 1073 """ 1074 1075 def __init__(self, command=None, id=None, auto_close=True, echo=False, 1076 linesep="\n", termination_func=None, termination_params=(), 1077 output_func=None, output_params=(), output_prefix="", 1078 prompt=r"[\#\$]\s*$", status_test_command="echo $?"): 1079 """ 1080 Initialize the class and run command as a child process. 1081 1082 @param command: Command to run, or None if accessing an already running 1083 server. 1084 @param id: ID of an already running server, if accessing a running 1085 server, or None if starting a new one. 1086 @param auto_close: If True, close() the instance automatically when its 1087 reference count drops to zero (default True). 1088 @param echo: Boolean indicating whether echo should be initially 1089 enabled for the pseudo terminal running the subprocess. This 1090 parameter has an effect only when starting a new server. 1091 @param linesep: Line separator to be appended to strings sent to the 1092 child process by sendline(). 1093 @param termination_func: Function to call when the process exits. The 1094 function must accept a single exit status parameter. 1095 @param termination_params: Parameters to send to termination_func 1096 before the exit status. 1097 @param output_func: Function to call whenever a line of output is 1098 available from the STDOUT or STDERR streams of the process. 1099 The function must accept a single string parameter. The string 1100 does not include the final newline. 1101 @param output_params: Parameters to send to output_func before the 1102 output line. 1103 @param output_prefix: String to prepend to lines sent to output_func. 1104 @param prompt: Regular expression describing the shell's prompt line. 1105 @param status_test_command: Command to be used for getting the last 1106 exit status of commands run inside the shell (used by 1107 cmd_status_output() and friends). 1108 """ 1109 # Init the superclass 1110 Expect.__init__(self, command, id, auto_close, echo, linesep, 1111 termination_func, termination_params, 1112 output_func, output_params, output_prefix) 1113 1114 # Remember some attributes 1115 self.prompt = prompt 1116 self.status_test_command = status_test_command 1117 1118 1119 def __getinitargs__(self): 1120 return Expect.__getinitargs__(self) + (self.prompt, 1121 self.status_test_command) 1122 1123 1124 def set_prompt(self, prompt): 1125 """ 1126 Set the prompt attribute for later use by read_up_to_prompt. 1127 1128 @param: String that describes the prompt contents. 1129 """ 1130 self.prompt = prompt 1131 1132 1133 def set_status_test_command(self, status_test_command): 1134 """ 1135 Set the command to be sent in order to get the last exit status. 1136 1137 @param status_test_command: Command that will be sent to get the last 1138 exit status. 1139 """ 1140 self.status_test_command = status_test_command 1141 1142 1143 def is_responsive(self, timeout=5.0): 1144 """ 1145 Return True if the process responds to STDIN/terminal input. 1146 1147 Send a newline to the child process (e.g. SSH or Telnet) and read some 1148 output using read_nonblocking(). 1149 If all is OK, some output should be available (e.g. the shell prompt). 1150 In that case return True. Otherwise return False. 1151 1152 @param timeout: Time duration to wait before the process is considered 1153 unresponsive. 1154 """ 1155 # Read all output that's waiting to be read, to make sure the output 1156 # we read next is in response to the newline sent 1157 self.read_nonblocking(timeout=0) 1158 # Send a newline 1159 self.sendline() 1160 # Wait up to timeout seconds for some output from the child 1161 end_time = time.time() + timeout 1162 while time.time() < end_time: 1163 time.sleep(0.5) 1164 if self.read_nonblocking(timeout=0).strip(): 1165 return True 1166 # No output -- report unresponsive 1167 return False 1168 1169 1170 def read_up_to_prompt(self, timeout=60, internal_timeout=None, 1171 print_func=None): 1172 """ 1173 Read using read_nonblocking until the last non-empty line of the output 1174 matches the prompt regular expression set by set_prompt, or until 1175 timeout expires. 1176 1177 @brief: Read using read_nonblocking until the last non-empty line 1178 matches the prompt. 1179 1180 @param timeout: The duration (in seconds) to wait until a match is 1181 found 1182 @param internal_timeout: The timeout to pass to read_nonblocking 1183 @param print_func: A function to be used to print the data being 1184 read (should take a string parameter) 1185 1186 @return: The data read so far 1187 @raise ExpectTimeoutError: Raised if timeout expires 1188 @raise ExpectProcessTerminatedError: Raised if the shell process 1189 terminates while waiting for output 1190 @raise ExpectError: Raised if an unknown error occurs 1191 """ 1192 m, o = self.read_until_last_line_matches([self.prompt], timeout, 1193 internal_timeout, print_func) 1194 return o 1195 1196 1197 def cmd_output(self, cmd, timeout=60, internal_timeout=None, 1198 print_func=None): 1199 """ 1200 Send a command and return its output. 1201 1202 @param cmd: Command to send (must not contain newline characters) 1203 @param timeout: The duration (in seconds) to wait for the prompt to 1204 return 1205 @param internal_timeout: The timeout to pass to read_nonblocking 1206 @param print_func: A function to be used to print the data being read 1207 (should take a string parameter) 1208 1209 @return: The output of cmd 1210 @raise ShellTimeoutError: Raised if timeout expires 1211 @raise ShellProcessTerminatedError: Raised if the shell process 1212 terminates while waiting for output 1213 @raise ShellError: Raised if an unknown error occurs 1214 """ 1215 def remove_command_echo(str, cmd): 1216 if str and str.splitlines()[0] == cmd: 1217 str = "".join(str.splitlines(True)[1:]) 1218 return str 1219 1220 def remove_last_nonempty_line(str): 1221 return "".join(str.rstrip().splitlines(True)[:-1]) 1222 1223 logging.debug("Sending command: %s" % cmd) 1224 self.read_nonblocking(timeout=0) 1225 self.sendline(cmd) 1226 try: 1227 o = self.read_up_to_prompt(timeout, internal_timeout, print_func) 1228 except ExpectError, e: 1229 o = remove_command_echo(e.output, cmd) 1230 if isinstance(e, ExpectTimeoutError): 1231 raise ShellTimeoutError(cmd, o) 1232 elif isinstance(e, ExpectProcessTerminatedError): 1233 raise ShellProcessTerminatedError(cmd, e.status, o) 1234 else: 1235 raise ShellError(cmd, o) 1236 1237 # Remove the echoed command and the final shell prompt 1238 return remove_last_nonempty_line(remove_command_echo(o, cmd)) 1239 1240 1241 def cmd_status_output(self, cmd, timeout=60, internal_timeout=None, 1242 print_func=None): 1243 """ 1244 Send a command and return its exit status and output. 1245 1246 @param cmd: Command to send (must not contain newline characters) 1247 @param timeout: The duration (in seconds) to wait for the prompt to 1248 return 1249 @param internal_timeout: The timeout to pass to read_nonblocking 1250 @param print_func: A function to be used to print the data being read 1251 (should take a string parameter) 1252 1253 @return: A tuple (status, output) where status is the exit status and 1254 output is the output of cmd 1255 @raise ShellTimeoutError: Raised if timeout expires 1256 @raise ShellProcessTerminatedError: Raised if the shell process 1257 terminates while waiting for output 1258 @raise ShellStatusError: Raised if the exit status cannot be obtained 1259 @raise ShellError: Raised if an unknown error occurs 1260 """ 1261 o = self.cmd_output(cmd, timeout, internal_timeout, print_func) 1262 try: 1263 # Send the 'echo $?' (or equivalent) command to get the exit status 1264 s = self.cmd_output(self.status_test_command, 10, internal_timeout) 1265 except ShellError: 1266 raise ShellStatusError(cmd, o) 1267 1268 # Get the first line consisting of digits only 1269 digit_lines = [l for l in s.splitlines() if l.strip().isdigit()] 1270 if digit_lines: 1271 return int(digit_lines[0].strip()), o 1272 else: 1273 raise ShellStatusError(cmd, o) 1274 1275 1276 def cmd_status(self, cmd, timeout=60, internal_timeout=None, 1277 print_func=None): 1278 """ 1279 Send a command and return its exit status. 1280 1281 @param cmd: Command to send (must not contain newline characters) 1282 @param timeout: The duration (in seconds) to wait for the prompt to 1283 return 1284 @param internal_timeout: The timeout to pass to read_nonblocking 1285 @param print_func: A function to be used to print the data being read 1286 (should take a string parameter) 1287 1288 @return: The exit status of cmd 1289 @raise ShellTimeoutError: Raised if timeout expires 1290 @raise ShellProcessTerminatedError: Raised if the shell process 1291 terminates while waiting for output 1292 @raise ShellStatusError: Raised if the exit status cannot be obtained 1293 @raise ShellError: Raised if an unknown error occurs 1294 """ 1295 s, o = self.cmd_status_output(cmd, timeout, internal_timeout, 1296 print_func) 1297 return s 1298 1299 1300 def cmd(self, cmd, timeout=60, internal_timeout=None, print_func=None): 1301 """ 1302 Send a command and return its output. If the command's exit status is 1303 nonzero, raise an exception. 1304 1305 @param cmd: Command to send (must not contain newline characters) 1306 @param timeout: The duration (in seconds) to wait for the prompt to 1307 return 1308 @param internal_timeout: The timeout to pass to read_nonblocking 1309 @param print_func: A function to be used to print the data being read 1310 (should take a string parameter) 1311 1312 @return: The output of cmd 1313 @raise ShellTimeoutError: Raised if timeout expires 1314 @raise ShellProcessTerminatedError: Raised if the shell process 1315 terminates while waiting for output 1316 @raise ShellError: Raised if the exit status cannot be obtained or if 1317 an unknown error occurs 1318 @raise ShellStatusError: Raised if the exit status cannot be obtained 1319 @raise ShellError: Raised if an unknown error occurs 1320 @raise ShellCmdError: Raised if the exit status is nonzero 1321 """ 1322 s, o = self.cmd_status_output(cmd, timeout, internal_timeout, 1323 print_func) 1324 if s != 0: 1325 raise ShellCmdError(cmd, s, o) 1326 return o 1327 1328 1329 def get_command_output(self, cmd, timeout=60, internal_timeout=None, 1330 print_func=None): 1331 """ 1332 Alias for cmd_output() for backward compatibility. 1333 """ 1334 return self.cmd_output(cmd, timeout, internal_timeout, print_func) 1335 1336 1337 def get_command_status_output(self, cmd, timeout=60, internal_timeout=None, 1338 print_func=None): 1339 """ 1340 Alias for cmd_status_output() for backward compatibility. 1341 """ 1342 return self.cmd_status_output(cmd, timeout, internal_timeout, 1343 print_func) 1344 1345 1346 def get_command_status(self, cmd, timeout=60, internal_timeout=None, 1347 print_func=None): 1348 """ 1349 Alias for cmd_status() for backward compatibility. 1350 """ 1351 return self.cmd_status(cmd, timeout, internal_timeout, print_func) 1352