1 //! `UnixStream` owned split support.
2 //!
//! A `UnixStream` can be split into an `OwnedReadHalf` and an `OwnedWriteHalf`
//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10 
11 use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
12 use crate::net::UnixStream;
13 
14 use std::error::Error;
15 use std::net::Shutdown;
16 use std::pin::Pin;
17 use std::sync::Arc;
18 use std::task::{Context, Poll};
19 use std::{fmt, io};
20 
/// Owned read half of a [`UnixStream`], created by [`into_split`].
///
/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
/// on the [`AsyncReadExt`] trait.
///
/// [`UnixStream`]: crate::net::UnixStream
/// [`into_split`]: crate::net::UnixStream::into_split()
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
#[derive(Debug)]
pub struct OwnedReadHalf {
    // Reference-counted handle to the stream. `split_owned` gives the matching
    // write half a clone of the same `Arc`, which is what `reunite` compares.
    inner: Arc<UnixStream>,
}
33 
/// Owned write half of a [`UnixStream`], created by [`into_split`].
///
/// Note that in the [`AsyncWrite`] implementation of this type,
/// [`poll_shutdown`] will shut down the stream in the write direction.
/// Dropping the write half will also shut down the write half of the stream.
///
/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
/// found on the [`AsyncWriteExt`] trait.
///
/// [`UnixStream`]: crate::net::UnixStream
/// [`into_split`]: crate::net::UnixStream::into_split()
/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
#[derive(Debug)]
pub struct OwnedWriteHalf {
    // Reference-counted handle to the stream, shared with the matching read half.
    inner: Arc<UnixStream>,
    // When `true`, the `Drop` impl shuts down the write direction of the stream.
    // Starts out `true`; cleared by `forget` and by a successful `poll_shutdown`.
    shutdown_on_drop: bool,
}
53 
split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf)54 pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
55     let arc = Arc::new(stream);
56     let read = OwnedReadHalf {
57         inner: Arc::clone(&arc),
58     };
59     let write = OwnedWriteHalf {
60         inner: arc,
61         shutdown_on_drop: true,
62     };
63     (read, write)
64 }
65 
reunite( read: OwnedReadHalf, write: OwnedWriteHalf, ) -> Result<UnixStream, ReuniteError>66 pub(crate) fn reunite(
67     read: OwnedReadHalf,
68     write: OwnedWriteHalf,
69 ) -> Result<UnixStream, ReuniteError> {
70     if Arc::ptr_eq(&read.inner, &write.inner) {
71         write.forget();
72         // This unwrap cannot fail as the api does not allow creating more than two Arcs,
73         // and we just dropped the other half.
74         Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
75     } else {
76         Err(ReuniteError(read, write))
77     }
78 }
79 
/// Error indicating that two halves were not from the same socket, and thus could
/// not be reunited.
///
/// The two fields hand the rejected read half and write half back to the caller,
/// in that order.
#[derive(Debug)]
pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
84 
85 impl fmt::Display for ReuniteError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result86     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87         write!(
88             f,
89             "tried to reunite halves that are not from the same socket"
90         )
91     }
92 }
93 
// Empty impl: every `Error` method keeps its default; the `Display` impl supplies the message.
impl Error for ReuniteError {}
95 
96 impl OwnedReadHalf {
97     /// Attempts to put the two halves of a `UnixStream` back together and
98     /// recover the original socket. Succeeds only if the two halves
99     /// originated from the same call to [`into_split`].
100     ///
101     /// [`into_split`]: crate::net::UnixStream::into_split()
reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError>102     pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
103         reunite(self, other)
104     }
105 }
106 
107 impl AsyncRead for OwnedReadHalf {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>108     fn poll_read(
109         self: Pin<&mut Self>,
110         cx: &mut Context<'_>,
111         buf: &mut ReadBuf<'_>,
112     ) -> Poll<io::Result<()>> {
113         self.inner.poll_read_priv(cx, buf)
114     }
115 }
116 
117 impl OwnedWriteHalf {
118     /// Attempts to put the two halves of a `UnixStream` back together and
119     /// recover the original socket. Succeeds only if the two halves
120     /// originated from the same call to [`into_split`].
121     ///
122     /// [`into_split`]: crate::net::UnixStream::into_split()
reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError>123     pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
124         reunite(other, self)
125     }
126 
127     /// Destroy the write half, but don't close the write half of the stream
128     /// until the read half is dropped. If the read half has already been
129     /// dropped, this closes the stream.
forget(mut self)130     pub fn forget(mut self) {
131         self.shutdown_on_drop = false;
132         drop(self);
133     }
134 }
135 
136 impl Drop for OwnedWriteHalf {
drop(&mut self)137     fn drop(&mut self) {
138         if self.shutdown_on_drop {
139             let _ = self.inner.shutdown_std(Shutdown::Write);
140         }
141     }
142 }
143 
144 impl AsyncWrite for OwnedWriteHalf {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>145     fn poll_write(
146         self: Pin<&mut Self>,
147         cx: &mut Context<'_>,
148         buf: &[u8],
149     ) -> Poll<io::Result<usize>> {
150         self.inner.poll_write_priv(cx, buf)
151     }
152 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>153     fn poll_write_vectored(
154         self: Pin<&mut Self>,
155         cx: &mut Context<'_>,
156         bufs: &[io::IoSlice<'_>],
157     ) -> Poll<io::Result<usize>> {
158         self.inner.poll_write_vectored_priv(cx, bufs)
159     }
160 
is_write_vectored(&self) -> bool161     fn is_write_vectored(&self) -> bool {
162         self.inner.is_write_vectored()
163     }
164 
165     #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>166     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
167         // flush is a no-op
168         Poll::Ready(Ok(()))
169     }
170 
171     // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>172     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
173         let res = self.inner.shutdown_std(Shutdown::Write);
174         if res.is_ok() {
175             Pin::into_inner(self).shutdown_on_drop = false;
176         }
177         res.into()
178     }
179 }
180 
181 impl AsRef<UnixStream> for OwnedReadHalf {
as_ref(&self) -> &UnixStream182     fn as_ref(&self) -> &UnixStream {
183         &*self.inner
184     }
185 }
186 
187 impl AsRef<UnixStream> for OwnedWriteHalf {
as_ref(&self) -> &UnixStream188     fn as_ref(&self) -> &UnixStream {
189         &*self.inner
190     }
191 }
192