From 8b9bcd0d86eaa27dc78b82ea65c9f60dfc7ce621 Mon Sep 17 00:00:00 2001 From: Martin Fischer Date: Sun, 12 Sep 2021 16:04:38 +0200 Subject: initial commit --- src/main.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 src/main.rs (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..cc4163d --- /dev/null +++ b/src/main.rs @@ -0,0 +1,132 @@ +use std::io::{stdin, BufRead}; +use std::{net::SocketAddr, time::Duration}; + +use anyhow::{Context, Result}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use tokio::net::TcpListener; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio_tungstenite::{tungstenite::Error as WsError, tungstenite::Message}; +use url::Url; + +const USAGE: &str = "\ +Usage: webcat Connect to a ws:// URL +Usage: webcat -l Listen to the given port + +You can use FIFOs to create a debugging man-in-the-middle setup: + + mkfifo client-in server-in + webcat ws://example.com:3000/ < client-in > server-in + webcat -l 4000 < server-in > client-in + echo > server-in # unblock the FIFO deadlock\ +"; + +#[tokio::main] +async fn main() -> Result<()> { + let args: Vec<_> = std::env::args().skip(1).collect(); + if args.len() == 1 { + start_client(args[0].parse().context("failed to parse URL")?).await?; + } else if args.len() == 2 && args[0] == "-l" { + start_server(args[1].parse().context("failed to parse port")?).await?; + } else { + println!("{}", USAGE); + } + Ok(()) +} + +async fn start_client(url: Url) -> Result<()> { + // stdio is blocking so we do it in a dedicated task + let (stdin_sender, mut stdin_receiver) = unbounded_channel::(); + tokio::spawn(send_stdin_to_channel(stdin_sender)); + + loop { + match tokio_tungstenite::connect_async(&url).await { + Ok((stream, _resp)) => { + eprintln!("connected to server"); + webcat("client >>>", stream, &mut stdin_receiver).await?; + } + Err(err) => { + eprintln!("failed to connect to server: {}", err); + let mut reconnect_timer = tokio::time::interval(Duration::from_secs(1)); + eprint!("reconnecting in "); + for i in (1..6).rev() { + reconnect_timer.tick().await; + eprint!("... {}", i); + } + eprintln!(); + } + } + } +} + +async fn start_server(port: u16) -> Result<()> { + let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into(); + let listener = TcpListener::bind(listen_addr).await?; + + // stdio is blocking so we do it in a dedicated task + let (stdin_sender, mut stdin_receiver) = unbounded_channel::(); + tokio::spawn(send_stdin_to_channel(stdin_sender)); + + eprintln!("listening for connections on {}", listen_addr); + + loop { + let (stream, _client_addr) = listener.accept().await?; + eprintln!("accepted TCP connection"); + match tokio_tungstenite::accept_async(stream).await { + Ok(stream) => { + webcat("server <<<", stream, &mut stdin_receiver).await?; + } + Err(err) => { + eprintln!("failed to upgrade {}", err); + } + } + } +} + +async fn send_stdin_to_channel(sender: UnboundedSender) -> Result<()> { + for line in stdin().lock().lines().flatten() { + sender.send(line)?; + } + Ok(()) +} + +async fn webcat> + Sink>( + name: &str, + stream: S, + receiver: &mut UnboundedReceiver, +) -> Result<()> { + let (mut write, mut read) = stream.split(); + loop { + tokio::select! { + Some(line) = receiver.recv() => { + if write.send(Message::Text(line)).await.is_err() { + eprintln!("failed to send message"); + } + } + msg = read.next() => { + match msg { + Some(Ok(msg)) => { + match msg { + Message::Text(text) => { + println!("{}", text); + if !atty::is(atty::Stream::Stdout) { + // if stdout is redirected we also print the + // messages to stderr for convenience + eprintln!("{} {}", name, text); + } + } + other => { + eprintln!("received: {:?}", other); + } + } + } + Some(Err(err)) => { + eprintln!("error: {:?}", err); + } + None => { + return Ok(()); + } + } + } + }; + } +} -- cgit v1.2.3