1 // Copyright 2017 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::RefCell;
6 use std::rc::Rc;
7 use std::sync::atomic::{AtomicUsize, Ordering};
8 use std::sync::Arc;
9 use std::thread;
10 
11 use futures::{channel::mpsc, pin_mut, StreamExt};
12 use remain::sorted;
13 use thiserror::Error as ThisError;
14 
15 use base::{self, error, info, warn, AsRawDescriptor, AsyncTube, Event, RawDescriptor, Tube};
16 use cros_async::{select6, EventAsync, Executor};
17 use data_model::{DataInit, Le16, Le32, Le64};
18 use vm_control::{BalloonControlCommand, BalloonControlResult, BalloonStats};
19 use vm_memory::{GuestAddress, GuestMemory};
20 
21 use super::{
22     copy_config, descriptor_utils, DescriptorChain, Interrupt, Queue, Reader, SignalableInterrupt,
23     VirtioDevice, TYPE_BALLOON,
24 };
25 
/// Errors reported by the virtio balloon device.
///
/// NOTE(review): only `ReceivingCommand` is constructed in this file; confirm whether
/// `CreatingMessageReceiver` and `WritingConfigEvent` are still used elsewhere.
#[sorted]
#[derive(ThisError, Debug)]
pub enum BalloonError {
    /// Failed to create async message receiver.
    #[error("failed to create async message receiver: {0}")]
    CreatingMessageReceiver(base::TubeError),
    /// Failed to receive command message.
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(base::TubeError),
    /// Failed to write config event.
    #[error("failed to write config event: {0}")]
    WritingConfigEvent(base::Error),
}
/// Result type used throughout the balloon device, with [`BalloonError`] as the error.
pub type Result<T> = std::result::Result<T, BalloonError>;
40 
// The balloon has three virtqueues, used in this order: inflate, deflate and stats. All three
// share the same maximum size.
const QUEUE_SIZE: u16 = 128;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];

// Page frame numbers exchanged with the guest are shifted left by this amount to obtain a guest
// physical address, so each PFN names one 4096-byte (`VIRTIO_BALLOON_PF_SIZE`) page.
const VIRTIO_BALLOON_PFN_SHIFT: u32 = 12;
const VIRTIO_BALLOON_PF_SIZE: u64 = 1 << VIRTIO_BALLOON_PFN_SHIFT;

// The feature bitmap for virtio balloon (bit numbers, not masks).
const VIRTIO_BALLOON_F_MUST_TELL_HOST: u32 = 0; // Tell before reclaiming pages
const VIRTIO_BALLOON_F_STATS_VQ: u32 = 1; // Stats reporting enabled
const VIRTIO_BALLOON_F_DEFLATE_ON_OOM: u32 = 2; // Deflate balloon on OOM
52 
// virtio_balloon_config is the balloon device configuration space defined by the virtio spec.
// `num_pages` is the balloon size requested by the host and `actual` is the size reported back
// by the guest. Both are counts of `VIRTIO_BALLOON_PF_SIZE` pages, stored little-endian.
#[derive(Copy, Clone, Debug, Default)]
#[repr(C)]
struct virtio_balloon_config {
    num_pages: Le32,
    actual: Le32,
}

// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for virtio_balloon_config {}
63 
// BalloonConfig holds the page counts shared between the device (the `VirtioDevice` methods) and
// the worker thread:
// - `num_pages` is stored by the worker when the host sends an `Adjust` command, and is read by
//   the device when building the config space.
// - `actual_pages` is stored by the device when the guest writes the config space, and is read
//   by the worker when reporting stats.
// All accesses use `Ordering::Relaxed`; each counter is used independently.
#[derive(Default)]
struct BalloonConfig {
    num_pages: AtomicUsize,
    actual_pages: AtomicUsize,
}
70 
// The constants defining stats types (the `tag` field) in virtio_balloon_stat.
// `BalloonStat::update_stats` maps each tag onto a `BalloonStats` field.
const VIRTIO_BALLOON_S_SWAP_IN: u16 = 0;
const VIRTIO_BALLOON_S_SWAP_OUT: u16 = 1;
const VIRTIO_BALLOON_S_MAJFLT: u16 = 2;
const VIRTIO_BALLOON_S_MINFLT: u16 = 3;
const VIRTIO_BALLOON_S_MEMFREE: u16 = 4;
const VIRTIO_BALLOON_S_MEMTOT: u16 = 5;
const VIRTIO_BALLOON_S_AVAIL: u16 = 6;
const VIRTIO_BALLOON_S_CACHES: u16 = 7;
const VIRTIO_BALLOON_S_HTLB_PGALLOC: u16 = 8;
const VIRTIO_BALLOON_S_HTLB_PGFAIL: u16 = 9;
82 
// BalloonStat is used to deserialize stats from the stats_queue. Each entry in a stats buffer
// is a 16-bit tag (one of the `VIRTIO_BALLOON_S_*` constants) immediately followed by a 64-bit
// value. Both fields are little-endian, and `packed` removes any padding between them.
#[derive(Copy, Clone)]
#[repr(C, packed)]
struct BalloonStat {
    tag: Le16,
    val: Le64,
}
// Safe because it only has data: both fields are plain integers, and `packed` guarantees there is
// no padding, so any byte pattern read from the guest is a valid value.
unsafe impl DataInit for BalloonStat {}
92 
93 impl BalloonStat {
update_stats(&self, stats: &mut BalloonStats)94     fn update_stats(&self, stats: &mut BalloonStats) {
95         let val = Some(self.val.to_native());
96         match self.tag.to_native() {
97             VIRTIO_BALLOON_S_SWAP_IN => stats.swap_in = val,
98             VIRTIO_BALLOON_S_SWAP_OUT => stats.swap_out = val,
99             VIRTIO_BALLOON_S_MAJFLT => stats.major_faults = val,
100             VIRTIO_BALLOON_S_MINFLT => stats.minor_faults = val,
101             VIRTIO_BALLOON_S_MEMFREE => stats.free_memory = val,
102             VIRTIO_BALLOON_S_MEMTOT => stats.total_memory = val,
103             VIRTIO_BALLOON_S_AVAIL => stats.available_memory = val,
104             VIRTIO_BALLOON_S_CACHES => stats.disk_caches = val,
105             VIRTIO_BALLOON_S_HTLB_PGALLOC => stats.hugetlb_allocations = val,
106             VIRTIO_BALLOON_S_HTLB_PGFAIL => stats.hugetlb_failures = val,
107             _ => (),
108         }
109     }
110 }
111 
112 // Processes one message's list of addresses.
handle_address_chain<F>( avail_desc: DescriptorChain, mem: &GuestMemory, desc_handler: &mut F, ) -> descriptor_utils::Result<()> where F: FnMut(GuestAddress, u64),113 fn handle_address_chain<F>(
114     avail_desc: DescriptorChain,
115     mem: &GuestMemory,
116     desc_handler: &mut F,
117 ) -> descriptor_utils::Result<()>
118 where
119     F: FnMut(GuestAddress, u64),
120 {
121     // In a long-running system, there is no reason to expect that
122     // a significant number of freed pages are consecutive. However,
123     // batching is relatively simple and can result in significant
124     // gains in a newly booted system, so it's worth attempting.
125     let mut range_start = 0;
126     let mut range_size = 0;
127     let mut reader = Reader::new(mem.clone(), avail_desc)?;
128     for res in reader.iter::<Le32>() {
129         let pfn = match res {
130             Ok(pfn) => pfn,
131             Err(e) => {
132                 error!("error while reading unused pages: {}", e);
133                 break;
134             }
135         };
136         let guest_address = (u64::from(pfn.to_native())) << VIRTIO_BALLOON_PFN_SHIFT;
137         if range_start + range_size == guest_address {
138             range_size += VIRTIO_BALLOON_PF_SIZE;
139         } else if range_start == guest_address + VIRTIO_BALLOON_PF_SIZE {
140             range_start = guest_address;
141             range_size += VIRTIO_BALLOON_PF_SIZE;
142         } else {
143             // Discontinuity, so flush the previous range. Note range_size
144             // will be 0 on the first iteration, so skip that.
145             if range_size != 0 {
146                 desc_handler(GuestAddress(range_start), range_size);
147             }
148             range_start = guest_address;
149             range_size = VIRTIO_BALLOON_PF_SIZE;
150         }
151     }
152     if range_size != 0 {
153         desc_handler(GuestAddress(range_start), range_size);
154     }
155     Ok(())
156 }
157 
158 // Async task that handles the main balloon inflate and deflate queues.
handle_queue<F>( mem: &GuestMemory, mut queue: Queue, mut queue_event: EventAsync, interrupt: Rc<RefCell<Interrupt>>, mut desc_handler: F, ) where F: FnMut(GuestAddress, u64),159 async fn handle_queue<F>(
160     mem: &GuestMemory,
161     mut queue: Queue,
162     mut queue_event: EventAsync,
163     interrupt: Rc<RefCell<Interrupt>>,
164     mut desc_handler: F,
165 ) where
166     F: FnMut(GuestAddress, u64),
167 {
168     loop {
169         let avail_desc = match queue.next_async(mem, &mut queue_event).await {
170             Err(e) => {
171                 error!("Failed to read descriptor {}", e);
172                 return;
173             }
174             Ok(d) => d,
175         };
176         let index = avail_desc.index;
177         if let Err(e) = handle_address_chain(avail_desc, mem, &mut desc_handler) {
178             error!("balloon: failed to process inflate addresses: {}", e);
179         }
180         queue.add_used(mem, index, 0);
181         interrupt.borrow_mut().signal_used_queue(queue.vector);
182     }
183 }
184 
185 // Async task that handles the stats queue. Note that the cadence of this is driven by requests for
186 // balloon stats from the control pipe.
187 // The guests queues an initial buffer on boot, which is read and then this future will block until
188 // signaled from the command socket that stats should be collected again.
handle_stats_queue( mem: &GuestMemory, mut queue: Queue, mut queue_event: EventAsync, mut stats_rx: mpsc::Receiver<()>, command_tube: &Tube, config: Arc<BalloonConfig>, interrupt: Rc<RefCell<Interrupt>>, )189 async fn handle_stats_queue(
190     mem: &GuestMemory,
191     mut queue: Queue,
192     mut queue_event: EventAsync,
193     mut stats_rx: mpsc::Receiver<()>,
194     command_tube: &Tube,
195     config: Arc<BalloonConfig>,
196     interrupt: Rc<RefCell<Interrupt>>,
197 ) {
198     loop {
199         let stats_desc = match queue.next_async(mem, &mut queue_event).await {
200             Err(e) => {
201                 error!("Failed to read descriptor {}", e);
202                 return;
203             }
204             Ok(d) => d,
205         };
206         let index = stats_desc.index;
207         let mut reader = match Reader::new(mem.clone(), stats_desc) {
208             Ok(r) => r,
209             Err(e) => {
210                 error!("balloon: failed to CREATE Reader: {}", e);
211                 continue;
212             }
213         };
214         let mut stats: BalloonStats = Default::default();
215         for res in reader.iter::<BalloonStat>() {
216             match res {
217                 Ok(stat) => stat.update_stats(&mut stats),
218                 Err(e) => {
219                     error!("error while reading stats: {}", e);
220                     break;
221                 }
222             };
223         }
224         let actual_pages = config.actual_pages.load(Ordering::Relaxed) as u64;
225         let result = BalloonControlResult::Stats {
226             balloon_actual: actual_pages << VIRTIO_BALLOON_PFN_SHIFT,
227             stats,
228         };
229         if let Err(e) = command_tube.send(&result) {
230             error!("failed to send stats result: {}", e);
231         }
232 
233         // Wait for a request to read the stats again.
234         if stats_rx.next().await.is_none() {
235             error!("stats signal tube was closed");
236             break;
237         }
238 
239         // Request a new stats_desc to the guest.
240         queue.add_used(&mem, index, 0);
241         interrupt.borrow_mut().signal_used_queue(queue.vector);
242     }
243 }
244 
245 // Async task that handles the command socket. The command socket handles messages from the host
246 // requesting that the guest balloon be adjusted or to report guest memory statistics.
handle_command_tube( command_tube: &AsyncTube, interrupt: Rc<RefCell<Interrupt>>, config: Arc<BalloonConfig>, mut stats_tx: mpsc::Sender<()>, ) -> Result<()>247 async fn handle_command_tube(
248     command_tube: &AsyncTube,
249     interrupt: Rc<RefCell<Interrupt>>,
250     config: Arc<BalloonConfig>,
251     mut stats_tx: mpsc::Sender<()>,
252 ) -> Result<()> {
253     loop {
254         match command_tube.next().await {
255             Ok(command) => match command {
256                 BalloonControlCommand::Adjust { num_bytes } => {
257                     let num_pages = (num_bytes >> VIRTIO_BALLOON_PFN_SHIFT) as usize;
258                     info!("balloon config changed to consume {} pages", num_pages);
259 
260                     config.num_pages.store(num_pages, Ordering::Relaxed);
261                     interrupt.borrow_mut().signal_config_changed();
262                 }
263                 BalloonControlCommand::Stats => {
264                     if let Err(e) = stats_tx.try_send(()) {
265                         error!("failed to signal the stat handler: {}", e);
266                     }
267                 }
268             },
269             Err(e) => {
270                 return Err(BalloonError::ReceivingCommand(e));
271             }
272         }
273     }
274 }
275 
276 // Async task that resamples the status of the interrupt when the guest sends a request by
277 // signalling the resample event associated with the interrupt.
handle_irq_resample(ex: &Executor, interrupt: Rc<RefCell<Interrupt>>)278 async fn handle_irq_resample(ex: &Executor, interrupt: Rc<RefCell<Interrupt>>) {
279     let resample_evt = if let Some(resample_evt) = interrupt.borrow_mut().get_resample_evt() {
280         let resample_evt = resample_evt.try_clone().unwrap();
281         let resample_evt = EventAsync::new(resample_evt.0, ex).unwrap();
282         Some(resample_evt)
283     } else {
284         None
285     };
286     if let Some(resample_evt) = resample_evt {
287         while resample_evt.next_val().await.is_ok() {
288             interrupt.borrow_mut().do_interrupt_resample();
289         }
290     } else {
291         // no resample event, park the future.
292         let () = futures::future::pending().await;
293     }
294 }
295 
296 // Async task that waits for a signal from the kill event given to the device at startup.  Once this event is
297 // readable, exit. Exiting this future will cause the main loop to break and the worker thread to
298 // exit.
wait_kill(kill_evt: EventAsync)299 async fn wait_kill(kill_evt: EventAsync) {
300     let _ = kill_evt.next_val().await;
301 }
302 
303 // The main worker thread. Initialized the asynchronous worker tasks and passes them to the executor
304 // to be processed.
run_worker( mut queue_evts: Vec<Event>, mut queues: Vec<Queue>, command_tube: Tube, interrupt: Interrupt, kill_evt: Event, mem: GuestMemory, config: Arc<BalloonConfig>, ) -> Tube305 fn run_worker(
306     mut queue_evts: Vec<Event>,
307     mut queues: Vec<Queue>,
308     command_tube: Tube,
309     interrupt: Interrupt,
310     kill_evt: Event,
311     mem: GuestMemory,
312     config: Arc<BalloonConfig>,
313 ) -> Tube {
314     // Wrap the interrupt in a `RefCell` so it can be shared between async functions.
315     let interrupt = Rc::new(RefCell::new(interrupt));
316 
317     let ex = Executor::new().unwrap();
318     let command_tube = command_tube.into_async_tube(&ex).unwrap();
319 
320     // We need a block to release all references to command_tube at the end before returning it.
321     {
322         // The first queue is used for inflate messages
323         let inflate_event = EventAsync::new(queue_evts.remove(0).0, &ex)
324             .expect("failed to set up the inflate event");
325         let inflate = handle_queue(
326             &mem,
327             queues.remove(0),
328             inflate_event,
329             interrupt.clone(),
330             |guest_address, len| {
331                 if let Err(e) = mem.remove_range(guest_address, len) {
332                     warn!("Marking pages unused failed: {}, addr={}", e, guest_address);
333                 }
334             },
335         );
336         pin_mut!(inflate);
337 
338         // The second queue is used for deflate messages
339         let deflate_event = EventAsync::new(queue_evts.remove(0).0, &ex)
340             .expect("failed to set up the deflate event");
341         let deflate = handle_queue(
342             &mem,
343             queues.remove(0),
344             deflate_event,
345             interrupt.clone(),
346             |_, _| {}, // Ignore these.
347         );
348         pin_mut!(deflate);
349 
350         // The third queue is used for stats messages
351         let (stats_tx, stats_rx) = mpsc::channel::<()>(1);
352         let stats_event =
353             EventAsync::new(queue_evts.remove(0).0, &ex).expect("failed to set up the stats event");
354         let stats = handle_stats_queue(
355             &mem,
356             queues.remove(0),
357             stats_event,
358             stats_rx,
359             &command_tube,
360             config.clone(),
361             interrupt.clone(),
362         );
363         pin_mut!(stats);
364 
365         // Future to handle command messages that resize the balloon.
366         let command = handle_command_tube(&command_tube, interrupt.clone(), config, stats_tx);
367         pin_mut!(command);
368 
369         // Process any requests to resample the irq value.
370         let resample = handle_irq_resample(&ex, interrupt.clone());
371         pin_mut!(resample);
372 
373         // Exit if the kill event is triggered.
374         let kill_evt = EventAsync::new(kill_evt.0, &ex).expect("failed to set up the kill event");
375         let kill = wait_kill(kill_evt);
376         pin_mut!(kill);
377 
378         if let Err(e) = ex.run_until(select6(inflate, deflate, stats, command, resample, kill)) {
379             error!("error happened in executor: {}", e);
380         }
381     }
382 
383     command_tube.into()
384 }
385 
/// Virtio device for memory balloon inflation/deflation.
pub struct Balloon {
    // Host command channel. `activate` moves it into the worker thread (leaving `None`), and
    // `reset` puts it back once the worker has been joined.
    command_tube: Option<Tube>,
    // Page counts shared with the worker thread.
    config: Arc<BalloonConfig>,
    // Offered feature bits, narrowed to the driver-acknowledged set by `ack_features`.
    features: u64,
    // This side of the worker's kill event; set by `activate`, consumed by `reset`/`drop`.
    kill_evt: Option<Event>,
    // Worker thread; joining it yields the command tube back.
    worker_thread: Option<thread::JoinHandle<Tube>>,
}
394 
395 impl Balloon {
396     /// Creates a new virtio balloon device.
new(base_features: u64, command_tube: Tube) -> Result<Balloon>397     pub fn new(base_features: u64, command_tube: Tube) -> Result<Balloon> {
398         Ok(Balloon {
399             command_tube: Some(command_tube),
400             config: Arc::new(BalloonConfig {
401                 num_pages: AtomicUsize::new(0),
402                 actual_pages: AtomicUsize::new(0),
403             }),
404             kill_evt: None,
405             worker_thread: None,
406             features: base_features
407                 | 1 << VIRTIO_BALLOON_F_MUST_TELL_HOST
408                 | 1 << VIRTIO_BALLOON_F_STATS_VQ
409                 | 1 << VIRTIO_BALLOON_F_DEFLATE_ON_OOM,
410         })
411     }
412 
get_config(&self) -> virtio_balloon_config413     fn get_config(&self) -> virtio_balloon_config {
414         let num_pages = self.config.num_pages.load(Ordering::Relaxed) as u32;
415         let actual_pages = self.config.actual_pages.load(Ordering::Relaxed) as u32;
416         virtio_balloon_config {
417             num_pages: num_pages.into(),
418             actual: actual_pages.into(),
419         }
420     }
421 }
422 
423 impl Drop for Balloon {
drop(&mut self)424     fn drop(&mut self) {
425         if let Some(kill_evt) = self.kill_evt.take() {
426             // Ignore the result because there is nothing we can do with a failure.
427             let _ = kill_evt.write(1);
428         }
429 
430         if let Some(worker_thread) = self.worker_thread.take() {
431             let _ = worker_thread.join();
432         }
433     }
434 }
435 
436 impl VirtioDevice for Balloon {
keep_rds(&self) -> Vec<RawDescriptor>437     fn keep_rds(&self) -> Vec<RawDescriptor> {
438         vec![self.command_tube.as_ref().unwrap().as_raw_descriptor()]
439     }
440 
device_type(&self) -> u32441     fn device_type(&self) -> u32 {
442         TYPE_BALLOON
443     }
444 
queue_max_sizes(&self) -> &[u16]445     fn queue_max_sizes(&self) -> &[u16] {
446         QUEUE_SIZES
447     }
448 
read_config(&self, offset: u64, data: &mut [u8])449     fn read_config(&self, offset: u64, data: &mut [u8]) {
450         copy_config(data, 0, self.get_config().as_slice(), offset);
451     }
452 
write_config(&mut self, offset: u64, data: &[u8])453     fn write_config(&mut self, offset: u64, data: &[u8]) {
454         let mut config = self.get_config();
455         copy_config(config.as_mut_slice(), offset, data, 0);
456         self.config
457             .actual_pages
458             .store(config.actual.to_native() as usize, Ordering::Relaxed);
459     }
460 
features(&self) -> u64461     fn features(&self) -> u64 {
462         self.features
463     }
464 
ack_features(&mut self, value: u64)465     fn ack_features(&mut self, value: u64) {
466         self.features &= value;
467     }
468 
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, queues: Vec<Queue>, queue_evts: Vec<Event>, )469     fn activate(
470         &mut self,
471         mem: GuestMemory,
472         interrupt: Interrupt,
473         queues: Vec<Queue>,
474         queue_evts: Vec<Event>,
475     ) {
476         if queues.len() != QUEUE_SIZES.len() || queue_evts.len() != QUEUE_SIZES.len() {
477             return;
478         }
479 
480         let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
481             Ok(v) => v,
482             Err(e) => {
483                 error!("failed to create kill Event pair: {}", e);
484                 return;
485             }
486         };
487         self.kill_evt = Some(self_kill_evt);
488 
489         let config = self.config.clone();
490         let command_tube = self.command_tube.take().unwrap();
491         let worker_result = thread::Builder::new()
492             .name("virtio_balloon".to_string())
493             .spawn(move || {
494                 run_worker(
495                     queue_evts,
496                     queues,
497                     command_tube,
498                     interrupt,
499                     kill_evt,
500                     mem,
501                     config,
502                 )
503             });
504 
505         match worker_result {
506             Err(e) => {
507                 error!("failed to spawn virtio_balloon worker: {}", e);
508             }
509             Ok(join_handle) => {
510                 self.worker_thread = Some(join_handle);
511             }
512         }
513     }
514 
reset(&mut self) -> bool515     fn reset(&mut self) -> bool {
516         if let Some(kill_evt) = self.kill_evt.take() {
517             if kill_evt.write(1).is_err() {
518                 error!("{}: failed to notify the kill event", self.debug_label());
519                 return false;
520             }
521         }
522 
523         if let Some(worker_thread) = self.worker_thread.take() {
524             match worker_thread.join() {
525                 Err(_) => {
526                     error!("{}: failed to get back resources", self.debug_label());
527                     return false;
528                 }
529                 Ok(command_tube) => {
530                     self.command_tube = Some(command_tube);
531                     return true;
532                 }
533             }
534         }
535         false
536     }
537 }
538 
#[cfg(test)]
mod tests {
    use super::*;

    use crate::virtio::descriptor_utils::{create_descriptor_chain, DescriptorType};

    // Writes `pfns` as consecutive little-endian u32 values starting at guest address 0x100, and
    // returns a single readable descriptor chain covering them.
    fn pfn_chain(memory: &GuestMemory, pfns: &[u32]) -> DescriptorChain {
        let buffer_start = 0x100u64;
        for (i, pfn) in pfns.iter().enumerate() {
            memory
                .write_obj_at_addr(Le32::from(*pfn), GuestAddress(buffer_start + 4 * i as u64))
                .unwrap();
        }
        create_descriptor_chain(
            memory,
            GuestAddress(0x0),
            GuestAddress(buffer_start),
            vec![(DescriptorType::Readable, (pfns.len() * 4) as u32)],
            0,
        )
        .expect("create_descriptor_chain failed")
    }

    // Runs `handle_address_chain` over `pfns` and collects every (address, length) it reports.
    fn collect_ranges(pfns: &[u32]) -> Vec<(GuestAddress, u64)> {
        let memory = GuestMemory::new(&[(GuestAddress(0x0), 0x10000)]).unwrap();
        let chain = pfn_chain(&memory, pfns);
        let mut ranges = Vec::new();
        let res = handle_address_chain(chain, &memory, &mut |guest_address, len| {
            ranges.push((guest_address, len));
        });
        assert!(res.is_ok());
        ranges
    }

    #[test]
    fn desc_parsing_inflate() {
        // Check that the memory addresses are parsed correctly by 'handle_address_chain' and
        // passed to the closure, one page per non-adjacent PFN.
        assert_eq!(
            collect_ranges(&[0x10, 0xaa55aa55]),
            vec![
                (
                    GuestAddress(0x10u64 << VIRTIO_BALLOON_PFN_SHIFT),
                    VIRTIO_BALLOON_PF_SIZE
                ),
                (
                    GuestAddress(0xaa55aa55u64 << VIRTIO_BALLOON_PFN_SHIFT),
                    VIRTIO_BALLOON_PF_SIZE
                ),
            ]
        );
    }

    #[test]
    fn desc_parsing_coalesces_adjacent_pages() {
        // 0x11 extends the run forwards and 0x0f extends it backwards; 0x20 starts a new run.
        assert_eq!(
            collect_ranges(&[0x10, 0x11, 0x0f, 0x20]),
            vec![
                (
                    GuestAddress(0x0fu64 << VIRTIO_BALLOON_PFN_SHIFT),
                    3 * VIRTIO_BALLOON_PF_SIZE
                ),
                (
                    GuestAddress(0x20u64 << VIRTIO_BALLOON_PFN_SHIFT),
                    VIRTIO_BALLOON_PF_SIZE
                ),
            ]
        );
    }
}
583