1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::RefCell;
6 use std::os::unix::net::UnixStream;
7 use std::path::Path;
8 use std::thread;
9 
10 use base::{error, Event, RawDescriptor};
11 use cros_async::Executor;
12 use data_model::{DataInit, Le32};
13 use vm_memory::GuestMemory;
14 use vmm_vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
15 use vmm_vhost::vhost_user::{Error as VhostUserError, Master};
16 use vmm_vhost::Error as VhostError;
17 
18 use crate::virtio::fs::{virtio_fs_config, FS_MAX_TAG_LEN, QUEUE_SIZE};
19 use crate::virtio::vhost::user::handler::VhostUserHandler;
20 use crate::virtio::vhost::user::worker::Worker;
21 use crate::virtio::vhost::user::{Error, Result};
22 use crate::virtio::{copy_config, TYPE_FS};
23 use crate::virtio::{Interrupt, Queue, VirtioDevice};
24 
25 pub struct Fs {
26     cfg: virtio_fs_config,
27     kill_evt: Option<Event>,
28     worker_thread: Option<thread::JoinHandle<Worker>>,
29     handler: RefCell<VhostUserHandler>,
30     queue_sizes: Vec<u16>,
31 }
32 impl Fs {
new<P: AsRef<Path>>(base_features: u64, socket_path: P, tag: &str) -> Result<Fs>33     pub fn new<P: AsRef<Path>>(base_features: u64, socket_path: P, tag: &str) -> Result<Fs> {
34         if tag.len() > FS_MAX_TAG_LEN {
35             return Err(Error::TagTooLong {
36                 len: tag.len(),
37                 max: FS_MAX_TAG_LEN,
38             });
39         }
40 
41         // The spec requires a minimum of 2 queues: one worker queue and one high priority queue
42         let default_queue_size = 2;
43 
44         let mut cfg_tag = [0u8; FS_MAX_TAG_LEN];
45         cfg_tag[..tag.len()].copy_from_slice(tag.as_bytes());
46 
47         let cfg = virtio_fs_config {
48             tag: cfg_tag,
49             // Only count the worker queues, exclude the high prio queue
50             num_request_queues: Le32::from(default_queue_size - 1),
51         };
52 
53         let socket = UnixStream::connect(&socket_path).map_err(Error::SocketConnect)?;
54         let master = Master::from_stream(socket, default_queue_size as u64);
55 
56         let allow_features = 1u64 << crate::virtio::VIRTIO_F_VERSION_1
57             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
58         let init_features = base_features | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
59         let allow_protocol_features =
60             VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::CONFIG;
61 
62         let mut handler = VhostUserHandler::new(
63             master,
64             allow_features,
65             init_features,
66             allow_protocol_features,
67         )?;
68         let queue_sizes = handler.queue_sizes(QUEUE_SIZE, default_queue_size as usize)?;
69 
70         Ok(Fs {
71             cfg,
72             kill_evt: None,
73             worker_thread: None,
74             handler: RefCell::new(handler),
75             queue_sizes,
76         })
77     }
78 }
79 
80 impl VirtioDevice for Fs {
keep_rds(&self) -> Vec<RawDescriptor>81     fn keep_rds(&self) -> Vec<RawDescriptor> {
82         Vec::new()
83     }
84 
device_type(&self) -> u3285     fn device_type(&self) -> u32 {
86         TYPE_FS
87     }
88 
queue_max_sizes(&self) -> &[u16]89     fn queue_max_sizes(&self) -> &[u16] {
90         &self.queue_sizes
91     }
92 
features(&self) -> u6493     fn features(&self) -> u64 {
94         self.handler.borrow().avail_features
95     }
96 
ack_features(&mut self, features: u64)97     fn ack_features(&mut self, features: u64) {
98         if let Err(e) = self.handler.borrow_mut().ack_features(features) {
99             error!("failed to enable features 0x{:x}: {}", features, e);
100         }
101     }
102 
read_config(&self, offset: u64, data: &mut [u8])103     fn read_config(&self, offset: u64, data: &mut [u8]) {
104         match self
105             .handler
106             .borrow_mut()
107             .read_config::<virtio_fs_config>(offset, data)
108         {
109             Ok(()) => {}
110             // copy local config when VhostUserProtocolFeatures::CONFIG is not supported by the
111             // device
112             Err(Error::GetConfig(VhostError::VhostUserProtocol(
113                 VhostUserError::InvalidOperation,
114             ))) => copy_config(data, 0, self.cfg.as_slice(), offset),
115             Err(e) => error!("Failed to fetch device config: {}", e),
116         }
117     }
118 
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, queues: Vec<Queue>, queue_evts: Vec<Event>, )119     fn activate(
120         &mut self,
121         mem: GuestMemory,
122         interrupt: Interrupt,
123         queues: Vec<Queue>,
124         queue_evts: Vec<Event>,
125     ) {
126         if let Err(e) = self
127             .handler
128             .borrow_mut()
129             .activate(&mem, &interrupt, &queues, &queue_evts)
130         {
131             error!("failed to activate queues: {}", e);
132             return;
133         }
134 
135         let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
136             Ok(v) => v,
137             Err(e) => {
138                 error!("failed creating kill Event pair: {}", e);
139                 return;
140             }
141         };
142         self.kill_evt = Some(self_kill_evt);
143 
144         let worker_result = thread::Builder::new()
145             .name("vhost_user_virtio_fs".to_string())
146             .spawn(move || {
147                 let ex = Executor::new().expect("failed to create an executor");
148                 let mut worker = Worker {
149                     queues,
150                     mem,
151                     kill_evt,
152                 };
153 
154                 if let Err(e) = worker.run(&ex, interrupt) {
155                     error!("failed to start a worker: {}", e);
156                 }
157                 worker
158             });
159 
160         match worker_result {
161             Err(e) => {
162                 error!("failed to spawn vhost-user virtio_fs worker: {}", e);
163                 return;
164             }
165             Ok(join_handle) => {
166                 self.worker_thread = Some(join_handle);
167             }
168         }
169     }
170 
reset(&mut self) -> bool171     fn reset(&mut self) -> bool {
172         if let Err(e) = self.handler.borrow_mut().reset(self.queue_sizes.len()) {
173             error!("Failed to reset fs device: {}", e);
174             false
175         } else {
176             true
177         }
178     }
179 }
180 
181 impl Drop for Fs {
drop(&mut self)182     fn drop(&mut self) {
183         if let Some(kill_evt) = self.kill_evt.take() {
184             // Ignore the result because there is nothing we can do about it.
185             let _ = kill_evt.write(1);
186         }
187 
188         if let Some(worker_thread) = self.worker_thread.take() {
189             let _ = worker_thread.join();
190         }
191     }
192 }
193