diff options
author | Martin Fischer <martin@push-f.com> | 2021-09-12 16:04:38 +0200 |
---|---|---|
committer | Martin Fischer <martin@push-f.com> | 2021-09-12 19:29:26 +0200 |
commit | 8b9bcd0d86eaa27dc78b82ea65c9f60dfc7ce621 (patch) | |
tree | 556ffb0e620c6d9f9a5df6d6ebb5630506282c09 /src/main.rs |
initial commit
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..cc4163d --- /dev/null +++ b/src/main.rs @@ -0,0 +1,132 @@ +use std::io::{stdin, BufRead}; +use std::{net::SocketAddr, time::Duration}; + +use anyhow::{Context, Result}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use tokio::net::TcpListener; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio_tungstenite::{tungstenite::Error as WsError, tungstenite::Message}; +use url::Url; + +const USAGE: &str = "\ +Usage: webcat <url> Connect to a ws:// URL +Usage: webcat -l <port> Listen to the given port + +You can use FIFOs to create a debugging man-in-the-middle setup: + + mkfifo client-in server-in + webcat ws://example.com:3000/ < client-in > server-in + webcat -l 4000 < server-in > client-in + echo > server-in # unblock the FIFO deadlock\ +"; + +#[tokio::main] +async fn main() -> Result<()> { + let args: Vec<_> = std::env::args().skip(1).collect(); + if args.len() == 1 { + start_client(args[0].parse().context("failed to parse URL")?).await?; + } else if args.len() == 2 && args[0] == "-l" { + start_server(args[1].parse().context("failed to parse port")?).await?; + } else { + println!("{}", USAGE); + } + Ok(()) +} + +async fn start_client(url: Url) -> Result<()> { + // stdio is blocking so we do it in a dedicated task + let (stdin_sender, mut stdin_receiver) = unbounded_channel::<String>(); + tokio::spawn(send_stdin_to_channel(stdin_sender)); + + loop { + match tokio_tungstenite::connect_async(&url).await { + Ok((stream, _resp)) => { + eprintln!("connected to server"); + webcat("client >>>", stream, &mut stdin_receiver).await?; + } + Err(err) => { + eprintln!("failed to connect to server: {}", err); + let mut reconnect_timer = tokio::time::interval(Duration::from_secs(1)); + eprint!("reconnecting in "); + for i in (1..6).rev() { + reconnect_timer.tick().await; + eprint!("... {}", i); + } + eprintln!(); + } + } + } +} + +async fn start_server(port: u16) -> Result<()> { + let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into(); + let listener = TcpListener::bind(listen_addr).await?; + + // stdio is blocking so we do it in a dedicated task + let (stdin_sender, mut stdin_receiver) = unbounded_channel::<String>(); + tokio::spawn(send_stdin_to_channel(stdin_sender)); + + eprintln!("listening for connections on {}", listen_addr); + + loop { + let (stream, _client_addr) = listener.accept().await?; + eprintln!("accepted TCP connection"); + match tokio_tungstenite::accept_async(stream).await { + Ok(stream) => { + webcat("server <<<", stream, &mut stdin_receiver).await?; + } + Err(err) => { + eprintln!("failed to upgrade {}", err); + } + } + } +} + +async fn send_stdin_to_channel(sender: UnboundedSender<String>) -> Result<()> { + for line in stdin().lock().lines().flatten() { + sender.send(line)?; + } + Ok(()) +} + +async fn webcat<S: Stream<Item = Result<Message, WsError>> + Sink<Message>>( + name: &str, + stream: S, + receiver: &mut UnboundedReceiver<String>, +) -> Result<()> { + let (mut write, mut read) = stream.split(); + loop { + tokio::select! { + Some(line) = receiver.recv() => { + if write.send(Message::Text(line)).await.is_err() { + eprintln!("failed to send message"); + } + } + msg = read.next() => { + match msg { + Some(Ok(msg)) => { + match msg { + Message::Text(text) => { + println!("{}", text); + if !atty::is(atty::Stream::Stdout) { + // if stdout is redirected we also print the + // messages to stderr for convenience + eprintln!("{} {}", name, text); + } + } + other => { + eprintln!("received: {:?}", other); + } + } + } + Some(Err(err)) => { + eprintln!("error: {:?}", err); + } + None => { + return Ok(()); + } + } + } + }; + } +} |