summaryrefslogtreecommitdiff
path: root/src/isotp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/isotp.rs')
-rw-r--r--src/isotp.rs88
1 files changed, 83 insertions, 5 deletions
diff --git a/src/isotp.rs b/src/isotp.rs
index 0085e0d..d5dc3d5 100644
--- a/src/isotp.rs
+++ b/src/isotp.rs
@@ -6,6 +6,13 @@ use std::collections::{HashMap, VecDeque};
use std::cmp::min;
+use std::io;
+
+use tokio;
+use tokio::prelude::*;
+
+use futures::sync::mpsc;
+
const DEFAULT_BLOCKS: u8 = 16;
struct IncomingPacket {
@@ -131,24 +138,39 @@ impl OutgoingPacket {
}
pub struct ISOTP {
+ canstream: CANStream,
+ cansink: mpsc::Sender<CANFrame>,
incoming: HashMap<u16, IncomingPacket>,
outgoing: HashMap<u16, OutgoingPacket>,
txqueue: VecDeque<CANFrame>,
}
impl ISOTP {
- pub fn new() -> ISOTP {
+ pub fn new(interface: &str) -> ISOTP {
+ let stream = CANStream::from_name(interface).unwrap();
+ let sink = CANStream::from_name(interface).unwrap();
+
+ let (sender, receiver) = mpsc::channel(100);
+
+ tokio::spawn(
+ receiver
+ .map(|f| {
+ trace!("Forwarding {:X}", f);
+ f
+ })
+ .forward(sink.sink_map_err(|_| ()))
+ .map(|_| ()),
+ );
+
ISOTP {
+ canstream: stream,
+ cansink: sender,
incoming: HashMap::new(),
outgoing: HashMap::new(),
txqueue: VecDeque::new(),
}
}
- pub fn get_txqueue(&mut self) -> Option<CANFrame> {
- self.txqueue.pop_front()
- }
-
pub fn send_packet(&mut self, destination: u16, payload: &[u8]) {
let answerer = destination + 8;
let mut packet = OutgoingPacket::new(destination, payload);
@@ -162,6 +184,21 @@ impl ISOTP {
} else {
trace!("Nothing more to send!");
}
+ self.drain_queue();
+ }
+
+ fn drain_queue(&mut self) {
+ if self.txqueue.len() > 0 {
+ trace!("Draining {} frames from the queue", self.txqueue.len());
+ let mut waiter = self.cansink.clone().wait();
+
+ while let Some(frame) = self.txqueue.pop_front() {
+ trace!("Draining frame {:X}", frame);
+ waiter.send(frame).unwrap();
+ waiter.flush().unwrap();
+ }
+ trace!("Drain complete");
+ }
}
pub fn handle_frame(&mut self, frame: &CANFrame) -> Option<(u16, Vec<u8>)> {
@@ -194,6 +231,7 @@ impl ISOTP {
// We insert this into the hash
self.incoming.insert(id, incoming);
// Finally we return None since we've not completed reception
+ self.drain_queue();
return None;
}
2 => {
@@ -236,6 +274,7 @@ impl ISOTP {
if do_delete {
self.incoming.remove(&id);
}
+ self.drain_queue();
return ret;
}
3 => {
@@ -248,6 +287,7 @@ impl ISOTP {
self.outgoing.insert(id, outpacket);
}
}
+ self.drain_queue();
}
_ => {}
}
@@ -271,3 +311,41 @@ impl ISOTP {
!packet.is_finished()
}
}
+
+impl Stream for ISOTP {
+ type Item = (u16, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
+ match self.canstream.poll()? {
+ Async::NotReady => Ok(Async::NotReady),
+ Async::Ready(None) => Ok(Async::Ready(None)),
+ Async::Ready(Some(frame)) => {
+ if let Some(out) = self.handle_frame(&frame) {
+ Ok(Async::Ready(Some(out)))
+ } else {
+ // Nothing to do, best to poll ourselves again to get our
+ // readerness back on the epoll queue
+ self.poll()
+ }
+ }
+ }
+ }
+}
+
+impl Sink for ISOTP {
+ type SinkItem = (u16, Vec<u8>);
+ type SinkError = ();
+
+ fn start_send(
+ &mut self,
+ item: Self::SinkItem,
+ ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
+ self.send_packet(item.0, &item.1);
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
+ Ok(Async::Ready(()))
+ }
+}