summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Silverstone <dsilvers@digital-scurf.org>2018-04-08 17:11:35 +0100
committerDaniel Silverstone <dsilvers@digital-scurf.org>2018-04-08 17:11:35 +0100
commit2535e9adba7331e11d7e36b32dce3714c17134e0 (patch)
tree0b6b82279df87755ccb3cfd9d40fae5e4984954f
parent7dcb6656251d4363d27d0ea3fb224a341a58ec3d (diff)
downloadcanopied-2535e9adba7331e11d7e36b32dce3714c17134e0.tar.bz2
Fix stuck reader, reduce traces
-rw-r--r--src/canstream.rs23
1 files changed, 12 insertions, 11 deletions
diff --git a/src/canstream.rs b/src/canstream.rs
index 9401047..1451797 100644
--- a/src/canstream.rs
+++ b/src/canstream.rs
@@ -16,6 +16,7 @@ pub struct EventableCANSocket {
impl EventableCANSocket {
fn new(socket: CANSocket) -> EventableCANSocket {
+ socket.set_nonblocking(true).unwrap();
EventableCANSocket { socket: socket }
}
@@ -94,7 +95,17 @@ impl Stream for CANStream {
}
// Okay, we're readable, so let's have a go at reading...
- let frame = self.socket.get_ref().get_ref().read_frame()?;
+ let frame = match self.socket.get_ref().get_ref().read_frame() {
+ Ok(f) => f,
+ Err(e) => {
+ if e.should_retry() {
+ return Ok(Async::NotReady);
+ } else {
+ warn!("Unable to cope with error: {:?}", e);
+ return Err(e);
+ }
+ }
+ };
Ok(Async::Ready(Some(frame)))
}
@@ -109,13 +120,11 @@ impl Sink for CANStream {
item: Self::SinkItem,
) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
if !self.flushed {
- trace!("Checking poll_complete in start_send");
match self.poll_complete()? {
Async::Ready(()) => {}
Async::NotReady => return Ok(AsyncSink::NotReady(item)),
}
}
- trace!("Okay, storing frame for transmission later");
self.pending_frame = Some(item);
self.flushed = false;
Ok(AsyncSink::Ready)
@@ -123,7 +132,6 @@ impl Sink for CANStream {
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
if self.flushed {
- trace!("poll_complete already flushed, stop");
return Ok(Async::Ready(()));
}
@@ -131,36 +139,29 @@ impl Sink for CANStream {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(r) => r,
};
- trace!("Ready!");
if !readiness.is_writable() {
return Ok(Async::NotReady);
}
- trace!("Writable");
match self.socket
.get_ref()
.get_ref()
.write_frame(&self.pending_frame.unwrap())
{
Ok(()) => {
- trace!("Written, clearing pending frame");
self.pending_frame = None;
}
Err(e) => {
if !e.should_retry() {
- trace!("Bad error");
return Err(e);
}
- trace!("Retryable error");
}
}
if self.pending_frame.is_none() {
self.flushed = true;
- trace!("Okay, we finished, flushed is true");
Ok(Async::Ready(()))
} else {
- trace!("Not finished, perhaps EAGAIN?");
Ok(Async::NotReady)
}
}