Skip to content

Commit 468c054

Browse files
committed
feat: make CompatWrite !Unpin and avoid Box::pin
1 parent 49db306 commit 468c054

File tree

3 files changed

+33
-46
lines changed

3 files changed

+33
-46
lines changed

‎Cargo.lock‎

Lines changed: 0 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ edition = "2021"
99
compio-buf = "0.2.0-beta.1"
1010
compio-io = "0.1.0-beta.2"
1111
futures-util = { version = "0.3", features = ["io"] }
12-
pin-project = "1"
1312

1413
[dev-dependencies]
1514
compio = "0.9.0-beta.4"

‎src/write.rs‎

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@ use std::{io, slice};
66
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
77
use futures_util::task::AtomicWaker;
88

9-
type WriteFut<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> =
10-
impl Future<Output = (Io, BufResult<usize, Buf>)> + 'a + Unpin;
9+
type WriteFut<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> =
10+
impl Future<Output = (Io, BufResult<usize, Buf>)> + 'a;
1111

12-
type FlushFut<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> =
13-
impl Future<Output = (Io, io::Result<()>)> + 'a + Unpin;
12+
type FlushFut<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> =
13+
impl Future<Output = (Io, io::Result<()>)> + 'a;
1414

15-
type CloseFut<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> =
16-
impl Future<Output = (Io, io::Result<()>)> + 'a + Unpin;
15+
type CloseFut<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> =
16+
impl Future<Output = (Io, io::Result<()>)> + 'a;
1717

18-
enum FutState<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> {
18+
enum FutState<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> {
1919
Idle,
2020
Write(WriteFut<'a, Io, Buf>),
2121
Flush(FlushFut<'a, Io, Buf>),
2222
Close(CloseFut<'a, Io, Buf>),
2323
}
2424

25-
pub struct CompatWrite<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> {
25+
pub struct CompatWrite<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> {
2626
io: Option<Io>,
2727
fut: FutState<'a, Io, Buf>,
2828
write_waker: AtomicWaker,
@@ -31,7 +31,7 @@ pub struct CompatWrite<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut
3131
buf: Option<Buf>,
3232
}
3333

34-
impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> CompatWrite<'a, Io, Buf> {
34+
impl<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> CompatWrite<'a, Io, Buf> {
3535
pub fn new(io: Io, buf: Buf) -> Self {
3636
Self {
3737
io: Some(io),
@@ -44,15 +44,16 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> CompatWr
4444
}
4545
}
4646

47-
impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_util::AsyncWrite
47+
impl<'a, Io: compio_io::AsyncWrite + 'a, Buf: IoBufMut> futures_util::AsyncWrite
4848
for CompatWrite<'a, Io, Buf>
4949
{
5050
fn poll_write(
5151
self: Pin<&mut Self>,
5252
cx: &mut Context<'_>,
5353
data: &[u8],
5454
) -> Poll<io::Result<usize>> {
55-
let this = self.get_mut();
55+
// safety: we don't move self
56+
let this = unsafe { self.get_unchecked_mut() };
5657
loop {
5758
match &mut this.fut {
5859
FutState::Idle => {
@@ -68,15 +69,16 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_
6869
buf.copy_from_slice(&data[..size]);
6970
}
7071

71-
this.fut = FutState::Write(Box::pin(async move {
72+
this.fut = FutState::Write(async move {
7273
let BufResult(res, buf) = io.write(buf).await;
7374

7475
(io, BufResult(res, buf.into_inner()))
75-
}));
76+
});
7677
}
7778

7879
FutState::Write(fut) => {
79-
return match Pin::new(fut).poll(cx) {
80+
// safety: we don't move fut until it is completed
81+
return match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
8082
Poll::Pending => Poll::Pending,
8183
Poll::Ready((io, BufResult(res, buf))) => {
8284
this.io = Some(io);
@@ -108,17 +110,18 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_
108110
}
109111

110112
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
111-
let this = self.get_mut();
113+
// safety: we don't move self
114+
let this = unsafe { self.get_unchecked_mut() };
112115
loop {
113116
match &mut this.fut {
114117
FutState::Idle => {
115118
let mut io = this.io.take().unwrap();
116119

117-
this.fut = FutState::Flush(Box::pin(async move {
120+
this.fut = FutState::Flush(async move {
118121
let res = io.flush().await;
119122

120123
(io, res)
121-
}));
124+
});
122125
}
123126

124127
FutState::Write(_) => {
@@ -128,7 +131,8 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_
128131
}
129132

130133
FutState::Flush(fut) => {
131-
return match Pin::new(fut).poll(cx) {
134+
// safety: we don't move fut until it is completed
135+
return match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
132136
Poll::Pending => Poll::Pending,
133137
Poll::Ready((io, res)) => {
134138
this.io = Some(io);
@@ -153,17 +157,18 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_
153157
}
154158

155159
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
156-
let this = self.get_mut();
160+
// safety: we don't move self
161+
let this = unsafe { self.get_unchecked_mut() };
157162
loop {
158163
match &mut this.fut {
159164
FutState::Idle => {
160165
let mut io = this.io.take().unwrap();
161166

162-
this.fut = FutState::Close(Box::pin(async move {
167+
this.fut = FutState::Close(async move {
163168
let res = io.shutdown().await;
164169

165170
(io, res)
166-
}));
171+
});
167172
}
168173

169174
FutState::Write(_) => {
@@ -173,7 +178,8 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_
173178
}
174179

175180
FutState::Close(fut) => {
176-
return match Pin::new(fut).poll(cx) {
181+
// safety: we don't move fut until it is completed
182+
return match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
177183
Poll::Pending => Poll::Pending,
178184
Poll::Ready((io, res)) => {
179185
this.io = Some(io);
@@ -201,6 +207,7 @@ impl<'a, Io: compio_io::AsyncWrite + Unpin + 'a, Buf: IoBufMut + Unpin> futures_
201207
#[cfg(test)]
202208
mod tests {
203209
use std::env;
210+
use std::pin::pin;
204211

205212
use compio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
206213
use compio::runtime;
@@ -225,7 +232,8 @@ mod tests {
225232

226233
let mut tcp_stream = TcpStream::connect(addr).await.unwrap();
227234

228-
let mut compat_write = task.await;
235+
let compat_write = task.await;
236+
let mut compat_write = pin!(compat_write);
229237
compat_write.write_all(b"test").await.unwrap();
230238
compat_write.flush().await.unwrap();
231239
compat_write.close().await.unwrap();
@@ -248,7 +256,8 @@ mod tests {
248256

249257
let mut unix_stream = UnixStream::connect(path).unwrap();
250258
let unix_stream2 = task.await;
251-
let mut compat_write = CompatWrite::new(unix_stream2, vec![0; 100]);
259+
let compat_write = CompatWrite::new(unix_stream2, vec![0; 100]);
260+
let mut compat_write = pin!(compat_write);
252261
compat_write.write_all(b"test").await.unwrap();
253262
compat_write.flush().await.unwrap();
254263
compat_write.close().await.unwrap();

0 commit comments

Comments
 (0)