1# Copyright 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utility functions for sensor_fusion hardware rig."""
15
16
17import bisect
18import codecs
19import logging
20import struct
21import time
22import unittest
23
24import numpy as np
25import scipy.spatial
26import serial
27from serial.tools import list_ports
28
29# Constants for Rotation Rig
30ARDUINO_ANGLE_MAX = 180.0  # degrees
31ARDUINO_ANGLES = [0]*5 +list(range(0, 90, 3)) + [90]*5 +list(range(90, -1, -3))
32ARDUINO_BAUDRATE = 9600
33ARDUINO_CMD_LENGTH = 3
34ARDUINO_CMD_TIME = 2.0 * ARDUINO_CMD_LENGTH / ARDUINO_BAUDRATE  # round trip
35ARDUINO_MOVE_TIME = 0.06 - ARDUINO_CMD_TIME  # seconds
36ARDUINO_PID = 0x0043
37ARDUINO_START_BYTE = 255
38ARDUINO_START_NUM_TRYS = 3
39ARDUINO_TEST_CMD = (b'\x01', b'\x02', b'\x03')
40ARDUINO_VALID_CH = ('1', '2', '3', '4', '5', '6')
41ARDUINO_VIDS = (0x2341, 0x2a03)
42
43CANAKIT_BAUDRATE = 115200
44CANAKIT_CMD_TIME = 0.05  # seconds (found experimentally)
45CANAKIT_DATA_DELIMITER = '\r\n'
46CANAKIT_PID = 0xfc73
47CANAKIT_SEND_TIMEOUT = 0.02  # seconds
48CANAKIT_SET_CMD = 'REL'
49CANAKIT_SLEEP_TIME = 2  # seconds (for full 90 degree rotation)
50CANAKIT_VALID_CMD = ('ON', 'OFF')
51CANAKIT_VALID_CH = ('1', '2', '3', '4')
52CANAKIT_VID = 0x04d8
53
54HS755HB_ANGLE_MAX = 202.0  # throw for rotation motor in degrees
55
56_COARSE_FIT_RANGE = 20  # Range area around coarse fit to do optimization.
57_CORR_TIME_OFFSET_MAX = 50  # ms max shift to try and match camera/gyro times.
58_CORR_TIME_OFFSET_STEP = 0.5  # ms step for shifts.
59
60# Unit translators
61_MSEC_TO_NSEC = 1000000
62_NSEC_TO_SEC = 1E-9
63_SEC_TO_NSEC = int(1/_NSEC_TO_SEC)
64
65
66def serial_port_def(name):
67  """Determine the serial port and open.
68
69  Args:
70    name: string of device to locate (ie. 'Arduino', 'Canakit' or 'Default')
71  Returns:
72    serial port object
73  """
74  serial_port = None
75  devices = list_ports.comports()
76  for device in devices:
77    if not (device.vid and device.pid):  # Not all comm ports have vid and pid
78      continue
79    if name.lower() == 'arduino':
80      if (device.vid in ARDUINO_VIDS and device.pid == ARDUINO_PID):
81        logging.debug('Arduino: %s', str(device))
82        serial_port = device.device
83        return serial.Serial(serial_port, ARDUINO_BAUDRATE, timeout=1)
84
85    elif name.lower() in ('canakit', 'default'):
86      if (device.vid == CANAKIT_VID and device.pid == CANAKIT_PID):
87        logging.debug('Canakit: %s', str(device))
88        serial_port = device.device
89        return serial.Serial(serial_port, CANAKIT_BAUDRATE,
90                             timeout=CANAKIT_SEND_TIMEOUT,
91                             parity=serial.PARITY_EVEN,
92                             stopbits=serial.STOPBITS_ONE,
93                             bytesize=serial.EIGHTBITS)
94  raise ValueError(f'{name} device not connected.')
95
96
97def canakit_cmd_send(canakit_serial_port, cmd_str):
98  """Wrapper for sending serial command to Canakit.
99
100  Args:
101    canakit_serial_port: port to write for canakit
102    cmd_str: str; value to send to device.
103  """
104  try:
105    logging.debug('writing port...')
106    canakit_serial_port.write(CANAKIT_DATA_DELIMITER.encode())
107    time.sleep(CANAKIT_CMD_TIME)  # This is critical for relay.
108    canakit_serial_port.write(cmd_str.encode())
109
110  except IOError:
111    raise IOError(f'Port {CANAKIT_VID}:{CANAKIT_PID} is not open!')
112
113
114def canakit_set_relay_channel_state(canakit_port, ch, state):
115  """Set Canakit relay channel and state.
116
117  Waits CANAKIT_SLEEP_TIME for rotation to occur.
118
119  Args:
120    canakit_port: serial port object for the Canakit port.
121    ch: string for channel number of relay to set. '1', '2', '3', or '4'
122    state: string of either 'ON' or 'OFF'
123  """
124  logging.debug('Setting relay state %s', state)
125  if ch in CANAKIT_VALID_CH and state in CANAKIT_VALID_CMD:
126    canakit_cmd_send(canakit_port, CANAKIT_SET_CMD + ch + '.' + state + '\r\n')
127    time.sleep(CANAKIT_SLEEP_TIME)
128  else:
129    logging.debug('Invalid ch (%s) or state (%s), no command sent.', ch, state)
130
131
132def arduino_read_cmd(port):
133  """Read back Arduino command from serial port."""
134  cmd = []
135  for _ in range(ARDUINO_CMD_LENGTH):
136    cmd.append(port.read())
137  return cmd
138
139
140def arduino_send_cmd(port, cmd):
141  """Send command to serial port."""
142  for i in range(ARDUINO_CMD_LENGTH):
143    port.write(cmd[i])
144
145
146def arduino_loopback_cmd(port, cmd):
147  """Send command to serial port."""
148  arduino_send_cmd(port, cmd)
149  time.sleep(ARDUINO_CMD_TIME)
150  return arduino_read_cmd(port)
151
152
153def establish_serial_comm(port):
154  """Establish connection with serial port."""
155  logging.debug('Establishing communication with %s', port.name)
156  trys = 1
157  hex_test = convert_to_hex(ARDUINO_TEST_CMD)
158  logging.debug(' test tx: %s %s %s', hex_test[0], hex_test[1], hex_test[2])
159  while trys <= ARDUINO_START_NUM_TRYS:
160    cmd_read = arduino_loopback_cmd(port, ARDUINO_TEST_CMD)
161    hex_read = convert_to_hex(cmd_read)
162    logging.debug(' test rx: %s %s %s', hex_read[0], hex_read[1], hex_read[2])
163    if cmd_read != list(ARDUINO_TEST_CMD):
164      trys += 1
165    else:
166      logging.debug(' Arduino comm established after %d try(s)', trys)
167      break
168
169
170def convert_to_hex(cmd):
171  return [('%0.2x' % int(codecs.encode(x, 'hex_codec'), 16) if x else '--')
172          for x in cmd]
173
174
175def arduino_rotate_servo_to_angle(ch, angle, serial_port, delay=0):
176  """Rotate servo to the specified angle.
177
178  Args:
179    ch: str; servo to rotate in ARDUINO_VALID_CH
180    angle: int; servo angle to move to
181    serial_port: object; serial port
182    delay: int; time in seconds
183  """
184  if angle < 0 or angle > ARDUINO_ANGLE_MAX:
185    logging.debug('Angle must be between 0 and %d.', ARDUINO_ANGLE_MAX)
186    angle = 0
187    if angle > ARDUINO_ANGLE_MAX:
188      angle = ARDUINO_ANGLE_MAX
189
190  cmd = [struct.pack('B', i) for i in [ARDUINO_START_BYTE, int(ch), angle]]
191  arduino_send_cmd(serial_port, cmd)
192  time.sleep(delay)
193
194
195def arduino_rotate_servo(ch, serial_port):
196  """Rotate servo between 0 --> 90 --> 0.
197
198  Args:
199    ch: str; servo to rotate
200    serial_port: object; serial port
201  """
202  for angle in ARDUINO_ANGLES:
203    angle_norm = int(round(angle*ARDUINO_ANGLE_MAX/HS755HB_ANGLE_MAX, 0))
204    arduino_rotate_servo_to_angle(
205        ch, angle_norm, serial_port, ARDUINO_MOVE_TIME)
206
207
208def rotation_rig(rotate_cntl, rotate_ch, num_rotations):
209  """Rotate the phone n times using rotate_cntl and rotate_ch defined.
210
211  rotate_ch is hard wired and must be determined from physical setup.
212
213  First initialize the port and send a test string defined by ARDUINO_TEST_CMD
214  to establish communications. Then rotate servo motor to origin position.
215
216  Args:
217    rotate_cntl: str to identify as 'arduino' or 'canakit' controller.
218    rotate_ch: str to identify rotation channel number.
219    num_rotations: int number of rotations.
220  """
221
222  logging.debug('Controller: %s, ch: %s', rotate_cntl, rotate_ch)
223  if rotate_cntl.lower() == 'arduino':
224    # identify port
225    arduino_serial_port = serial_port_def('Arduino')
226
227    # send test cmd to Arduino until cmd returns properly
228    establish_serial_comm(arduino_serial_port)
229
230    # initialize servo at origin
231    logging.debug('Moving servo to origin')
232    arduino_rotate_servo_to_angle(rotate_ch, 0, arduino_serial_port, 1)
233
234  elif rotate_cntl.lower() == 'canakit':
235    canakit_serial_port = serial_port_def('Canakit')
236
237  else:
238    logging.info('No rotation rig defined. Manual test: rotate phone by hand.')
239
240  # rotate phone
241  logging.debug('Rotating phone %dx', num_rotations)
242  for _ in range(num_rotations):
243    if rotate_cntl == 'arduino':
244      arduino_rotate_servo(rotate_ch, arduino_serial_port)
245    elif rotate_cntl == 'canakit':
246      canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'ON')
247      canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'OFF')
248  logging.debug('Finished rotations')
249
250
251def get_gyro_rotations(gyro_events, cam_times):
252  """Get the rotation values of the gyro.
253
254  Integrates the gyro data between each camera frame to compute an angular
255  displacement.
256
257  Args:
258    gyro_events: List of gyro event objects.
259    cam_times: Array of N camera times, one for each frame.
260
261  Returns:
262    Array of N-1 gyro rotation measurements (rads/s).
263  """
264  gyro_times = np.array([e['time'] for e in gyro_events])
265  all_gyro_rots = np.array([e['z'] for e in gyro_events])
266  gyro_rots = []
267  if gyro_times[0] > cam_times[0] or gyro_times[-1] < cam_times[-1]:
268    raise AssertionError('Gyro times do not bound camera times! '
269                         f'gyro: {gyro_times[0]:.0f} -> {gyro_times[-1]:.0f} '
270                         f'cam: {cam_times[0]} -> {cam_times[-1]} (ns).')
271  # Integrate the gyro data between each pair of camera frame times.
272  for i_cam in range(len(cam_times)-1):
273    # Get the window of gyro samples within the current pair of frames.
274    # Note: bisect always picks first gyro index after camera time.
275    t_cam0 = cam_times[i_cam]
276    t_cam1 = cam_times[i_cam+1]
277    i_gyro_window0 = bisect.bisect(gyro_times, t_cam0)
278    i_gyro_window1 = bisect.bisect(gyro_times, t_cam1)
279    gyro_sum = 0
280
281    # Integrate samples within the window.
282    for i_gyro in range(i_gyro_window0, i_gyro_window1):
283      gyro_val = all_gyro_rots[i_gyro+1]
284      t_gyro0 = gyro_times[i_gyro]
285      t_gyro1 = gyro_times[i_gyro+1]
286      t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC
287      gyro_sum += gyro_val * t_gyro_delta
288
289    # Handle the fractional intervals at the sides of the window.
290    for side, i_gyro in enumerate([i_gyro_window0-1, i_gyro_window1]):
291      gyro_val = all_gyro_rots[i_gyro+1]
292      t_gyro0 = gyro_times[i_gyro]
293      t_gyro1 = gyro_times[i_gyro+1]
294      t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC
295      if side == 0:
296        f = (t_cam0 - t_gyro0) / (t_gyro1 - t_gyro0)
297        frac_correction = gyro_val * t_gyro_delta * (1.0 - f)
298        gyro_sum += frac_correction
299      else:
300        f = (t_cam1 - t_gyro0) / (t_gyro1 - t_gyro0)
301        frac_correction = gyro_val * t_gyro_delta * f
302        gyro_sum += frac_correction
303
304    gyro_rots.append(gyro_sum)
305  gyro_rots = np.array(gyro_rots)
306  return gyro_rots
307
308
309def get_best_alignment_offset(cam_times, cam_rots, gyro_events):
310  """Find the best offset to align the camera and gyro motion traces.
311
312  This function integrates the shifted gyro data between camera samples
313  for a range of candidate shift values, and returns the shift that
314  result in the best correlation.
315
316  Uses a correlation distance metric between the curves, where a smaller
317  value means that the curves are better-correlated.
318
319  Fits a curve to the correlation distance data to measure the minima more
320  accurately, by looking at the correlation distances within a range of
321  +/- 10ms from the measured best score; note that this will use fewer
322  than the full +/- 10 range for the curve fit if the measured score
323  (which is used as the center of the fit) is within 10ms of the edge of
324  the +/- 50ms candidate range.
325
326  Args:
327    cam_times: Array of N camera times, one for each frame.
328    cam_rots: Array of N-1 camera rotation displacements (rad).
329    gyro_events: List of gyro event objects.
330
331  Returns:
332    Best alignment offset(ms), fit coefficients, candidates, and distances.
333  """
334  # Measure the correlation distance over defined shift
335  shift_candidates = np.arange(-_CORR_TIME_OFFSET_MAX,
336                               _CORR_TIME_OFFSET_MAX+_CORR_TIME_OFFSET_STEP,
337                               _CORR_TIME_OFFSET_STEP).tolist()
338  spatial_distances = []
339  for shift in shift_candidates:
340    shifted_cam_times = cam_times + shift*_MSEC_TO_NSEC
341    gyro_rots = get_gyro_rotations(gyro_events, shifted_cam_times)
342    spatial_distance = scipy.spatial.distance.correlation(cam_rots, gyro_rots)
343    logging.debug('shift %.1fms spatial distance: %.5f', shift,
344                  spatial_distance)
345    spatial_distances.append(spatial_distance)
346
347  best_corr_dist = min(spatial_distances)
348  coarse_best_shift = shift_candidates[spatial_distances.index(best_corr_dist)]
349  logging.debug('Best shift without fitting is %.4f ms', coarse_best_shift)
350
351  # Fit a 2nd order polynomial around coarse_best_shift to extract best fit
352  i = spatial_distances.index(best_corr_dist)
353  i_poly_fit_min = i - _COARSE_FIT_RANGE
354  i_poly_fit_max = i + _COARSE_FIT_RANGE + 1
355  shift_candidates = shift_candidates[i_poly_fit_min:i_poly_fit_max]
356  spatial_distances = spatial_distances[i_poly_fit_min:i_poly_fit_max]
357  fit_coeffs = np.polyfit(shift_candidates, spatial_distances, 2)  # ax^2+bx+c
358  exact_best_shift = -fit_coeffs[1]/(2*fit_coeffs[0])
359  if abs(coarse_best_shift - exact_best_shift) > 2.0:
360    raise AssertionError(
361        f'Test failed. Bad fit to time-shift curve. Coarse best shift: '
362        f'{coarse_best_shift}, Exact best shift: {exact_best_shift}.')
363  if fit_coeffs[0] <= 0 or fit_coeffs[2] <= 0:
364    raise AssertionError(
365        f'Coefficients are < 0: a: {fit_coeffs[0]}, c: {fit_coeffs[2]}.')
366
367  return exact_best_shift, fit_coeffs, shift_candidates, spatial_distances
368
369
370class SensorFusionUtilsTests(unittest.TestCase):
371  """Run a suite of unit tests on this module."""
372
373  _CAM_FRAME_TIME = 30 * _MSEC_TO_NSEC  # Similar to 30FPS
374  _CAM_ROT_AMPLITUDE = 0.04  # Empirical number for rotation per frame (rads/s).
375
376  def _generate_pwl_waveform(self, pts, step, amplitude):
377    """Helper function to generate piece wise linear waveform."""
378    pwl_waveform = []
379    for t in range(pts[0], pts[1], step):
380      pwl_waveform.append(0)
381    for t in range(pts[1], pts[2], step):
382      pwl_waveform.append((t-pts[1])/(pts[2]-pts[1])*amplitude)
383    for t in range(pts[2], pts[3], step):
384      pwl_waveform.append(amplitude)
385    for t in range(pts[3], pts[4], step):
386      pwl_waveform.append((pts[4]-t)/(pts[4]-pts[3])*amplitude)
387    for t in range(pts[4], pts[5], step):
388      pwl_waveform.append(0)
389    for t in range(pts[5], pts[6], step):
390      pwl_waveform.append((-1*(t-pts[5])/(pts[6]-pts[5]))*amplitude)
391    for t in range(pts[6], pts[7], step):
392      pwl_waveform.append(-1*amplitude)
393    for t in range(pts[7], pts[8], step):
394      pwl_waveform.append((t-pts[8])/(pts[8]-pts[7])*amplitude)
395    for t in range(pts[8], pts[9], step):
396      pwl_waveform.append(0)
397    return pwl_waveform
398
399  def _generate_test_waveforms(self, gyro_sampling_rate, t_offset=0):
400    """Define ideal camera/gryo behavior.
401
402    Args:
403      gyro_sampling_rate: Value in samples/sec.
404      t_offset: Value in ns for gyro/camera timing offset.
405
406    Returns:
407      cam_times: numpy array of camera times N values long.
408      cam_rots: numpy array of camera rotations N-1 values long.
409      gyro_events: list of dicts of gyro events N*gyro_sampling_rate/30 long.
410
411    Round trip for motor is ~2 seconds (~60 frames)
412            1111111111111111
413           i                i
414          i                  i
415         i                    i
416     0000                      0000                      0000
417                                   i                    i
418                                    i                  i
419                                     i                i
420                                      -1-1-1-1-1-1-1-1
421    t_0 t_1 t_2           t_3 t_4 t_5 t_6           t_7 t_8 t_9
422
423    Note gyro waveform must extend +/- _CORR_TIME_OFFSET_MAX to enable shifting
424    of camera waveform to find best correlation.
425
426    """
427
428    t_ramp = 4 * self._CAM_FRAME_TIME
429    pts = {}
430    pts[0] = 3 * self._CAM_FRAME_TIME
431    pts[1] = pts[0] + 3 * self._CAM_FRAME_TIME
432    pts[2] = pts[1] + t_ramp
433    pts[3] = pts[2] + 32 * self._CAM_FRAME_TIME
434    pts[4] = pts[3] + t_ramp
435    pts[5] = pts[4] + 4 * self._CAM_FRAME_TIME
436    pts[6] = pts[5] + t_ramp
437    pts[7] = pts[6] + 32 * self._CAM_FRAME_TIME
438    pts[8] = pts[7] + t_ramp
439    pts[9] = pts[8] + 4 * self._CAM_FRAME_TIME
440    cam_times = np.array(range(pts[0], pts[9], self._CAM_FRAME_TIME))
441    cam_rots = self._generate_pwl_waveform(
442        pts, self._CAM_FRAME_TIME, self._CAM_ROT_AMPLITUDE)
443    cam_rots.pop()  # rots is N-1 for N length times.
444    cam_rots = np.array(cam_rots)
445
446    # Generate gyro waveform.
447    gyro_step = int(round(_SEC_TO_NSEC/gyro_sampling_rate, 0))
448    gyro_pts = {k: v+t_offset+self._CAM_FRAME_TIME//2 for k, v in pts.items()}
449    gyro_pts[0] = 0  # adjust end pts to bound camera
450    gyro_pts[9] += self._CAM_FRAME_TIME*2  # adjust end pt to bound camera
451    gyro_rot_amplitude = (
452        self._CAM_ROT_AMPLITUDE / self._CAM_FRAME_TIME * _SEC_TO_NSEC)
453    gyro_rots = self._generate_pwl_waveform(
454        gyro_pts, gyro_step, gyro_rot_amplitude)
455
456    # Create gyro events list of dicts.
457    gyro_events = []
458    for i, t in enumerate(range(gyro_pts[0], gyro_pts[9], gyro_step)):
459      gyro_events.append({'time': t, 'z': gyro_rots[i]})
460
461    return cam_times, cam_rots, gyro_events
462
463  def test_get_gyro_rotations(self):
464    """Tests that gyro rotations are masked properly by camera rotations.
465
466    Note that waveform ideal waveform generation only works properly with
467    integer multiples of frame rate.
468    """
469    # Run with different sampling rates to validate.
470    for gyro_sampling_rate in [200, 1000]:  # 6x, 30x frame rate
471      cam_times, cam_rots, gyro_events = self._generate_test_waveforms(
472          gyro_sampling_rate)
473      gyro_rots = get_gyro_rotations(gyro_events, cam_times)
474      e_msg = f'gyro sampling rate = {gyro_sampling_rate}\n'
475      e_msg += f'cam_times = {list(cam_times)}\n'
476      e_msg += f'cam_rots = {list(cam_rots)}\n'
477      e_msg += f'gyro_rots = {list(gyro_rots)}'
478
479      self.assertTrue(np.allclose(
480          gyro_rots, cam_rots, atol=self._CAM_ROT_AMPLITUDE*0.10), e_msg)
481
482  def test_get_best_alignment_offset(self):
483    """Unittest for alignment offset check."""
484
485    gyro_sampling_rate = 5000
486    for t_offset_ms in [0, 1]:  # Run with different offsets to validate.
487      t_offset = int(t_offset_ms * _MSEC_TO_NSEC)
488      cam_times, cam_rots, gyro_events = self._generate_test_waveforms(
489          gyro_sampling_rate, t_offset)
490
491      best_fit_offset, coeffs, x, y = get_best_alignment_offset(
492          cam_times, cam_rots, gyro_events)
493      e_msg = f'best: {best_fit_offset} ms\n'
494      e_msg += f'coeffs: {coeffs}\n'
495      e_msg += f'x: {x}\n'
496      e_msg += f'y: {y}'
497      self.assertTrue(np.isclose(t_offset_ms, best_fit_offset, atol=0.1), e_msg)
498
499
500if __name__ == '__main__':
501  unittest.main()
502
503