summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs104
1 files changed, 103 insertions, 1 deletions
diff --git a/src/main.rs b/src/main.rs
index e7a11a9..b899e09 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,105 @@
+extern crate socketcan;
+extern crate tokio;
+extern crate futures;
+extern crate mio;
+
+use mio::{Ready, Poll, PollOpt, Token};
+use mio::event::Evented;
+use mio::unix::EventedFd;
+
+use tokio::reactor::PollEvented2;
+use tokio::prelude::*;
+
+use std::os::unix::io::AsRawFd;
+use std::io;
+
+// Can Stream which is tokioable et al..
+
+pub struct EventableCANSocket {
+ socket: socketcan::CANSocket,
+}
+
+impl EventableCANSocket {
+ fn new(socket: socketcan::CANSocket) -> EventableCANSocket {
+ EventableCANSocket { socket: socket }
+ }
+
+ pub fn from_interfacename(ifname: &str) -> Result<EventableCANSocket, socketcan::CANSocketOpenError> {
+ Ok(EventableCANSocket::new(socketcan::CANSocket::open(ifname)?))
+ }
+
+ pub fn get_ref(&self) -> &socketcan::CANSocket {
+ &self.socket
+ }
+}
+
+impl Evented for EventableCANSocket {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.socket.as_raw_fd()).register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.socket.as_raw_fd()).reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ EventedFd(&self.socket.as_raw_fd()).deregister(poll)
+ }
+}
+
+pub struct CANStream {
+ socket: PollEvented2<EventableCANSocket>,
+}
+
+impl CANStream {
+ pub fn new(socket: EventableCANSocket) -> CANStream {
+ CANStream { socket: PollEvented2::new(socket) }
+ }
+}
+
+impl Stream for CANStream {
+ type Item = socketcan::CANFrame;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
+ // We shouldn't be woken unless we're ready, but just in case, let's
+ // ensure...
+ let readiness = match self.socket.poll_read_ready(Ready::readable())? {
+ Async::NotReady => { return Ok(Async::NotReady) },
+ Async::Ready(r) => r
+ };
+
+ if !readiness.is_readable() {
+ return Ok(Async::NotReady);
+ }
+
+ // Okay, we're readable, so let's have a go at reading...
+ let frame = self.socket.get_ref().get_ref().read_frame()?;
+
+ Ok(Async::Ready(Some(frame)))
+ }
+
+}
+
fn main() {
- println!("Hello, world!");
+ println!("Starting up candump-rs");
+
+ let evented = match EventableCANSocket::from_interfacename("vcan0") {
+ Ok(v) => v,
+ Err(e) => panic!("Unable to open socket: {}", e)
+ };
+
+ println!("Opened CAN socket");
+
+ let streamed = CANStream::new(evented);
+
+ println!("Made it streamed, starting reactor...");
+
+ tokio::run(streamed
+ .map_err(|e| println!("error = {:?}", e))
+ .for_each(|frame| {
+ println!("Received a frame: {:?}", frame);
+ Ok(())
+ }));
+
}