1 //! `UnixStream` owned split support.
2 //!
//! A `UnixStream` can be split into an `OwnedReadHalf` and an `OwnedWriteHalf`
4 //! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
5 //! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10
11 use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
12 use crate::net::UnixStream;
13
14 use std::error::Error;
15 use std::net::Shutdown;
16 use std::pin::Pin;
17 use std::sync::Arc;
18 use std::task::{Context, Poll};
19 use std::{fmt, io};
20
21 /// Owned read half of a [`UnixStream`], created by [`into_split`].
22 ///
23 /// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
24 /// on the [`AsyncReadExt`] trait.
25 ///
26 /// [`UnixStream`]: crate::net::UnixStream
27 /// [`into_split`]: crate::net::UnixStream::into_split()
28 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
29 #[derive(Debug)]
30 pub struct OwnedReadHalf {
31 inner: Arc<UnixStream>,
32 }
33
34 /// Owned write half of a [`UnixStream`], created by [`into_split`].
35 ///
36 /// Note that in the [`AsyncWrite`] implementation of this type,
37 /// [`poll_shutdown`] will shut down the stream in the write direction.
38 /// Dropping the write half will also shut down the write half of the stream.
39 ///
40 /// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
41 /// found on the [`AsyncWriteExt`] trait.
42 ///
43 /// [`UnixStream`]: crate::net::UnixStream
44 /// [`into_split`]: crate::net::UnixStream::into_split()
45 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
46 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
47 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
48 #[derive(Debug)]
49 pub struct OwnedWriteHalf {
50 inner: Arc<UnixStream>,
51 shutdown_on_drop: bool,
52 }
53
split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf)54 pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
55 let arc = Arc::new(stream);
56 let read = OwnedReadHalf {
57 inner: Arc::clone(&arc),
58 };
59 let write = OwnedWriteHalf {
60 inner: arc,
61 shutdown_on_drop: true,
62 };
63 (read, write)
64 }
65
reunite( read: OwnedReadHalf, write: OwnedWriteHalf, ) -> Result<UnixStream, ReuniteError>66 pub(crate) fn reunite(
67 read: OwnedReadHalf,
68 write: OwnedWriteHalf,
69 ) -> Result<UnixStream, ReuniteError> {
70 if Arc::ptr_eq(&read.inner, &write.inner) {
71 write.forget();
72 // This unwrap cannot fail as the api does not allow creating more than two Arcs,
73 // and we just dropped the other half.
74 Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
75 } else {
76 Err(ReuniteError(read, write))
77 }
78 }
79
80 /// Error indicating that two halves were not from the same socket, and thus could
81 /// not be reunited.
82 #[derive(Debug)]
83 pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
84
85 impl fmt::Display for ReuniteError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 write!(
88 f,
89 "tried to reunite halves that are not from the same socket"
90 )
91 }
92 }
93
94 impl Error for ReuniteError {}
95
96 impl OwnedReadHalf {
97 /// Attempts to put the two halves of a `UnixStream` back together and
98 /// recover the original socket. Succeeds only if the two halves
99 /// originated from the same call to [`into_split`].
100 ///
101 /// [`into_split`]: crate::net::UnixStream::into_split()
reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError>102 pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
103 reunite(self, other)
104 }
105 }
106
107 impl AsyncRead for OwnedReadHalf {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>108 fn poll_read(
109 self: Pin<&mut Self>,
110 cx: &mut Context<'_>,
111 buf: &mut ReadBuf<'_>,
112 ) -> Poll<io::Result<()>> {
113 self.inner.poll_read_priv(cx, buf)
114 }
115 }
116
117 impl OwnedWriteHalf {
118 /// Attempts to put the two halves of a `UnixStream` back together and
119 /// recover the original socket. Succeeds only if the two halves
120 /// originated from the same call to [`into_split`].
121 ///
122 /// [`into_split`]: crate::net::UnixStream::into_split()
reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError>123 pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
124 reunite(other, self)
125 }
126
127 /// Destroy the write half, but don't close the write half of the stream
128 /// until the read half is dropped. If the read half has already been
129 /// dropped, this closes the stream.
forget(mut self)130 pub fn forget(mut self) {
131 self.shutdown_on_drop = false;
132 drop(self);
133 }
134 }
135
136 impl Drop for OwnedWriteHalf {
drop(&mut self)137 fn drop(&mut self) {
138 if self.shutdown_on_drop {
139 let _ = self.inner.shutdown_std(Shutdown::Write);
140 }
141 }
142 }
143
144 impl AsyncWrite for OwnedWriteHalf {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>145 fn poll_write(
146 self: Pin<&mut Self>,
147 cx: &mut Context<'_>,
148 buf: &[u8],
149 ) -> Poll<io::Result<usize>> {
150 self.inner.poll_write_priv(cx, buf)
151 }
152
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>153 fn poll_write_vectored(
154 self: Pin<&mut Self>,
155 cx: &mut Context<'_>,
156 bufs: &[io::IoSlice<'_>],
157 ) -> Poll<io::Result<usize>> {
158 self.inner.poll_write_vectored_priv(cx, bufs)
159 }
160
is_write_vectored(&self) -> bool161 fn is_write_vectored(&self) -> bool {
162 self.inner.is_write_vectored()
163 }
164
165 #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>166 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
167 // flush is a no-op
168 Poll::Ready(Ok(()))
169 }
170
171 // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>172 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
173 let res = self.inner.shutdown_std(Shutdown::Write);
174 if res.is_ok() {
175 Pin::into_inner(self).shutdown_on_drop = false;
176 }
177 res.into()
178 }
179 }
180
181 impl AsRef<UnixStream> for OwnedReadHalf {
as_ref(&self) -> &UnixStream182 fn as_ref(&self) -> &UnixStream {
183 &*self.inner
184 }
185 }
186
187 impl AsRef<UnixStream> for OwnedWriteHalf {
as_ref(&self) -> &UnixStream188 fn as_ref(&self) -> &UnixStream {
189 &*self.inner
190 }
191 }
192