1 // Copyright 2021 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::cell::RefCell; 6 use std::os::unix::net::UnixStream; 7 use std::path::Path; 8 use std::thread; 9 10 use base::{error, Event, RawDescriptor}; 11 use cros_async::Executor; 12 use data_model::{DataInit, Le32}; 13 use vm_memory::GuestMemory; 14 use vmm_vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures}; 15 use vmm_vhost::vhost_user::{Error as VhostUserError, Master}; 16 use vmm_vhost::Error as VhostError; 17 18 use crate::virtio::fs::{virtio_fs_config, FS_MAX_TAG_LEN, QUEUE_SIZE}; 19 use crate::virtio::vhost::user::handler::VhostUserHandler; 20 use crate::virtio::vhost::user::worker::Worker; 21 use crate::virtio::vhost::user::{Error, Result}; 22 use crate::virtio::{copy_config, TYPE_FS}; 23 use crate::virtio::{Interrupt, Queue, VirtioDevice}; 24 25 pub struct Fs { 26 cfg: virtio_fs_config, 27 kill_evt: Option<Event>, 28 worker_thread: Option<thread::JoinHandle<Worker>>, 29 handler: RefCell<VhostUserHandler>, 30 queue_sizes: Vec<u16>, 31 } 32 impl Fs { new<P: AsRef<Path>>(base_features: u64, socket_path: P, tag: &str) -> Result<Fs>33 pub fn new<P: AsRef<Path>>(base_features: u64, socket_path: P, tag: &str) -> Result<Fs> { 34 if tag.len() > FS_MAX_TAG_LEN { 35 return Err(Error::TagTooLong { 36 len: tag.len(), 37 max: FS_MAX_TAG_LEN, 38 }); 39 } 40 41 // The spec requires a minimum of 2 queues: one worker queue and one high priority queue 42 let default_queue_size = 2; 43 44 let mut cfg_tag = [0u8; FS_MAX_TAG_LEN]; 45 cfg_tag[..tag.len()].copy_from_slice(tag.as_bytes()); 46 47 let cfg = virtio_fs_config { 48 tag: cfg_tag, 49 // Only count the worker queues, exclude the high prio queue 50 num_request_queues: Le32::from(default_queue_size - 1), 51 }; 52 53 let socket = UnixStream::connect(&socket_path).map_err(Error::SocketConnect)?; 54 let master = Master::from_stream(socket, default_queue_size as u64); 55 56 let allow_features = 1u64 << crate::virtio::VIRTIO_F_VERSION_1 57 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 58 let init_features = base_features | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 59 let allow_protocol_features = 60 VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::CONFIG; 61 62 let mut handler = VhostUserHandler::new( 63 master, 64 allow_features, 65 init_features, 66 allow_protocol_features, 67 )?; 68 let queue_sizes = handler.queue_sizes(QUEUE_SIZE, default_queue_size as usize)?; 69 70 Ok(Fs { 71 cfg, 72 kill_evt: None, 73 worker_thread: None, 74 handler: RefCell::new(handler), 75 queue_sizes, 76 }) 77 } 78 } 79 80 impl VirtioDevice for Fs { keep_rds(&self) -> Vec<RawDescriptor>81 fn keep_rds(&self) -> Vec<RawDescriptor> { 82 Vec::new() 83 } 84 device_type(&self) -> u3285 fn device_type(&self) -> u32 { 86 TYPE_FS 87 } 88 queue_max_sizes(&self) -> &[u16]89 fn queue_max_sizes(&self) -> &[u16] { 90 &self.queue_sizes 91 } 92 features(&self) -> u6493 fn features(&self) -> u64 { 94 self.handler.borrow().avail_features 95 } 96 ack_features(&mut self, features: u64)97 fn ack_features(&mut self, features: u64) { 98 if let Err(e) = self.handler.borrow_mut().ack_features(features) { 99 error!("failed to enable features 0x{:x}: {}", features, e); 100 } 101 } 102 read_config(&self, offset: u64, data: &mut [u8])103 fn read_config(&self, offset: u64, data: &mut [u8]) { 104 match self 105 .handler 106 .borrow_mut() 107 .read_config::<virtio_fs_config>(offset, data) 108 { 109 Ok(()) => {} 110 // copy local config when VhostUserProtocolFeatures::CONFIG is not supported by the 111 // device 112 Err(Error::GetConfig(VhostError::VhostUserProtocol( 113 VhostUserError::InvalidOperation, 114 ))) => copy_config(data, 0, self.cfg.as_slice(), offset), 115 Err(e) => error!("Failed to fetch device config: {}", e), 116 } 117 } 118 activate( &mut self, mem: GuestMemory, interrupt: Interrupt, queues: Vec<Queue>, queue_evts: Vec<Event>, )119 fn activate( 120 &mut self, 121 mem: GuestMemory, 122 interrupt: Interrupt, 123 queues: Vec<Queue>, 124 queue_evts: Vec<Event>, 125 ) { 126 if let Err(e) = self 127 .handler 128 .borrow_mut() 129 .activate(&mem, &interrupt, &queues, &queue_evts) 130 { 131 error!("failed to activate queues: {}", e); 132 return; 133 } 134 135 let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) { 136 Ok(v) => v, 137 Err(e) => { 138 error!("failed creating kill Event pair: {}", e); 139 return; 140 } 141 }; 142 self.kill_evt = Some(self_kill_evt); 143 144 let worker_result = thread::Builder::new() 145 .name("vhost_user_virtio_fs".to_string()) 146 .spawn(move || { 147 let ex = Executor::new().expect("failed to create an executor"); 148 let mut worker = Worker { 149 queues, 150 mem, 151 kill_evt, 152 }; 153 154 if let Err(e) = worker.run(&ex, interrupt) { 155 error!("failed to start a worker: {}", e); 156 } 157 worker 158 }); 159 160 match worker_result { 161 Err(e) => { 162 error!("failed to spawn vhost-user virtio_fs worker: {}", e); 163 return; 164 } 165 Ok(join_handle) => { 166 self.worker_thread = Some(join_handle); 167 } 168 } 169 } 170 reset(&mut self) -> bool171 fn reset(&mut self) -> bool { 172 if let Err(e) = self.handler.borrow_mut().reset(self.queue_sizes.len()) { 173 error!("Failed to reset fs device: {}", e); 174 false 175 } else { 176 true 177 } 178 } 179 } 180 181 impl Drop for Fs { drop(&mut self)182 fn drop(&mut self) { 183 if let Some(kill_evt) = self.kill_evt.take() { 184 // Ignore the result because there is nothing we can do about it. 185 let _ = kill_evt.write(1); 186 } 187 188 if let Some(worker_thread) = self.worker_thread.take() { 189 let _ = worker_thread.join(); 190 } 191 } 192 } 193