1 use futures_01::executor::{
2 spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01,
3 Spawn as Spawn01, UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{
6 Async as Async01, Future as Future01,
7 Stream as Stream01,
8 };
9 #[cfg(feature = "sink")]
10 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
11 use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03};
12 use std::pin::Pin;
13 use std::task::Context;
14 #[cfg(feature = "sink")]
15 use futures_sink::Sink as Sink03;
16
17 #[cfg(feature = "io-compat")]
18 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
19 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
20 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
21
/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
/// object to a futures 0.3-compatible version.
///
/// The wrapped object is kept inside a futures 0.1 `Spawn`, which is what the
/// wrapper polls through when it is driven by a futures 0.3 task context.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat01As03<T> {
    // The futures 0.1 object, wrapped by `spawn01` when the adapter is constructed.
    pub(crate) inner: Spawn01<T>,
}
29
// The futures 0.3 trait impls in this file receive `Pin<&mut Self>` but call `in_notify`,
// which takes `&mut self`; getting `&mut Self` out of the pin requires `Self: Unpin`,
// so the wrapper is declared `Unpin` for every `T`.
impl<T> Unpin for Compat01As03<T> {}
31
32 impl<T> Compat01As03<T> {
33 /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
34 /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Self35 pub fn new(object: T) -> Self {
36 Self {
37 inner: spawn01(object),
38 }
39 }
40
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R41 fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
42 let notify = &WakerToHandle(cx.waker());
43 self.inner.poll_fn_notify(notify, 0, f)
44 }
45
46 /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T47 pub fn get_ref(&self) -> &T {
48 self.inner.get_ref()
49 }
50
51 /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
52 /// within.
get_mut(&mut self) -> &mut T53 pub fn get_mut(&mut self) -> &mut T {
54 self.inner.get_mut()
55 }
56
57 /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
58 /// AsyncWrite object.
into_inner(self) -> T59 pub fn into_inner(self) -> T {
60 self.inner.into_inner()
61 }
62 }
63
/// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
///
/// It is implemented for every type that implements the futures 0.1 `Future` trait.
pub trait Future01CompatExt: Future01 {
    /// Converts a futures 0.1
    /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
    /// into a futures 0.3
    /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
    ///
    /// The future is wrapped in a [`Compat01As03`]; nothing is polled by this call.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
    /// # // feature issues
    /// use futures_util::compat::Future01CompatExt;
    ///
    /// let future = futures_01::future::ok::<u32, ()>(1);
    /// assert_eq!(future.compat().await, Ok(1));
    /// # });
    /// ```
    fn compat(self) -> Compat01As03<Self>
    where
        Self: Sized,
    {
        Compat01As03::new(self)
    }
}
// Blanket impl: `compat` becomes available on every futures 0.1 future.
impl<Fut: Future01> Future01CompatExt for Fut {}
89
/// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
///
/// It is implemented for every type that implements the futures 0.1 `Stream` trait.
pub trait Stream01CompatExt: Stream01 {
    /// Converts a futures 0.1
    /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
    /// into a futures 0.3
    /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
    ///
    /// The stream is wrapped in a [`Compat01As03`]; nothing is polled by this call.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::stream::StreamExt;
    /// use futures_util::compat::Stream01CompatExt;
    ///
    /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
    /// let mut stream = stream.compat();
    /// assert_eq!(stream.next().await, Some(Ok(1)));
    /// assert_eq!(stream.next().await, None);
    /// # });
    /// ```
    fn compat(self) -> Compat01As03<Self>
    where
        Self: Sized,
    {
        Compat01As03::new(self)
    }
}
// Blanket impl: `compat` becomes available on every futures 0.1 stream.
impl<St: Stream01> Stream01CompatExt for St {}
116
/// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
///
/// It is implemented for every type that implements the futures 0.1 `Sink` trait.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub trait Sink01CompatExt: Sink01 {
    /// Converts a futures 0.1
    /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
    /// into a futures 0.3
    /// [`Sink<T, Error = E>`](futures_sink::Sink).
    ///
    /// The sink is wrapped in a [`Compat01As03Sink`]; nothing is sent or polled by
    /// this call.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::{sink::SinkExt, stream::StreamExt};
    /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
    ///
    /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
    /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
    ///
    /// tx.send(1).await.unwrap();
    /// drop(tx);
    /// assert_eq!(rx.next().await, Some(Ok(1)));
    /// assert_eq!(rx.next().await, None);
    /// # });
    /// ```
    fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
    where
        Self: Sized,
    {
        Compat01As03Sink::new(self)
    }
}
// Blanket impl: `sink_compat` becomes available on every futures 0.1 sink.
#[cfg(feature = "sink")]
impl<Si: Sink01> Sink01CompatExt for Si {}
149
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>150 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
151 match x? {
152 Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
153 Async01::NotReady => task03::Poll::Pending,
154 }
155 }
156
157 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
158 type Output = Result<Fut::Item, Fut::Error>;
159
poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Self::Output>160 fn poll(
161 mut self: Pin<&mut Self>,
162 cx: &mut Context<'_>,
163 ) -> task03::Poll<Self::Output> {
164 poll_01_to_03(self.in_notify(cx, Future01::poll))
165 }
166 }
167
168 impl<St: Stream01> Stream03 for Compat01As03<St> {
169 type Item = Result<St::Item, St::Error>;
170
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>171 fn poll_next(
172 mut self: Pin<&mut Self>,
173 cx: &mut Context<'_>,
174 ) -> task03::Poll<Option<Self::Item>> {
175 match self.in_notify(cx, Stream01::poll)? {
176 Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
177 Async01::Ready(None) => task03::Poll::Ready(None),
178 Async01::NotReady => task03::Poll::Pending,
179 }
180 }
181 }
182
/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version.
///
/// Besides the wrapped sink, the adapter keeps a one-item buffer and a flag that
/// records whether closing has begun; both are used by its futures 0.3 `Sink` impl.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Compat01As03Sink<S, SinkItem> {
    // The futures 0.1 sink, wrapped by `spawn01` when the adapter is constructed.
    pub(crate) inner: Spawn01<S>,
    // Item accepted by `start_send` that the inner sink has not taken yet.
    pub(crate) buffer: Option<SinkItem>,
    // Set by `poll_close` once the inner sink's `close` has been called and
    // returned `NotReady`.
    pub(crate) close_started: bool,
}
193
// The futures 0.3 trait impls in this file receive `Pin<&mut Self>` but mutate the
// adapter through `&mut self`; that requires `Self: Unpin`, so the wrapper is declared
// `Unpin` for every `S` and `SinkItem`.
#[cfg(feature = "sink")]
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
196
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
    /// Creates a futures 0.3-compatible wrapper around a futures 0.1 Sink object.
    ///
    /// The wrapper starts with an empty item buffer and with closing not yet started.
    pub fn new(inner: S) -> Self {
        let inner = spawn01(inner);
        Self {
            inner,
            buffer: None,
            close_started: false,
        }
    }

    /// Runs `f` on the wrapped sink through `Spawn01::poll_fn_notify`, passing a
    /// notify handle source built from the waker of `cx` and the notify id `0`.
    fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
        let handle = WakerToHandle(cx.waker());
        self.inner.poll_fn_notify(&handle, 0, f)
    }

    /// Borrows the wrapped 0.1 Sink object.
    pub fn get_ref(&self) -> &S {
        let spawned = &self.inner;
        spawned.get_ref()
    }

    /// Mutably borrows the wrapped 0.1 Sink object.
    pub fn get_mut(&mut self) -> &mut S {
        let spawned = &mut self.inner;
        spawned.get_mut()
    }

    /// Discards the wrapper and hands back the underlying 0.1 Sink.
    pub fn into_inner(self) -> S {
        let Self { inner, .. } = self;
        inner.into_inner()
    }
}
232
#[cfg(feature = "sink")]
impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
where
    S: Stream01,
{
    type Item = Result<S::Item, S::Error>;

    /// Polls the wrapped object as a futures 0.1 stream once via `in_notify`.
    ///
    /// An item is yielded as `Some(Ok(item))`, an error as `Some(Err(error))`, the end
    /// of the stream as `None`, and `NotReady` as `Poll::Pending`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Option<Self::Item>> {
        let polled = self.in_notify(cx, Stream01::poll);
        match polled {
            Ok(Async01::Ready(Some(item))) => task03::Poll::Ready(Some(Ok(item))),
            Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
            Ok(Async01::NotReady) => task03::Poll::Pending,
            Err(error) => task03::Poll::Ready(Some(Err(error))),
        }
    }
}
251
// futures 0.3 `Sink` on top of a futures 0.1 sink. `start_send` here only parks the item
// in `self.buffer`; the inner sink's `start_send` is attempted later from `poll_ready`,
// `poll_flush` or `poll_close`, and an item the inner sink refuses is put back in the buffer.
#[cfg(feature = "sink")]
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
where
    S: Sink01<SinkItem = SinkItem>,
{
    type Error = S::SinkError;

    /// Stores `item` in the one-item buffer and always returns `Ok(())`.
    ///
    /// The buffer is expected to be empty when this is called; that is only checked
    /// in debug builds.
    fn start_send(
        mut self: Pin<&mut Self>,
        item: SinkItem,
    ) -> Result<(), Self::Error> {
        debug_assert!(self.buffer.is_none());
        self.buffer = Some(item);
        Ok(())
    }

    /// Reports ready once the buffer is empty.
    ///
    /// If an item is buffered, it is offered to the inner sink first: acceptance yields
    /// `Ready(Ok(()))`, refusal re-buffers the item and yields `Pending`, and an error
    /// from the inner sink is returned as `Ready(Err(..))`.
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        match self.buffer.take() {
            Some(item) => match self.in_notify(cx, |f| f.start_send(item))? {
                AsyncSink01::Ready => task03::Poll::Ready(Ok(())),
                AsyncSink01::NotReady(i) => {
                    // The inner sink handed the item back; keep it for the next attempt.
                    self.buffer = Some(i);
                    task03::Poll::Pending
                }
            },
            None => task03::Poll::Ready(Ok(())),
        }
    }

    /// Pushes any buffered item into the inner sink and then drives `poll_complete`.
    ///
    /// Returns `Pending` while the inner sink either refuses the buffered item or
    /// reports `NotReady` from `poll_complete`; errors from the inner sink are returned
    /// as `Ready(Err(..))`.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let item = self.buffer.take();
        // The closure yields the inner poll state together with the item to re-buffer
        // (only `Some` when the inner sink refused it).
        match self.in_notify(cx, |f| match item {
            Some(i) => match f.start_send(i)? {
                AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)),
                AsyncSink01::NotReady(t) => {
                    Ok((Async01::NotReady, Some(t)))
                }
            },
            None => f.poll_complete().map(|i| (i, None)),
        })? {
            (Async01::Ready(_), _) => task03::Poll::Ready(Ok(())),
            (Async01::NotReady, item) => {
                self.buffer = item;
                task03::Poll::Pending
            }
        }
    }

    /// Flushes the sink and then closes it.
    ///
    /// Until `close_started` is set, each call first sends any buffered item and drives
    /// `poll_complete`; only when both succeed is the inner sink's `close` called. Once
    /// `close` has returned `NotReady`, `close_started` is recorded and later calls go
    /// straight to `close`. Errors from the inner sink are returned as `Ready(Err(..))`.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let item = self.buffer.take();
        let close_started = self.close_started;

        // The closure yields (inner poll state, item to re-buffer, new `close_started`).
        let result = self.in_notify(cx, |f| {
            if !close_started {
                if let Some(item) = item {
                    if let AsyncSink01::NotReady(item) = f.start_send(item)? {
                        return Ok((Async01::NotReady, Some(item), false));
                    }
                }

                if let Async01::NotReady = f.poll_complete()? {
                    return Ok((Async01::NotReady, None, false));
                }
            }

            Ok((<S as Sink01>::close(f)?, None, true))
        });

        match result? {
            (Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())),
            (Async01::NotReady, item, close_started) => {
                // Persist the progress made so the next call resumes from the same phase.
                self.buffer = item;
                self.close_started = close_started;
                task03::Poll::Pending
            }
        }
    }
}
339
// Owned futures 0.3 waker. Boxed instances of this type back the `NotifyHandle01`
// values created in this file, and its `Notify01` impl forwards notifications to the waker.
struct NotifyWaker(task03::Waker);
341
// Borrowed futures 0.3 waker that can be converted into a `NotifyHandle01`; the waker
// is only cloned when that conversion actually happens.
#[allow(missing_debug_implementations)] // false positive: this is a private type
#[derive(Clone)]
struct WakerToHandle<'a>(&'a task03::Waker);
345
impl From<WakerToHandle<'_>> for NotifyHandle01 {
    /// Clones the borrowed waker into a heap-allocated `NotifyWaker` and hands its raw
    /// pointer to `NotifyHandle01::new`.
    fn from(handle: WakerToHandle<'_>) -> Self {
        let ptr = Box::new(NotifyWaker(handle.0.clone()));

        // SAFETY: the pointer comes straight from `Box::into_raw`, so it is non-null and
        // points to a live `NotifyWaker`; ownership moves to the handle and is reclaimed
        // by `NotifyWaker::drop_raw`, which rebuilds the `Box`.
        // NOTE(review): confirm this matches the full contract of futures 0.1
        // `NotifyHandle::new`.
        unsafe { Self::new(Box::into_raw(ptr)) }
    }
}
353
impl Notify01 for NotifyWaker {
    /// Wakes the stored futures 0.3 waker; the futures 0.1 notify id is ignored.
    fn notify(&self, _: usize) {
        self.0.wake_by_ref();
    }
}
359
// SAFETY: both methods rely on every `NotifyWaker` behind a `NotifyHandle01` having been
// allocated with `Box::new` and leaked with `Box::into_raw`, as the
// `From<WakerToHandle>` impl in this file does.
unsafe impl UnsafeNotify01 for NotifyWaker {
    /// Produces an independent handle by cloning the waker into a fresh boxed
    /// `NotifyWaker` (via the `From<WakerToHandle>` conversion).
    unsafe fn clone_raw(&self) -> NotifyHandle01 {
        WakerToHandle(&self.0).into()
    }

    /// Frees the allocation behind `self`.
    unsafe fn drop_raw(&self) {
        let ptr: *const dyn UnsafeNotify01 = self;
        // SAFETY: `self` is assumed to live in the `Box` leaked by the `From` impl, so
        // rebuilding the `Box` from its address and dropping it releases that allocation
        // exactly once. NOTE(review): relies on the caller invoking `drop_raw` only once
        // per handle.
        drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
    }
}
370
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    #[cfg(feature = "read-initializer")]
    use futures_io::Initializer;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use std::io::Error;
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
    ///
    /// It is implemented for every type that implements tokio-io's `AsyncRead`.
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    pub trait AsyncRead01CompatExt: AsyncRead01 {
        /// Wraps a tokio-io [`AsyncRead`](tokio_io::AsyncRead) so that it can be used as
        /// a futures-io 0.3 [`AsyncRead`](futures_io::AsyncRead).
        ///
        /// # Examples
        ///
        /// ```
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncReadExt;
        /// use futures_util::compat::AsyncRead01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
        /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
        ///
        /// let mut output = Vec::with_capacity(12);
        /// reader.read_to_end(&mut output).await.unwrap();
        /// assert_eq!(output, input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::new(self)
        }
    }
    // Blanket impl: `compat` becomes available on every tokio-io reader.
    impl<R: AsyncRead01> AsyncRead01CompatExt for R {}

    /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
    ///
    /// It is implemented for every type that implements tokio-io's `AsyncWrite`.
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    pub trait AsyncWrite01CompatExt: AsyncWrite01 {
        /// Wraps a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) so that it can be used
        /// as a futures-io 0.3 [`AsyncWrite`](futures_io::AsyncWrite).
        ///
        /// # Examples
        ///
        /// ```
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncWriteExt;
        /// use futures_util::compat::AsyncWrite01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
        ///
        /// let mut writer = (&mut cursor).compat();
        /// writer.write_all(input).await.unwrap();
        ///
        /// assert_eq!(cursor.into_inner(), input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::new(self)
        }
    }
    // Blanket impl: `compat` becomes available on every tokio-io writer.
    impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}

    impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
        /// Chooses the initializer from what the wrapped reader's
        /// `prepare_uninitialized_buffer` returns for a one-byte probe buffer:
        /// `true` selects zeroing, `false` selects the no-op initializer.
        #[cfg(feature = "read-initializer")]
        unsafe fn initializer(&self) -> Initializer {
            let needs_zeroing = self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]);
            if needs_zeroing {
                Initializer::zeroing()
            } else {
                Initializer::nop()
            }
        }

        /// Forwards to the wrapped reader's `poll_read` and converts the result with
        /// `poll_01_to_03`.
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |reader| reader.poll_read(buf));
            poll_01_to_03(polled)
        }
    }

    impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
        /// Forwards to the wrapped writer's `poll_write` and converts the result with
        /// `poll_01_to_03`.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |writer| writer.poll_write(buf));
            poll_01_to_03(polled)
        }

        /// Forwards to the wrapped writer's `poll_flush` and converts the result with
        /// `poll_01_to_03`.
        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, AsyncWrite01::poll_flush);
            poll_01_to_03(polled)
        }

        /// Forwards to the wrapped writer's `shutdown` and converts the result with
        /// `poll_01_to_03`.
        fn poll_close(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, AsyncWrite01::shutdown);
            poll_01_to_03(polled)
        }
    }
}
477