1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::fmt::{self, Debug, Formatter};
4 use std::sync::Arc;
5 
6 use super::Inner;
7 use crate::call::{BatchContext, MessageReader, RpcStatusCode};
8 use crate::error::Error;
9 
10 /// Batch job type.
11 #[derive(PartialEq, Debug)]
12 pub enum BatchType {
13     /// Finish without reading any message.
14     Finish,
15     /// Extract one message when finish.
16     Read,
17     /// Check the rpc code and then extract one message.
18     CheckRead,
19 }
20 
21 /// A promise used to resolve batch jobs.
22 pub struct Batch {
23     ty: BatchType,
24     ctx: BatchContext,
25     inner: Arc<Inner<Option<MessageReader>>>,
26 }
27 
28 impl Batch {
new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch29     pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
30         Batch {
31             ty,
32             ctx: BatchContext::new(),
33             inner,
34         }
35     }
36 
context(&self) -> &BatchContext37     pub fn context(&self) -> &BatchContext {
38         &self.ctx
39     }
40 
read_one_msg(&mut self, success: bool)41     fn read_one_msg(&mut self, success: bool) {
42         let task = {
43             let mut guard = self.inner.lock();
44             if success {
45                 guard.set_result(Ok(self.ctx.recv_message()))
46             } else {
47                 // rely on C core to handle the failed read (e.g. deliver approriate
48                 // statusCode on the clientside).
49                 guard.set_result(Ok(None))
50             }
51         };
52         task.map(|t| t.wake());
53     }
54 
finish_response(&mut self, succeed: bool)55     fn finish_response(&mut self, succeed: bool) {
56         let task = {
57             let mut guard = self.inner.lock();
58             if succeed {
59                 let status = self.ctx.rpc_status();
60                 if status.status == RpcStatusCode::OK {
61                     guard.set_result(Ok(None))
62                 } else {
63                     guard.set_result(Err(Error::RpcFailure(status)))
64                 }
65             } else {
66                 guard.set_result(Err(Error::RemoteStopped))
67             }
68         };
69         task.map(|t| t.wake());
70     }
71 
handle_unary_response(&mut self)72     fn handle_unary_response(&mut self) {
73         let task = {
74             let mut guard = self.inner.lock();
75             let status = self.ctx.rpc_status();
76             if status.status == RpcStatusCode::OK {
77                 guard.set_result(Ok(self.ctx.recv_message()))
78             } else {
79                 guard.set_result(Err(Error::RpcFailure(status)))
80             }
81         };
82         task.map(|t| t.wake());
83     }
84 
resolve(mut self, success: bool)85     pub fn resolve(mut self, success: bool) {
86         match self.ty {
87             BatchType::CheckRead => {
88                 assert!(success);
89                 self.handle_unary_response();
90             }
91             BatchType::Finish => {
92                 self.finish_response(success);
93             }
94             BatchType::Read => {
95                 self.read_one_msg(success);
96             }
97         }
98     }
99 }
100 
101 impl Debug for Batch {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result102     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
103         write!(f, "Batch [{:?}]", self.ty)
104     }
105 }
106 
107 /// A promise used to resolve async action status.
108 ///
109 /// The action can only succeed or fail without extra error hint.
110 pub struct Action {
111     inner: Arc<Inner<bool>>,
112 }
113 
114 impl Action {
new(inner: Arc<Inner<bool>>) -> Action115     pub fn new(inner: Arc<Inner<bool>>) -> Action {
116         Action { inner }
117     }
118 
resolve(self, success: bool)119     pub fn resolve(self, success: bool) {
120         let task = {
121             let mut guard = self.inner.lock();
122             guard.set_result(Ok(success))
123         };
124         task.map(|t| t.wake());
125     }
126 }
127