use std::io::{stdin, BufRead}; use std::{net::SocketAddr, time::Duration}; use anyhow::{Context, Result}; use futures::{Sink, SinkExt, Stream, StreamExt}; use tokio::net::TcpListener; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::{tungstenite::Error as WsError, tungstenite::Message}; use url::Url; const USAGE: &str = "\ Usage: webcat Connect to a ws:// URL Usage: webcat -l Listen to the given port You can use FIFOs to create a debugging man-in-the-middle setup: mkfifo client-in server-in webcat ws://example.com:3000/ < client-in > server-in webcat -l 4000 < server-in > client-in echo > server-in # unblock the FIFO deadlock\ "; #[tokio::main] async fn main() -> Result<()> { let args: Vec<_> = std::env::args().skip(1).collect(); if args.len() == 1 { start_client(args[0].parse().context("failed to parse URL")?).await?; } else if args.len() == 2 && args[0] == "-l" { start_server(args[1].parse().context("failed to parse port")?).await?; } else { println!("{}", USAGE); } Ok(()) } async fn start_client(url: Url) -> Result<()> { // stdio is blocking so we do it in a dedicated task let (stdin_sender, mut stdin_receiver) = unbounded_channel::(); tokio::spawn(send_stdin_to_channel(stdin_sender)); loop { match tokio_tungstenite::connect_async(&url).await { Ok((stream, _resp)) => { eprintln!("connected to server"); webcat("client >>>", stream, &mut stdin_receiver).await?; } Err(err) => { eprintln!("failed to connect to server: {}", err); let mut reconnect_timer = tokio::time::interval(Duration::from_secs(1)); eprint!("reconnecting in "); for i in (1..6).rev() { reconnect_timer.tick().await; eprint!("... {}", i); } eprintln!(); } } } } async fn start_server(port: u16) -> Result<()> { let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into(); let listener = TcpListener::bind(listen_addr).await?; // stdio is blocking so we do it in a dedicated task let (stdin_sender, mut stdin_receiver) = unbounded_channel::(); tokio::spawn(send_stdin_to_channel(stdin_sender)); eprintln!("listening for connections on {}", listen_addr); loop { let (stream, _client_addr) = listener.accept().await?; eprintln!("accepted TCP connection"); match tokio_tungstenite::accept_async(stream).await { Ok(stream) => { webcat("server <<<", stream, &mut stdin_receiver).await?; } Err(err) => { eprintln!("failed to upgrade {}", err); } } } } async fn send_stdin_to_channel(sender: UnboundedSender) -> Result<()> { for line in stdin().lock().lines().flatten() { sender.send(line)?; } Ok(()) } async fn webcat> + Sink>( name: &str, stream: S, receiver: &mut UnboundedReceiver, ) -> Result<()> { let (mut write, mut read) = stream.split(); loop { tokio::select! { Some(line) = receiver.recv() => { if write.send(Message::Text(line)).await.is_err() { eprintln!("failed to send message"); } } msg = read.next() => { match msg { Some(Ok(msg)) => { match msg { Message::Text(text) => { println!("{}", text); if !atty::is(atty::Stream::Stdout) { // if stdout is redirected we also print the // messages to stderr for convenience eprintln!("{} {}", name, text); } } other => { eprintln!("received: {:?}", other); } } } Some(Err(err)) => { eprintln!("error: {:?}", err); } None => { return Ok(()); } } } }; } }