use std::io::{stdin, BufRead}; use std::{net::SocketAddr, time::Duration}; use anyhow::{Context, Result}; use futures::{Sink, SinkExt, Stream, StreamExt}; use tokio::net::TcpListener; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::{tungstenite::Error as WsError, tungstenite::Message}; use url::Url; const USAGE: &str = "\ Usage: webcat Connect to a ws:// URL Usage: webcat -l Listen to the given port You can use FIFOs to create a debugging man-in-the-middle setup: mkfifo client-in server-in webcat ws://example.com:3000/ < client-in > server-in webcat -l 4000 < server-in > client-in echo > server-in # unblock the FIFO deadlock\ "; #[tokio::main] async fn main() -> Result<()> { let args: Vec<_> = std::env::args().skip(1).collect(); if args.len() == 1 { start_client(args[0].parse().context("failed to parse URL")?).await?; } else if args.len() == 2 && args[0] == "-l" { start_server(args[1].parse().context("failed to parse port")?).await?; } else { println!("{}", USAGE); } Ok(()) } async fn start_client(url: Url) -> Result<()> { // stdio is blocking so we do it in a dedicated task let (stdin_sender, mut stdin_receiver) = unbounded_channel::(); tokio::spawn(send_stdin_to_channel(stdin_sender)); loop { match tokio_tungstenite::connect_async(&url).await { Ok((stream, _resp)) => { eprintln!("connected to server"); webcat(Mode::Client, stream, &mut stdin_receiver).await?; } Err(err) => { eprintln!("failed to connect to server: {}", err); let mut reconnect_timer = tokio::time::interval(Duration::from_secs(1)); eprint!("reconnecting in "); for i in (1..6).rev() { reconnect_timer.tick().await; eprint!("... {}", i); } eprintln!(); } } } } const SERVER_ACCEPTED_NEW_CONN: &str = "accepted new client"; async fn start_server(port: u16) -> Result<()> { let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into(); let listener = TcpListener::bind(listen_addr).await?; // stdio is blocking so we do it in a dedicated task let (stdin_sender, mut stdin_receiver) = unbounded_channel::(); tokio::spawn(send_stdin_to_channel(stdin_sender)); eprintln!("listening for connections on {}", listen_addr); loop { let (stream, _client_addr) = listener.accept().await?; match tokio_tungstenite::accept_async(stream).await { Ok(stream) => { println!("{}", SERVER_ACCEPTED_NEW_CONN); webcat(Mode::Server, stream, &mut stdin_receiver).await?; } Err(err) => { eprintln!("failed to upgrade {}", err); } } } } async fn send_stdin_to_channel(sender: UnboundedSender) -> Result<()> { for line in stdin().lock().lines().flatten() { sender.send(line)?; } Ok(()) } #[derive(PartialEq)] enum Mode { Server, Client, } async fn webcat> + Sink>( mode: Mode, stream: S, receiver: &mut UnboundedReceiver, ) -> Result<()> { let (mut write, mut read) = stream.split(); loop { tokio::select! { Some(line) = receiver.recv() => { if atty::isnt(atty::Stream::Stdin) && mode == Mode::Client && line == SERVER_ACCEPTED_NEW_CONN { // Assuming the MITM setup described in the README, we // disconnect and reconnect the client to prevent the // application client and server from desynchronizing. return Ok(()); } else if write.send(Message::Text(line)).await.is_err() { eprintln!("failed to send message"); } } msg = read.next() => { match msg { Some(Ok(msg)) => { match msg { Message::Text(text) => { println!("{}", text); if !atty::is(atty::Stream::Stdout) { // if stdout is redirected we also print the // messages to stderr for convenience eprintln!("{} {}", match mode { Mode::Server => "server <<<", Mode::Client => "client >>>", }, text); } } other => { eprintln!("received: {:?}", other); } } } Some(Err(err)) => { eprintln!("error: {:?}", err); } None => { return Ok(()); } } } }; } }