1// Copyright 2020 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15/* eslint-env browser */ 16import {BehaviorSubject, Subject, Subscription, Observable} from 'rxjs'; 17import DeviceTransport from './device_transport'; 18 19const DEFAULT_SERIAL_OPTIONS: SerialOptions & {baudRate: number} = { 20 // Some versions of chrome use `baudrate` (linux) 21 baudrate: 921600, 22 // Some versions use `baudRate` (chromebook) 23 baudRate: 921600, 24 databits: 8, 25 parity: 'none', 26 stopbits: 1, 27}; 28 29interface PortReadConnection { 30 chunks: Observable<Uint8Array>; 31 errors: Observable<Error>; 32} 33 34interface PortConnection extends PortReadConnection { 35 sendChunk: (chunk: Uint8Array) => Promise<void>; 36} 37 38export class DeviceLostError extends Error { 39 message = 'The device has been lost'; 40} 41 42export class DeviceLockedError extends Error { 43 message = 44 "The device's port is locked. Try unplugging it" + 45 ' and plugging it back in.'; 46} 47 48/** 49 * WebSerialTransport sends and receives UInt8Arrays to and 50 * from a serial device connected over USB. 51 */ 52export class WebSerialTransport implements DeviceTransport { 53 chunks = new Subject<Uint8Array>(); 54 errors = new Subject<Error>(); 55 connected = new BehaviorSubject<boolean>(false); 56 private portConnections: Map<SerialPort, PortConnection> = new Map(); 57 private activePortConnectionConnection: PortConnection | undefined; 58 private rxSubscriptions: Subscription[] = []; 59 60 constructor( 61 private serial: Serial = navigator.serial, 62 private filters: SerialPortFilter[] = [], 63 private serialOptions = DEFAULT_SERIAL_OPTIONS 64 ) {} 65 66 /** 67 * Send a UInt8Array chunk of data to the connected device. 68 * @param {Uint8Array} chunk The chunk to send 69 */ 70 async sendChunk(chunk: Uint8Array): Promise<void> { 71 if (this.activePortConnectionConnection) { 72 return this.activePortConnectionConnection.sendChunk(chunk); 73 } 74 throw new Error('Device not connected'); 75 } 76 77 /** 78 * Attempt to open a connection to a device. This includes 79 * asking the user to select a serial port and should only 80 * be called in response to user interaction. 81 */ 82 async connect(): Promise<void> { 83 const port = await this.serial.requestPort({filters: this.filters}); 84 await this.connectPort(port); 85 } 86 87 private disconnect() { 88 for (const subscription of this.rxSubscriptions) { 89 subscription.unsubscribe(); 90 } 91 this.rxSubscriptions = []; 92 93 this.activePortConnectionConnection = undefined; 94 this.connected.next(false); 95 } 96 97 /** 98 * Connect to a given SerialPort. This involves no user interaction. 99 * and can be called whenever a port is available. 100 */ 101 async connectPort(port: SerialPort): Promise<void> { 102 this.disconnect(); 103 104 this.activePortConnectionConnection = 105 this.portConnections.get(port) ?? (await this.conectNewPort(port)); 106 107 this.connected.next(true); 108 109 this.rxSubscriptions.push( 110 this.activePortConnectionConnection.chunks.subscribe( 111 chunk => { 112 this.chunks.next(chunk); 113 }, 114 err => { 115 throw new Error(`Chunks observable had an unexpeted error ${err}`); 116 }, 117 () => { 118 this.connected.next(false); 119 this.portConnections.delete(port); 120 // Don't complete the chunks observable because then it would not 121 // be able to forward any future chunks. 122 } 123 ) 124 ); 125 126 this.rxSubscriptions.push( 127 this.activePortConnectionConnection.errors.subscribe(error => { 128 this.errors.next(error); 129 if (error instanceof DeviceLostError) { 130 // The device has been lost 131 this.connected.next(false); 132 } 133 }) 134 ); 135 } 136 137 private async conectNewPort(port: SerialPort): Promise<PortConnection> { 138 await port.open(this.serialOptions); 139 const writer = port.writable.getWriter(); 140 141 async function sendChunk(chunk: Uint8Array) { 142 await writer.ready; 143 await writer.write(chunk); 144 } 145 146 const {chunks, errors} = this.getChunks(port); 147 148 const connection: PortConnection = {sendChunk, chunks, errors}; 149 this.portConnections.set(port, connection); 150 return connection; 151 } 152 153 private getChunks(port: SerialPort): PortReadConnection { 154 const chunks = new Subject<Uint8Array>(); 155 const errors = new Subject<Error>(); 156 157 async function read() { 158 if (!port.readable) { 159 throw new DeviceLostError(); 160 } 161 if (port.readable.locked) { 162 throw new DeviceLockedError(); 163 } 164 await port.readable.pipeTo( 165 new WritableStream({ 166 write: chunk => { 167 chunks.next(chunk); 168 }, 169 close: () => { 170 chunks.complete(); 171 errors.complete(); 172 }, 173 abort: () => { 174 // Reconnect to the port. 175 connect(); 176 }, 177 }) 178 ); 179 } 180 181 function connect() { 182 read().catch(err => { 183 // Don't error the chunks observable since that stops it from 184 // reading any more packets, and we often want to continue 185 // despite an error. Instead, push errors to the 'errors' 186 // observable. 187 errors.next(err); 188 }); 189 } 190 191 connect(); 192 193 return {chunks, errors}; 194 } 195} 196