1# Copyright 2020 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Utility functions for sensor_fusion hardware rig.""" 15 16 17import bisect 18import codecs 19import logging 20import struct 21import time 22import unittest 23 24import numpy as np 25import scipy.spatial 26import serial 27from serial.tools import list_ports 28 29# Constants for Rotation Rig 30ARDUINO_ANGLE_MAX = 180.0 # degrees 31ARDUINO_ANGLES = [0]*5 +list(range(0, 90, 3)) + [90]*5 +list(range(90, -1, -3)) 32ARDUINO_BAUDRATE = 9600 33ARDUINO_CMD_LENGTH = 3 34ARDUINO_CMD_TIME = 2.0 * ARDUINO_CMD_LENGTH / ARDUINO_BAUDRATE # round trip 35ARDUINO_MOVE_TIME = 0.06 - ARDUINO_CMD_TIME # seconds 36ARDUINO_PID = 0x0043 37ARDUINO_START_BYTE = 255 38ARDUINO_START_NUM_TRYS = 3 39ARDUINO_TEST_CMD = (b'\x01', b'\x02', b'\x03') 40ARDUINO_VALID_CH = ('1', '2', '3', '4', '5', '6') 41ARDUINO_VIDS = (0x2341, 0x2a03) 42 43CANAKIT_BAUDRATE = 115200 44CANAKIT_CMD_TIME = 0.05 # seconds (found experimentally) 45CANAKIT_DATA_DELIMITER = '\r\n' 46CANAKIT_PID = 0xfc73 47CANAKIT_SEND_TIMEOUT = 0.02 # seconds 48CANAKIT_SET_CMD = 'REL' 49CANAKIT_SLEEP_TIME = 2 # seconds (for full 90 degree rotation) 50CANAKIT_VALID_CMD = ('ON', 'OFF') 51CANAKIT_VALID_CH = ('1', '2', '3', '4') 52CANAKIT_VID = 0x04d8 53 54HS755HB_ANGLE_MAX = 202.0 # throw for rotation motor in degrees 55 56_COARSE_FIT_RANGE = 20 # Range area around coarse fit to do optimization. 57_CORR_TIME_OFFSET_MAX = 50 # ms max shift to try and match camera/gyro times. 58_CORR_TIME_OFFSET_STEP = 0.5 # ms step for shifts. 59 60# Unit translators 61_MSEC_TO_NSEC = 1000000 62_NSEC_TO_SEC = 1E-9 63_SEC_TO_NSEC = int(1/_NSEC_TO_SEC) 64 65 66def serial_port_def(name): 67 """Determine the serial port and open. 68 69 Args: 70 name: string of device to locate (ie. 'Arduino', 'Canakit' or 'Default') 71 Returns: 72 serial port object 73 """ 74 serial_port = None 75 devices = list_ports.comports() 76 for device in devices: 77 if not (device.vid and device.pid): # Not all comm ports have vid and pid 78 continue 79 if name.lower() == 'arduino': 80 if (device.vid in ARDUINO_VIDS and device.pid == ARDUINO_PID): 81 logging.debug('Arduino: %s', str(device)) 82 serial_port = device.device 83 return serial.Serial(serial_port, ARDUINO_BAUDRATE, timeout=1) 84 85 elif name.lower() in ('canakit', 'default'): 86 if (device.vid == CANAKIT_VID and device.pid == CANAKIT_PID): 87 logging.debug('Canakit: %s', str(device)) 88 serial_port = device.device 89 return serial.Serial(serial_port, CANAKIT_BAUDRATE, 90 timeout=CANAKIT_SEND_TIMEOUT, 91 parity=serial.PARITY_EVEN, 92 stopbits=serial.STOPBITS_ONE, 93 bytesize=serial.EIGHTBITS) 94 raise ValueError(f'{name} device not connected.') 95 96 97def canakit_cmd_send(canakit_serial_port, cmd_str): 98 """Wrapper for sending serial command to Canakit. 99 100 Args: 101 canakit_serial_port: port to write for canakit 102 cmd_str: str; value to send to device. 103 """ 104 try: 105 logging.debug('writing port...') 106 canakit_serial_port.write(CANAKIT_DATA_DELIMITER.encode()) 107 time.sleep(CANAKIT_CMD_TIME) # This is critical for relay. 108 canakit_serial_port.write(cmd_str.encode()) 109 110 except IOError: 111 raise IOError(f'Port {CANAKIT_VID}:{CANAKIT_PID} is not open!') 112 113 114def canakit_set_relay_channel_state(canakit_port, ch, state): 115 """Set Canakit relay channel and state. 116 117 Waits CANAKIT_SLEEP_TIME for rotation to occur. 118 119 Args: 120 canakit_port: serial port object for the Canakit port. 121 ch: string for channel number of relay to set. '1', '2', '3', or '4' 122 state: string of either 'ON' or 'OFF' 123 """ 124 logging.debug('Setting relay state %s', state) 125 if ch in CANAKIT_VALID_CH and state in CANAKIT_VALID_CMD: 126 canakit_cmd_send(canakit_port, CANAKIT_SET_CMD + ch + '.' + state + '\r\n') 127 time.sleep(CANAKIT_SLEEP_TIME) 128 else: 129 logging.debug('Invalid ch (%s) or state (%s), no command sent.', ch, state) 130 131 132def arduino_read_cmd(port): 133 """Read back Arduino command from serial port.""" 134 cmd = [] 135 for _ in range(ARDUINO_CMD_LENGTH): 136 cmd.append(port.read()) 137 return cmd 138 139 140def arduino_send_cmd(port, cmd): 141 """Send command to serial port.""" 142 for i in range(ARDUINO_CMD_LENGTH): 143 port.write(cmd[i]) 144 145 146def arduino_loopback_cmd(port, cmd): 147 """Send command to serial port.""" 148 arduino_send_cmd(port, cmd) 149 time.sleep(ARDUINO_CMD_TIME) 150 return arduino_read_cmd(port) 151 152 153def establish_serial_comm(port): 154 """Establish connection with serial port.""" 155 logging.debug('Establishing communication with %s', port.name) 156 trys = 1 157 hex_test = convert_to_hex(ARDUINO_TEST_CMD) 158 logging.debug(' test tx: %s %s %s', hex_test[0], hex_test[1], hex_test[2]) 159 while trys <= ARDUINO_START_NUM_TRYS: 160 cmd_read = arduino_loopback_cmd(port, ARDUINO_TEST_CMD) 161 hex_read = convert_to_hex(cmd_read) 162 logging.debug(' test rx: %s %s %s', hex_read[0], hex_read[1], hex_read[2]) 163 if cmd_read != list(ARDUINO_TEST_CMD): 164 trys += 1 165 else: 166 logging.debug(' Arduino comm established after %d try(s)', trys) 167 break 168 169 170def convert_to_hex(cmd): 171 return [('%0.2x' % int(codecs.encode(x, 'hex_codec'), 16) if x else '--') 172 for x in cmd] 173 174 175def arduino_rotate_servo_to_angle(ch, angle, serial_port, delay=0): 176 """Rotate servo to the specified angle. 177 178 Args: 179 ch: str; servo to rotate in ARDUINO_VALID_CH 180 angle: int; servo angle to move to 181 serial_port: object; serial port 182 delay: int; time in seconds 183 """ 184 if angle < 0 or angle > ARDUINO_ANGLE_MAX: 185 logging.debug('Angle must be between 0 and %d.', ARDUINO_ANGLE_MAX) 186 angle = 0 187 if angle > ARDUINO_ANGLE_MAX: 188 angle = ARDUINO_ANGLE_MAX 189 190 cmd = [struct.pack('B', i) for i in [ARDUINO_START_BYTE, int(ch), angle]] 191 arduino_send_cmd(serial_port, cmd) 192 time.sleep(delay) 193 194 195def arduino_rotate_servo(ch, serial_port): 196 """Rotate servo between 0 --> 90 --> 0. 197 198 Args: 199 ch: str; servo to rotate 200 serial_port: object; serial port 201 """ 202 for angle in ARDUINO_ANGLES: 203 angle_norm = int(round(angle*ARDUINO_ANGLE_MAX/HS755HB_ANGLE_MAX, 0)) 204 arduino_rotate_servo_to_angle( 205 ch, angle_norm, serial_port, ARDUINO_MOVE_TIME) 206 207 208def rotation_rig(rotate_cntl, rotate_ch, num_rotations): 209 """Rotate the phone n times using rotate_cntl and rotate_ch defined. 210 211 rotate_ch is hard wired and must be determined from physical setup. 212 213 First initialize the port and send a test string defined by ARDUINO_TEST_CMD 214 to establish communications. Then rotate servo motor to origin position. 215 216 Args: 217 rotate_cntl: str to identify as 'arduino' or 'canakit' controller. 218 rotate_ch: str to identify rotation channel number. 219 num_rotations: int number of rotations. 220 """ 221 222 logging.debug('Controller: %s, ch: %s', rotate_cntl, rotate_ch) 223 if rotate_cntl.lower() == 'arduino': 224 # identify port 225 arduino_serial_port = serial_port_def('Arduino') 226 227 # send test cmd to Arduino until cmd returns properly 228 establish_serial_comm(arduino_serial_port) 229 230 # initialize servo at origin 231 logging.debug('Moving servo to origin') 232 arduino_rotate_servo_to_angle(rotate_ch, 0, arduino_serial_port, 1) 233 234 elif rotate_cntl.lower() == 'canakit': 235 canakit_serial_port = serial_port_def('Canakit') 236 237 else: 238 logging.info('No rotation rig defined. Manual test: rotate phone by hand.') 239 240 # rotate phone 241 logging.debug('Rotating phone %dx', num_rotations) 242 for _ in range(num_rotations): 243 if rotate_cntl == 'arduino': 244 arduino_rotate_servo(rotate_ch, arduino_serial_port) 245 elif rotate_cntl == 'canakit': 246 canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'ON') 247 canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'OFF') 248 logging.debug('Finished rotations') 249 250 251def get_gyro_rotations(gyro_events, cam_times): 252 """Get the rotation values of the gyro. 253 254 Integrates the gyro data between each camera frame to compute an angular 255 displacement. 256 257 Args: 258 gyro_events: List of gyro event objects. 259 cam_times: Array of N camera times, one for each frame. 260 261 Returns: 262 Array of N-1 gyro rotation measurements (rads/s). 263 """ 264 gyro_times = np.array([e['time'] for e in gyro_events]) 265 all_gyro_rots = np.array([e['z'] for e in gyro_events]) 266 gyro_rots = [] 267 if gyro_times[0] > cam_times[0] or gyro_times[-1] < cam_times[-1]: 268 raise AssertionError('Gyro times do not bound camera times! ' 269 f'gyro: {gyro_times[0]:.0f} -> {gyro_times[-1]:.0f} ' 270 f'cam: {cam_times[0]} -> {cam_times[-1]} (ns).') 271 # Integrate the gyro data between each pair of camera frame times. 272 for i_cam in range(len(cam_times)-1): 273 # Get the window of gyro samples within the current pair of frames. 274 # Note: bisect always picks first gyro index after camera time. 275 t_cam0 = cam_times[i_cam] 276 t_cam1 = cam_times[i_cam+1] 277 i_gyro_window0 = bisect.bisect(gyro_times, t_cam0) 278 i_gyro_window1 = bisect.bisect(gyro_times, t_cam1) 279 gyro_sum = 0 280 281 # Integrate samples within the window. 282 for i_gyro in range(i_gyro_window0, i_gyro_window1): 283 gyro_val = all_gyro_rots[i_gyro+1] 284 t_gyro0 = gyro_times[i_gyro] 285 t_gyro1 = gyro_times[i_gyro+1] 286 t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC 287 gyro_sum += gyro_val * t_gyro_delta 288 289 # Handle the fractional intervals at the sides of the window. 290 for side, i_gyro in enumerate([i_gyro_window0-1, i_gyro_window1]): 291 gyro_val = all_gyro_rots[i_gyro+1] 292 t_gyro0 = gyro_times[i_gyro] 293 t_gyro1 = gyro_times[i_gyro+1] 294 t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC 295 if side == 0: 296 f = (t_cam0 - t_gyro0) / (t_gyro1 - t_gyro0) 297 frac_correction = gyro_val * t_gyro_delta * (1.0 - f) 298 gyro_sum += frac_correction 299 else: 300 f = (t_cam1 - t_gyro0) / (t_gyro1 - t_gyro0) 301 frac_correction = gyro_val * t_gyro_delta * f 302 gyro_sum += frac_correction 303 304 gyro_rots.append(gyro_sum) 305 gyro_rots = np.array(gyro_rots) 306 return gyro_rots 307 308 309def get_best_alignment_offset(cam_times, cam_rots, gyro_events): 310 """Find the best offset to align the camera and gyro motion traces. 311 312 This function integrates the shifted gyro data between camera samples 313 for a range of candidate shift values, and returns the shift that 314 result in the best correlation. 315 316 Uses a correlation distance metric between the curves, where a smaller 317 value means that the curves are better-correlated. 318 319 Fits a curve to the correlation distance data to measure the minima more 320 accurately, by looking at the correlation distances within a range of 321 +/- 10ms from the measured best score; note that this will use fewer 322 than the full +/- 10 range for the curve fit if the measured score 323 (which is used as the center of the fit) is within 10ms of the edge of 324 the +/- 50ms candidate range. 325 326 Args: 327 cam_times: Array of N camera times, one for each frame. 328 cam_rots: Array of N-1 camera rotation displacements (rad). 329 gyro_events: List of gyro event objects. 330 331 Returns: 332 Best alignment offset(ms), fit coefficients, candidates, and distances. 333 """ 334 # Measure the correlation distance over defined shift 335 shift_candidates = np.arange(-_CORR_TIME_OFFSET_MAX, 336 _CORR_TIME_OFFSET_MAX+_CORR_TIME_OFFSET_STEP, 337 _CORR_TIME_OFFSET_STEP).tolist() 338 spatial_distances = [] 339 for shift in shift_candidates: 340 shifted_cam_times = cam_times + shift*_MSEC_TO_NSEC 341 gyro_rots = get_gyro_rotations(gyro_events, shifted_cam_times) 342 spatial_distance = scipy.spatial.distance.correlation(cam_rots, gyro_rots) 343 logging.debug('shift %.1fms spatial distance: %.5f', shift, 344 spatial_distance) 345 spatial_distances.append(spatial_distance) 346 347 best_corr_dist = min(spatial_distances) 348 coarse_best_shift = shift_candidates[spatial_distances.index(best_corr_dist)] 349 logging.debug('Best shift without fitting is %.4f ms', coarse_best_shift) 350 351 # Fit a 2nd order polynomial around coarse_best_shift to extract best fit 352 i = spatial_distances.index(best_corr_dist) 353 i_poly_fit_min = i - _COARSE_FIT_RANGE 354 i_poly_fit_max = i + _COARSE_FIT_RANGE + 1 355 shift_candidates = shift_candidates[i_poly_fit_min:i_poly_fit_max] 356 spatial_distances = spatial_distances[i_poly_fit_min:i_poly_fit_max] 357 fit_coeffs = np.polyfit(shift_candidates, spatial_distances, 2) # ax^2+bx+c 358 exact_best_shift = -fit_coeffs[1]/(2*fit_coeffs[0]) 359 if abs(coarse_best_shift - exact_best_shift) > 2.0: 360 raise AssertionError( 361 f'Test failed. Bad fit to time-shift curve. Coarse best shift: ' 362 f'{coarse_best_shift}, Exact best shift: {exact_best_shift}.') 363 if fit_coeffs[0] <= 0 or fit_coeffs[2] <= 0: 364 raise AssertionError( 365 f'Coefficients are < 0: a: {fit_coeffs[0]}, c: {fit_coeffs[2]}.') 366 367 return exact_best_shift, fit_coeffs, shift_candidates, spatial_distances 368 369 370class SensorFusionUtilsTests(unittest.TestCase): 371 """Run a suite of unit tests on this module.""" 372 373 _CAM_FRAME_TIME = 30 * _MSEC_TO_NSEC # Similar to 30FPS 374 _CAM_ROT_AMPLITUDE = 0.04 # Empirical number for rotation per frame (rads/s). 375 376 def _generate_pwl_waveform(self, pts, step, amplitude): 377 """Helper function to generate piece wise linear waveform.""" 378 pwl_waveform = [] 379 for t in range(pts[0], pts[1], step): 380 pwl_waveform.append(0) 381 for t in range(pts[1], pts[2], step): 382 pwl_waveform.append((t-pts[1])/(pts[2]-pts[1])*amplitude) 383 for t in range(pts[2], pts[3], step): 384 pwl_waveform.append(amplitude) 385 for t in range(pts[3], pts[4], step): 386 pwl_waveform.append((pts[4]-t)/(pts[4]-pts[3])*amplitude) 387 for t in range(pts[4], pts[5], step): 388 pwl_waveform.append(0) 389 for t in range(pts[5], pts[6], step): 390 pwl_waveform.append((-1*(t-pts[5])/(pts[6]-pts[5]))*amplitude) 391 for t in range(pts[6], pts[7], step): 392 pwl_waveform.append(-1*amplitude) 393 for t in range(pts[7], pts[8], step): 394 pwl_waveform.append((t-pts[8])/(pts[8]-pts[7])*amplitude) 395 for t in range(pts[8], pts[9], step): 396 pwl_waveform.append(0) 397 return pwl_waveform 398 399 def _generate_test_waveforms(self, gyro_sampling_rate, t_offset=0): 400 """Define ideal camera/gryo behavior. 401 402 Args: 403 gyro_sampling_rate: Value in samples/sec. 404 t_offset: Value in ns for gyro/camera timing offset. 405 406 Returns: 407 cam_times: numpy array of camera times N values long. 408 cam_rots: numpy array of camera rotations N-1 values long. 409 gyro_events: list of dicts of gyro events N*gyro_sampling_rate/30 long. 410 411 Round trip for motor is ~2 seconds (~60 frames) 412 1111111111111111 413 i i 414 i i 415 i i 416 0000 0000 0000 417 i i 418 i i 419 i i 420 -1-1-1-1-1-1-1-1 421 t_0 t_1 t_2 t_3 t_4 t_5 t_6 t_7 t_8 t_9 422 423 Note gyro waveform must extend +/- _CORR_TIME_OFFSET_MAX to enable shifting 424 of camera waveform to find best correlation. 425 426 """ 427 428 t_ramp = 4 * self._CAM_FRAME_TIME 429 pts = {} 430 pts[0] = 3 * self._CAM_FRAME_TIME 431 pts[1] = pts[0] + 3 * self._CAM_FRAME_TIME 432 pts[2] = pts[1] + t_ramp 433 pts[3] = pts[2] + 32 * self._CAM_FRAME_TIME 434 pts[4] = pts[3] + t_ramp 435 pts[5] = pts[4] + 4 * self._CAM_FRAME_TIME 436 pts[6] = pts[5] + t_ramp 437 pts[7] = pts[6] + 32 * self._CAM_FRAME_TIME 438 pts[8] = pts[7] + t_ramp 439 pts[9] = pts[8] + 4 * self._CAM_FRAME_TIME 440 cam_times = np.array(range(pts[0], pts[9], self._CAM_FRAME_TIME)) 441 cam_rots = self._generate_pwl_waveform( 442 pts, self._CAM_FRAME_TIME, self._CAM_ROT_AMPLITUDE) 443 cam_rots.pop() # rots is N-1 for N length times. 444 cam_rots = np.array(cam_rots) 445 446 # Generate gyro waveform. 447 gyro_step = int(round(_SEC_TO_NSEC/gyro_sampling_rate, 0)) 448 gyro_pts = {k: v+t_offset+self._CAM_FRAME_TIME//2 for k, v in pts.items()} 449 gyro_pts[0] = 0 # adjust end pts to bound camera 450 gyro_pts[9] += self._CAM_FRAME_TIME*2 # adjust end pt to bound camera 451 gyro_rot_amplitude = ( 452 self._CAM_ROT_AMPLITUDE / self._CAM_FRAME_TIME * _SEC_TO_NSEC) 453 gyro_rots = self._generate_pwl_waveform( 454 gyro_pts, gyro_step, gyro_rot_amplitude) 455 456 # Create gyro events list of dicts. 457 gyro_events = [] 458 for i, t in enumerate(range(gyro_pts[0], gyro_pts[9], gyro_step)): 459 gyro_events.append({'time': t, 'z': gyro_rots[i]}) 460 461 return cam_times, cam_rots, gyro_events 462 463 def test_get_gyro_rotations(self): 464 """Tests that gyro rotations are masked properly by camera rotations. 465 466 Note that waveform ideal waveform generation only works properly with 467 integer multiples of frame rate. 468 """ 469 # Run with different sampling rates to validate. 470 for gyro_sampling_rate in [200, 1000]: # 6x, 30x frame rate 471 cam_times, cam_rots, gyro_events = self._generate_test_waveforms( 472 gyro_sampling_rate) 473 gyro_rots = get_gyro_rotations(gyro_events, cam_times) 474 e_msg = f'gyro sampling rate = {gyro_sampling_rate}\n' 475 e_msg += f'cam_times = {list(cam_times)}\n' 476 e_msg += f'cam_rots = {list(cam_rots)}\n' 477 e_msg += f'gyro_rots = {list(gyro_rots)}' 478 479 self.assertTrue(np.allclose( 480 gyro_rots, cam_rots, atol=self._CAM_ROT_AMPLITUDE*0.10), e_msg) 481 482 def test_get_best_alignment_offset(self): 483 """Unittest for alignment offset check.""" 484 485 gyro_sampling_rate = 5000 486 for t_offset_ms in [0, 1]: # Run with different offsets to validate. 487 t_offset = int(t_offset_ms * _MSEC_TO_NSEC) 488 cam_times, cam_rots, gyro_events = self._generate_test_waveforms( 489 gyro_sampling_rate, t_offset) 490 491 best_fit_offset, coeffs, x, y = get_best_alignment_offset( 492 cam_times, cam_rots, gyro_events) 493 e_msg = f'best: {best_fit_offset} ms\n' 494 e_msg += f'coeffs: {coeffs}\n' 495 e_msg += f'x: {x}\n' 496 e_msg += f'y: {y}' 497 self.assertTrue(np.isclose(t_offset_ms, best_fit_offset, atol=0.1), e_msg) 498 499 500if __name__ == '__main__': 501 unittest.main() 502 503