summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Silverstone <dsilvers@digital-scurf.org>2018-04-04 21:19:31 +0100
committerDaniel Silverstone <dsilvers@digital-scurf.org>2018-04-04 21:19:31 +0100
commitf9e951115363687040c6532d4f4d59b956fc1485 (patch)
treeadf7a497a18f7ef08a3bf73ef74dd22a8ead959e
parent39e6030a357789637eed2e69a8b791d63911e18b (diff)
downloadcanopied-f9e951115363687040c6532d4f4d59b956fc1485.tar.bz2
Fix up to support sink and stream on ISOTP
-rw-r--r--src/isotp.rs88
-rw-r--r--src/main.rs41
2 files changed, 101 insertions, 28 deletions
diff --git a/src/isotp.rs b/src/isotp.rs
index 0085e0d..d5dc3d5 100644
--- a/src/isotp.rs
+++ b/src/isotp.rs
@@ -6,6 +6,13 @@ use std::collections::{HashMap, VecDeque};
use std::cmp::min;
+use std::io;
+
+use tokio;
+use tokio::prelude::*;
+
+use futures::sync::mpsc;
+
const DEFAULT_BLOCKS: u8 = 16;
struct IncomingPacket {
@@ -131,24 +138,39 @@ impl OutgoingPacket {
}
pub struct ISOTP {
+ canstream: CANStream,
+ cansink: mpsc::Sender<CANFrame>,
incoming: HashMap<u16, IncomingPacket>,
outgoing: HashMap<u16, OutgoingPacket>,
txqueue: VecDeque<CANFrame>,
}
impl ISOTP {
- pub fn new() -> ISOTP {
+ pub fn new(interface: &str) -> ISOTP {
+ let stream = CANStream::from_name(interface).unwrap();
+ let sink = CANStream::from_name(interface).unwrap();
+
+ let (sender, receiver) = mpsc::channel(100);
+
+ tokio::spawn(
+ receiver
+ .map(|f| {
+ trace!("Forwarding {:X}", f);
+ f
+ })
+ .forward(sink.sink_map_err(|_| ()))
+ .map(|_| ()),
+ );
+
ISOTP {
+ canstream: stream,
+ cansink: sender,
incoming: HashMap::new(),
outgoing: HashMap::new(),
txqueue: VecDeque::new(),
}
}
- pub fn get_txqueue(&mut self) -> Option<CANFrame> {
- self.txqueue.pop_front()
- }
-
pub fn send_packet(&mut self, destination: u16, payload: &[u8]) {
let answerer = destination + 8;
let mut packet = OutgoingPacket::new(destination, payload);
@@ -162,6 +184,21 @@ impl ISOTP {
} else {
trace!("Nothing more to send!");
}
+ self.drain_queue();
+ }
+
+ fn drain_queue(&mut self) {
+ if self.txqueue.len() > 0 {
+ trace!("Draining {} frames from the queue", self.txqueue.len());
+ let mut waiter = self.cansink.clone().wait();
+
+ while let Some(frame) = self.txqueue.pop_front() {
+ trace!("Draining frame {:X}", frame);
+ waiter.send(frame).unwrap();
+ waiter.flush().unwrap();
+ }
+ trace!("Drain complete");
+ }
}
pub fn handle_frame(&mut self, frame: &CANFrame) -> Option<(u16, Vec<u8>)> {
@@ -194,6 +231,7 @@ impl ISOTP {
// We insert this into the hash
self.incoming.insert(id, incoming);
// Finally we return None since we've not completed reception
+ self.drain_queue();
return None;
}
2 => {
@@ -236,6 +274,7 @@ impl ISOTP {
if do_delete {
self.incoming.remove(&id);
}
+ self.drain_queue();
return ret;
}
3 => {
@@ -248,6 +287,7 @@ impl ISOTP {
self.outgoing.insert(id, outpacket);
}
}
+ self.drain_queue();
}
_ => {}
}
@@ -271,3 +311,41 @@ impl ISOTP {
!packet.is_finished()
}
}
+
+impl Stream for ISOTP {
+ type Item = (u16, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
+ match self.canstream.poll()? {
+ Async::NotReady => Ok(Async::NotReady),
+ Async::Ready(None) => Ok(Async::Ready(None)),
+ Async::Ready(Some(frame)) => {
+ if let Some(out) = self.handle_frame(&frame) {
+ Ok(Async::Ready(Some(out)))
+ } else {
+ // Nothing to do, best to poll ourselves again to get our
+ // readerness back on the epoll queue
+ self.poll()
+ }
+ }
+ }
+ }
+}
+
+impl Sink for ISOTP {
+ type SinkItem = (u16, Vec<u8>);
+ type SinkError = ();
+
+ fn start_send(
+ &mut self,
+ item: Self::SinkItem,
+ ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
+ self.send_packet(item.0, &item.1);
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
+ Ok(Async::Ready(()))
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 36307e6..dcc3597 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,27 +14,22 @@ use tokio::prelude::*;
fn main() {
env_logger::init();
- let (sink, stream) = canstream::CANStream::from_name("vcan0").unwrap().split();
- let mut waiter = sink.wait();
- let mut mytp = isotp::ISOTP::new();
- tokio::run(
- stream
- .map_err(|e| error!("error = {:?}", e))
- .for_each(move |frame| {
- info!("Received CAN frame: {:X}", frame);
- match mytp.handle_frame(&frame) {
- None => {}
- Some((id, data)) => {
- info!("Received from {:03x} data {:?}", id, data);
- mytp.send_packet(0x123, &data);
- }
- }
- while let Some(packet) = mytp.get_txqueue() {
- info!(" Sending CAN frame: {:X}", packet);
- waiter.send(packet).map_err(|_| ())?;
- waiter.flush().map_err(|_| ())?;
- }
- Ok(())
- }),
- );
+ let world = futures::future::lazy(|| {
+ let (sink, stream) = isotp::ISOTP::new("slcan0").split();
+
+ tokio::spawn(futures::future::lazy(|| {
+ sink.send((0x7E4, vec![0x21, 0x01])).map(|_| ())
+ }));
+
+ tokio::spawn(
+ stream
+ .map_err(|e| error!("error = {:?}", e))
+ .for_each(move |frame| {
+ info!("Received ISO-TP frame: {:?}", frame);
+
+ Ok(())
+ }),
+ )
+ });
+ tokio::run(world);
}