# -*- coding: utf-8 -*-

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Guide the user to perform gestures. Record and validate the gestures."""

import fcntl
import glob
import os
import subprocess
import sys

# reduce() is a builtin on Python 2; importing it from functools (available
# since Python 2.6) keeps the name resolvable on Python 3 as well.
from functools import reduce

import common_util
import firmware_log
import firmware_utils
import fuzzy
import mini_color
import mtb
import touchbotII_robot_wrapper as robot_wrapper
import test_conf as conf
import validators

from firmware_utils import GestureList

sys.path.append('../../bin/input')
import input_device

# Include some constants
from firmware_constants import DEV, GV, MODE, OPTIONS, TFK


class TestFlow:
    """Guide the user to perform gestures. Record and validate the gestures.

    The object walks through every (gesture, variation) pair selected for the
    current mode. For each pair it either records a new event file with the
    record program or reuses an existing one (replay/resume), validates the
    parsed packets, and reports the scores.
    """

    def __init__(self, device_geometry, device, keyboard, win, parser, output,
                 test_version, board, firmware_version, options):
        """Store the collaborators and prepare the gesture variation list.

        Args:
            device_geometry: a 4-tuple formatted as '%dx%d+%d+%d' to build
                    the geometry string given to the record program.
            device: the touch device; its device_node attribute is the input
                    node that is opened and watched.
            keyboard: provides get_key_press_event(fd).
            win: the test window (prompts, images, timeouts, io watches).
            parser: provides parse_file(filename) returning the packets.
            output: the report writer (print_report, print_window,
                    report_html, log_dir, ...).
            test_version: printed at the top of the report.
            board: the board name used in gesture file names.
            firmware_version: the firmware version used in gesture file names.
            options: a mapping indexed by the OPTIONS constants.
        """
        self.device_geometry = device_geometry
        self.device = device
        self.device_node = self.device.device_node
        self.keyboard = keyboard
        self.firmware_version = firmware_version
        self.output = output
        self.board = board
        self.test_version = test_version
        self.output.print_report('%s' % test_version)
        self._get_record_cmd()
        self.win = win
        self.parser = parser
        self.packets = None
        self.gesture_file_name = None
        self.prefix_space = self.output.get_prefix_space()
        self.scores = []
        self.mode = options[OPTIONS.MODE]
        self.fngenerator_only = options[OPTIONS.FNGENERATOR]
        self.iterations = options[OPTIONS.ITERATIONS]
        self.replay_dir = options[OPTIONS.REPLAY]
        self.resume_dir = options[OPTIONS.RESUME]
        # New event files are recorded only when neither replaying nor
        # resuming.
        self.recording = not (self.replay_dir or self.resume_dir)
        self.device_type = (DEV.TOUCHSCREEN if options[OPTIONS.TOUCHSCREEN]
                            else DEV.TOUCHPAD)

        self.robot = robot_wrapper.RobotWrapper(self.board, options)
        self.robot_waiting = False

        # Start above any iteration count so that the very first call of
        # _pre_setup_this_gesture_variation() fetches the first gesture.
        self.gv_count = float('infinity')
        gesture_names = self._get_gesture_names()
        order = None
        if self._is_robot_mode():
            order = lambda x: conf.finger_tips_required[x.name]
        self.gesture_list = GestureList(gesture_names).get_gesture_list(order)
        self._get_all_gesture_variations(options[OPTIONS.SIMPLIFIED])

        self.init_flag = False
        self.system_device = self._non_blocking_open(self.device_node)
        self.evdev_device = input_device.InputEvent()
        self.screen_shot = firmware_utils.ScreenShot(self.geometry_str)
        self.mtb_evemu = mtb.MtbEvemu(device)

        self._rename_old_log_and_html_files()
        self._set_static_prompt_messages()
        self.gesture_image_name = None
        self.gesture_continues_flag = False
        self.use_existent_event_file_flag = False

    def __del__(self):
        """Close the device node if it was opened."""
        # __init__ may fail before system_device is assigned; __del__ still
        # runs in that case and must not raise AttributeError.
        system_device = getattr(self, 'system_device', None)
        if system_device is not None:
            system_device.close()

    def _rename_old_log_and_html_files(self):
        """When in replay or resume mode, rename the old log and html files."""
        if self.replay_dir or self.resume_dir:
            for file_type in ['*.log', '*.html']:
                path_names = os.path.join(self.output.log_dir, file_type)
                for old_path_name in glob.glob(path_names):
                    new_path_name = '.'.join([old_path_name, 'old'])
                    os.rename(old_path_name, new_path_name)

    def _is_robot_mode(self):
        """Is a robot, either a real one or a simulated one, in use?"""
        return self.robot.is_robot_action_mode() or self.mode == MODE.ROBOT_SIM

    def _get_gesture_names(self):
        """Determine the gesture names based on the mode."""
        if self.mode == MODE.QUICKSTEP:
            return conf.gesture_names_quickstep
        elif self.mode == MODE.NOISE:
            return conf.gesture_names_noise_extended
        elif self._is_robot_mode():
            # The mode could be MODE.ROBOT or MODE.ROBOT_SIM.
            # The same gesture names list is used in both modes.
            return conf.gesture_names_robot[self.device_type]
        elif self.mode == MODE.MANUAL:
            # The manual list is
            #   gesture_names_complete - gesture_names_robot
            #                          - gesture_names_fngenerator_required
            manual_set = (set(conf.gesture_names_complete[self.device_type]) -
                          set(conf.gesture_names_robot[self.device_type]))
            return list(manual_set - set(conf.gesture_names_fngenerator_required))

        elif self.mode == MODE.CALIBRATION:
            return conf.gesture_names_calibration
        else:
            # Filter out tests that need a function generator for COMPLETE mode
            # unless they've indicated that they have one
            return [n for n in conf.gesture_names_complete[self.device_type]
                    if (self.fngenerator_only or
                        n not in conf.gesture_names_fngenerator_required)]

    def _non_blocking_open(self, filename):
        """Open the file in non-blocking mode and return the file object."""
        fd = open(filename)
        # OR in O_NONBLOCK rather than overwriting the existing status flags.
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        return fd

    def _non_blocking_read(self, dev, fd):
        """Non-blocking read on fd.

        Returns the event as a (tv_sec, tv_usec, type, code, value) tuple, or
        None when dev.read() raises, e.g. when no event is pending.
        """
        try:
            dev.read(fd)
            event = (dev.tv_sec, dev.tv_usec, dev.type, dev.code, dev.value)
        except Exception:
            # Deliberately best effort: any failure is treated as "no event".
            event = None
        return event

    def _reopen_system_device(self):
        """Close the device and open a new one."""
        self.system_device.close()
        # Open the node exactly once. A second plain open() here would leave
        # a file object that is overwritten without ever being closed.
        self.system_device = self._non_blocking_open(self.device_node)

    def _set_static_prompt_messages(self):
        """Set static prompt messages."""
        # Prompt for next gesture.
        self._prompt_next = (
            "Press SPACE to save this file and go to next test,\n"
            " 'm' to save this file and record again,\n"
            " 'd' to delete this file and try again,\n"
            " 'x' to discard this file and exit.")

        # Prompt to see test result through timeout callback.
        self._prompt_result = (
            "Perform the gesture now.\n"
            "See the test result on the right after finger lifted.\n"
            "Or press 'x' to exit.")

    def _get_prompt_abnormal_gestures(self, warn_msg):
        """Get the prompt shown when the gesture looks wrong."""
        prompt = '\n'.join(
            ["It is very likely that you perform a WRONG gesture!",
             warn_msg,
             "Press 'd' to delete this file and try again (recommended),",
             " SPACE to save this file if you are sure it's correct,",
             " 'x' to discard this file and exit."])
        return prompt

    def _get_prompt_no_data(self):
        """Prompt to remind user of performing gestures."""
        prompt = ("You need to perform the specified gestures "
                  "before pressing SPACE.\n")
        return prompt + self._prompt_result

    def _get_record_cmd(self):
        """Get the device event record command.

        Sets record_program, geometry_str and record_cmd. Exits with status 1
        if the record program is reported as missing.
        """
        # Run mtplot with settings to disable clearing the display if the robot
        # clicks the pad, and adding a visible click indicator in the output
        self.record_program = 'mtplot -s1 -c0 -m0'
        # NOTE(review): the string checked here includes the command line
        # flags; confirm that common_util.program_exists() copes with that.
        if not common_util.program_exists(self.record_program):
            msg = 'Error: the program "%s" does not exist in $PATH.'
            self.output.print_report(msg % self.record_program)
            # sys.exit() instead of the exit() helper, which is only
            # installed by the site module.
            sys.exit(1)

        display_name = firmware_utils.get_display_name()
        self.geometry_str = '%dx%d+%d+%d' % self.device_geometry
        format_str = '%s %s -d %s -g %s'
        self.record_cmd = format_str % (self.record_program,
                                        self.device_node,
                                        display_name,
                                        self.geometry_str)
        self.output.print_report('Record program: %s' % self.record_cmd)

    def _span_seq(self, seq1, seq2):
        """Span sequence seq1 over sequence seq2.

        E.g., seq1 = (('a', 'b'), 'c')
              seq2 = ('1', ('2', '3'))
              res = (('a', 'b', '1'), ('a', 'b', '2', '3'),
                     ('c', '1'), ('c', '2', '3'))
        E.g., seq1 = ('a', 'b')
              seq2 = ('1', '2', '3')
              res = (('a', '1'), ('a', '2'), ('a', '3'),
                     ('b', '1'), ('b', '2'), ('b', '3'))
        E.g., seq1 = (('a', 'b'), ('c', 'd'))
              seq2 = ('1', '2', '3')
              res = (('a', 'b', '1'), ('a', 'b', '2'), ('a', 'b', '3'),
                     ('c', 'd', '1'), ('c', 'd', '2'), ('c', 'd', '3'))
        """
        to_list = lambda s: list(s) if isinstance(s, tuple) else [s]
        return tuple(tuple(to_list(s1) + to_list(s2)) for s1 in seq1
                                                      for s2 in seq2)

    def span_variations(self, seq):
        """Span the variations of a gesture.

        Returns (None,) when there are no variations, the spanned product
        when seq is a sequence of tuples, and seq itself otherwise.
        """
        if seq is None:
            return (None,)
        elif isinstance(seq[0], tuple):
            return reduce(self._span_seq, seq)
        else:
            return seq

    def _stop(self):
        """Terminate the recording process."""
        self.record_proc.poll()
        # Terminate the process only when it was not terminated yet.
        if self.record_proc.returncode is None:
            self.record_proc.terminate()
            self.record_proc.wait()
        self.output.print_window('')

    def _get_gesture_image_name(self):
        """Get the gesture file base name without file extension.

        As a side effect, gesture_image_name is set to that name + '.png'.
        """
        filepath = os.path.splitext(self.gesture_file_name)[0]
        self.gesture_image_name = filepath + '.png'
        return filepath

    def _close_gesture_file(self):
        """Close the gesture file and keep only its 'Event:' lines."""
        if self.gesture_file.closed:
            return

        filename = self.gesture_file.name
        self.gesture_file.close()

        # Strip off the header of the gesture file.
        #
        # Input driver version is 1.0.1
        # Input device ID: bus 0x18 vendor 0x0 product 0x0 version 0x0
        # Input device name: "Atmel maXTouch Touchpad"
        # ...
        # Testing ... (interrupt to exit)
        # Event: time 519.855, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID),
        #        value 884
        #
        tmp_filename = filename + '.tmp'
        os.rename(filename, tmp_filename)
        with open(tmp_filename) as src_f:
            with open(filename, 'w') as dst_f:
                for line in src_f:
                    if line.startswith('Event:'):
                        dst_f.write(line)
        os.remove(tmp_filename)

    def _stop_record_and_post_image(self):
        """Finish the recording, if any, and show the gesture image."""
        if self.record_new_file:
            self._close_gesture_file()
            self.screen_shot.dump_root(self._get_gesture_image_name())
            self.record_proc.terminate()
            self.record_proc.wait()
        else:
            # Only called for its side effect of setting gesture_image_name.
            self._get_gesture_image_name()
        self.win.set_image(self.gesture_image_name)

    def _create_prompt(self, test, variation):
        """Create a color prompt.

        Returns a (msg, color_msg, glog) tuple: the monochrome message, the
        colored message, and a GestureLog filled with the gesture name, the
        variation and the monochrome prompt.
        """
        prompt = test.prompt
        if isinstance(variation, tuple):
            subprompt = reduce(lambda s1, s2: s1 + s2,
                               tuple(test.subprompt[s] for s in variation))
        elif variation is None or test.subprompt is None:
            subprompt = None
        else:
            subprompt = test.subprompt[variation]

        if subprompt is None:
            color_prompt = prompt
            monochrome_prompt = prompt
        else:
            color_prompt = mini_color.color_string(prompt, '{', '}', 'green')
            color_prompt = color_prompt.format(*subprompt)
            monochrome_prompt = prompt.format(*subprompt)

        color_msg_format = mini_color.color_string('\n<%s>:\n%s%s', '<', '>',
                                                   'blue')
        color_msg = color_msg_format % (test.name, self.prefix_space,
                                        color_prompt)
        msg = '%s: %s' % (test.name, monochrome_prompt)

        glog = firmware_log.GestureLog()
        glog.name = test.name
        glog.variation = variation
        glog.prompt = monochrome_prompt

        return (msg, color_msg, glog)

    def _choice_exit(self):
        """Procedure to exit."""
        # Same steps as _stop_record_and_rm_file(); delegate so that the two
        # cannot drift apart.
        self._stop_record_and_rm_file()

    def _stop_record_and_rm_file(self):
        """Stop recording process and remove the current gesture file."""
        self._stop()
        if os.path.exists(self.gesture_file_name):
            os.remove(self.gesture_file_name)
            self.output.print_report(self.deleted_msg)

    def _create_gesture_file_name(self, gesture, variation):
        """Create the gesture file name based on its variation.

        Examples of different levels of file naming:
            Primary name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27
            Root name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510
            Base name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510.dat

        The primary name is stored in primary_name; the base name is returned.
        """
        if variation is None:
            gesture_name = gesture.name
        else:
            # isinstance() matches the tuple checks used elsewhere in this
            # class.
            if isinstance(variation, tuple):
                name_list = [gesture.name] + list(variation)
            else:
                name_list = [gesture.name, variation]
            gesture_name = '.'.join(name_list)

        self.primary_name = conf.filename.sep.join([
                gesture_name,
                self.board,
                conf.fw_prefix + self.firmware_version])
        root_name = conf.filename.sep.join([
                self.primary_name,
                self.mode,
                firmware_utils.get_current_time_str()])
        basename = '.'.join([root_name, conf.filename.ext])
        return basename

    def _add_scores(self, new_scores):
        """Add the new scores of a single gesture to the scores list."""
        if new_scores is not None:
            self.scores += new_scores

    def _final_scores(self, scores):
        """Print the final score."""
        # Note: conf.score_aggregator uses a function in fuzzy module.
        # NOTE(review): eval() executes whatever expression test_conf holds.
        # That is acceptable only while test_conf is trusted, local code.
        final_score = eval(conf.score_aggregator)(scores)
        self.output.print_report('\nFinal score: %s\n' % str(final_score))

    def _robot_action(self):
        """Control the robot to perform the action."""
        if self._is_robot_mode() or self.robot.is_manual_noise_test_mode():
            self.robot.configure_noise(self.gesture, self.variation)

        if self._is_robot_mode():
            self.robot.control(self.gesture, self.variation)
            # Once the script terminates start a timeout to clean up if one
            # hasn't already been set to keep the test suite from hanging.
            if not self.gesture_begins_flag:
                self.win.register_timeout_add(self.gesture_timeout_callback,
                                              self.gesture.timeout)

    def _handle_user_choice_save_after_parsing(self, next_gesture=True):
        """Handle user choice for saving the parsed gesture file.

        Args:
            next_gesture: True to advance to the next gesture variation,
                    False to perform the same gesture variation again.
        """
        self.output.print_window('')
        if self.use_existent_event_file_flag or self.recording:
            if self.saved_msg:
                self.output.print_report(self.saved_msg)
            if self.new_scores:
                self._add_scores(self.new_scores)
            self.output.report_html.insert_image(self.gesture_image_name)
            self.output.report_html.flush()
        # After flushing to report_html, reset the gesture_image_name so that
        # it will not be reused by next gesture variation accidentally.
        self.gesture_image_name = None

        if self._pre_setup_this_gesture_variation(next_gesture=next_gesture):
            # There are more gestures.
            self._setup_this_gesture_variation()
            self._robot_action()
        else:
            # No more gesture.
            self._final_scores(self.scores)
            self.output.stop()
            self.output.report_html.stop()
            self.win.stop()
        self.packets = None

    def _handle_user_choice_discard_after_parsing(self):
        """Handle user choice for discarding the parsed gesture file."""
        self.output.print_window('')
        self._setup_this_gesture_variation()
        self._robot_action()
        self.packets = None

    def _handle_user_choice_exit_after_parsing(self):
        """Handle user choice to exit after the gesture file is parsed."""
        self._stop_record_and_rm_file()
        self.output.stop()
        self.output.report_html.stop()
        self.win.stop()

    def check_for_wrong_number_of_fingers(self, details):
        """Check whether fewer tracking IDs were seen than required.

        Args:
            details: the list of validator messages. The entry following
                    'CountTrackingIDValidator' ends with the observed count
                    of tracking IDs, and the one after that ends with the
                    required count.

        Returns:
            an error message if the observed count is smaller than the
            required one; None otherwise, or when 'CountTrackingIDValidator'
            is not in details.
        """
        try:
            position = details.index('CountTrackingIDValidator')
        except ValueError:
            return None

        # An example of the count of tracking IDs:
        #     ' count of trackid IDs: 1'
        number_tracking_ids = int(details[position + 1].split()[-1])
        # An example of the criteria_str looks like:
        #     ' criteria_str: == 2'
        criteria = int(details[position + 2].split()[-1])
        if number_tracking_ids < criteria:
            # Single-argument print(...) calls produce the same output on
            # Python 2 and also parse on Python 3.
            print(' CountTrackingIDValidator: ')
            print(' number_tracking_ids:  %s' % number_tracking_ids)
            print(' criteria:  %s' % criteria)
            print(' number_tracking_ids should be larger!')
            msg = 'Number of Tracking IDs should be %d instead of %d'
            return msg % (criteria, number_tracking_ids)
        return None

    def _empty_packets_is_legal_result(self):
        """Is it legal to have no packets for the current gesture?

        True only for gestures whose name contains 'tap' in robot mode.
        """
        return ('tap' in self.gesture.name and self._is_robot_mode())

    def _handle_user_choice_validate_before_parsing(self):
        """Handle user choice for validating before gesture file is parsed."""
        # Parse the device events. Make sure there are events.
        self.packets = self.parser.parse_file(self.gesture_file_name)
        if self.packets or self._empty_packets_is_legal_result():
            # Validate this gesture and get the results.
            (self.new_scores, msg_list, vlogs) = validators.validate(
                    self.packets, self.gesture, self.variation)

            # If the number of tracking IDs is less than the expected value,
            # the user probably made a wrong gesture.
            error = self.check_for_wrong_number_of_fingers(msg_list)
            if error:
                prompt = self._get_prompt_abnormal_gestures(error)
                color = 'red'
            else:
                prompt = self._prompt_next
                color = 'black'

            self.output.print_window(msg_list)
            self.output.buffer_report(msg_list)
            self.output.report_html.insert_validator_logs(vlogs)
            self.win.set_prompt(prompt, color=color)
            print(prompt)
            self._stop_record_and_post_image()
        else:
            self.win.set_prompt(self._get_prompt_no_data(), color='red')

    def _handle_user_choice_exit_before_parsing(self):
        """Handle user choice to exit before the gesture file is parsed."""
        self._close_gesture_file()
        self._handle_user_choice_exit_after_parsing()

    def _is_parsing_gesture_file_done(self):
        """Is parsing the gesture file done?"""
        return self.packets is not None

    def _is_arrow_key(self, choice):
        """Is this an arrow key?"""
        return (choice in TFK.ARROW_KEY_LIST)

    def user_choice_callback(self, fd, condition):
        """A callback to handle the key pressed by the user.

        This is the primary GUI event-driven method handling the user input.
        It always returns True.
        """
        choice = self.keyboard.get_key_press_event(fd)
        if choice:
            self._handle_keyboard_event(choice)
        return True

    def _handle_keyboard_event(self, choice):
        """Handle the keyboard event."""
        if self._is_arrow_key(choice):
            self.win.scroll(choice)
        elif self.robot_waiting:
            # The user wants the robot to start its action.
            if choice in (TFK.SAVE, TFK.SAVE2):
                self.robot_waiting = False
                self._robot_action()
            # The user wants to exit.
            elif choice == TFK.EXIT:
                self._handle_user_choice_exit_after_parsing()
        elif self._is_parsing_gesture_file_done():
            # Save this gesture file and go to next gesture.
            if choice in (TFK.SAVE, TFK.SAVE2):
                self._handle_user_choice_save_after_parsing()
            # Save this file and perform the same gesture again.
            elif choice == TFK.MORE:
                self._handle_user_choice_save_after_parsing(next_gesture=False)
            # Discard this file and perform the gesture again.
            elif choice == TFK.DISCARD:
                self._handle_user_choice_discard_after_parsing()
            # The user wants to exit.
            elif choice == TFK.EXIT:
                self._handle_user_choice_exit_after_parsing()
            # The user presses any wrong key.
            else:
                self.win.set_prompt(self._prompt_next, color='red')
        else:
            if choice == TFK.EXIT:
                self._handle_user_choice_exit_before_parsing()
            # The user presses any wrong key.
            else:
                self.win.set_prompt(self._prompt_result, color='red')

    def _get_all_gesture_variations(self, simplified):
        """Get all variations for all gestures.

        Args:
            simplified: if true, keep only the first variation of each
                    gesture.
        """
        gesture_variations_list = []
        self.variations_dict = {}
        for gesture in self.gesture_list:
            variations_list = []
            variations = self.span_variations(gesture.variations)
            for variation in variations:
                gesture_variations_list.append((gesture, variation))
                variations_list.append(variation)
                if simplified:
                    break
            self.variations_dict[gesture.name] = variations_list
        self.gesture_variations = iter(gesture_variations_list)

    def gesture_timeout_callback(self):
        """A callback watching whether a gesture has timed out.

        Returns True when the gesture is still going on, and False otherwise.
        """
        if self.replay_dir:
            # There are event files to replay for this gesture variation.
            if self.use_existent_event_file_flag:
                self._handle_user_choice_validate_before_parsing()
            self._handle_user_choice_save_after_parsing(next_gesture=True)
            return False

        # A gesture is stopped only when two conditions are met simultaneously:
        # (1) there are no reported packets for a timeout interval, and
        # (2) the number of tracking IDs is 0.
        elif (self.gesture_continues_flag or
              not self.mtb_evemu.all_fingers_leaving()):
            self.gesture_continues_flag = False
            return True

        else:
            self._handle_user_choice_validate_before_parsing()
            self.win.remove_event_source(self.gesture_file_watch_tag)
            if self._is_robot_mode():
                self._handle_keyboard_event(TFK.SAVE)
            return False

    def gesture_file_watch_callback(self, fd, condition, evdev_device):
        """A callback to watch the device input. It always returns True."""
        # Read the device node continuously until end
        event = True
        while event:
            event = self._non_blocking_read(evdev_device, fd)
            if event:
                self.mtb_evemu.process_event(event)

        self.gesture_continues_flag = True
        if not self.gesture_begins_flag:
            self.gesture_begins_flag = True
            self.win.register_timeout_add(self.gesture_timeout_callback,
                                          self.gesture.timeout)
        return True

    def init_gesture_setup_callback(self, widget, event):
        """A callback to set up environment before a user starts a gesture."""
        # Only the first invocation does the set up.
        if not self.init_flag:
            self.init_flag = True
            self._pre_setup_this_gesture_variation()
            self._setup_this_gesture_variation()
            self._robot_action()

    def _get_existent_event_files(self):
        """Get the existent event files that starts with the primary_name."""
        primary_pathnames = os.path.join(self.output.log_dir,
                                         self.primary_name + '*.dat')
        self.primary_gesture_files = glob.glob(primary_pathnames)
        # Reverse sorting the file list so that we could pop from the tail.
        self.primary_gesture_files.sort()
        self.primary_gesture_files.reverse()

    def _use_existent_event_file(self):
        """If the replay flag is set in the command line, and there exists a
        file(s) with the same primary name, then use the existent file(s)
        instead of recording a new one.
        """
        if self.primary_gesture_files:
            self.gesture_file_name = self.primary_gesture_files.pop()
            return True
        return False

    def _pre_setup_this_gesture_variation(self, next_gesture=True):
        """Get gesture, variation, filename, prompt, etc.

        Args:
            next_gesture: True to advance to the next iteration or the next
                    gesture variation, False to stay on the current one.

        Returns:
            False if there are no more gesture variations; True otherwise.
        """
        next_gesture_first_time = False
        if next_gesture:
            if self.gv_count < self.iterations:
                self.gv_count += 1
            else:
                self.gv_count = 1
                gesture_variation = next(self.gesture_variations, None)
                if gesture_variation is None:
                    return False
                self.gesture, self.variation = gesture_variation
                next_gesture_first_time = True

        # This also sets primary_name, which is needed for looking up the
        # existent event files below.
        basename = self._create_gesture_file_name(self.gesture, self.variation)
        if next_gesture_first_time:
            self._get_existent_event_files()

        if self.replay_dir or self.resume_dir:
            self.use_existent_event_file_flag = self._use_existent_event_file()

        if ((not self.replay_dir and not self.resume_dir) or
                (self.resume_dir and not self.use_existent_event_file_flag)):
            self.gesture_file_name = os.path.join(self.output.log_dir, basename)
            self.saved_msg = '(saved: %s)\n' % self.gesture_file_name
            self.deleted_msg = '(deleted: %s)\n' % self.gesture_file_name
        else:
            self.saved_msg = None
            self.deleted_msg = None
        self.new_scores = None

        if (self.robot.is_robot_action_mode() or
                self.robot.is_manual_noise_test_mode()):
            self.robot.turn_off_noise()

        (msg, color_msg, glog) = self._create_prompt(self.gesture,
                                                     self.variation)
        self.win.set_gesture_name(msg)
        self.output.report_html.insert_gesture_log(glog)
        print(color_msg)
        self.output.print_report(color_msg)
        return True

    def _setup_this_gesture_variation(self):
        """Set up the recording process or use an existent event data file."""
        if self.replay_dir:
            self.record_new_file = False
            self.win.register_timeout_add(self.gesture_timeout_callback, 0)
            return

        if self.resume_dir and self.use_existent_event_file_flag:
            self.record_new_file = False
            self._handle_user_choice_validate_before_parsing()
            self._handle_keyboard_event(TFK.SAVE)
            return

        # Initiate the MtbSanityValidator. Note that this should be done each
        # time just before recording the gesture file since it requires a
        # snapshot of the input device before any finger touching the device.
        self.gesture.mtb_sanity_validator = validators.MtbSanityValidator()

        # Now, we will record a new gesture event file.
        # Fork a new process for mtplot. Add io watch for the gesture file.
        self.record_new_file = True
        self.gesture_file = open(self.gesture_file_name, 'w')
        self.record_proc = subprocess.Popen(self.record_cmd.split(),
                                            stdout=self.gesture_file)

        # Watch if data come in to the monitored file.
        self.gesture_begins_flag = False
        self._reopen_system_device()
        self.gesture_file_watch_tag = self.win.register_io_add_watch(
                self.gesture_file_watch_callback, self.system_device,
                self.evdev_device)