diff options
| author | Martin Fischer <martin@push-f.com> | 2021-09-12 16:04:38 +0200 | 
|---|---|---|
| committer | Martin Fischer <martin@push-f.com> | 2021-09-12 19:29:26 +0200 | 
| commit | 8b9bcd0d86eaa27dc78b82ea65c9f60dfc7ce621 (patch) | |
| tree | 556ffb0e620c6d9f9a5df6d6ebb5630506282c09 /src | |
initial commit
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 132 | 
1 files changed, 132 insertions, 0 deletions
| diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..cc4163d --- /dev/null +++ b/src/main.rs @@ -0,0 +1,132 @@ +use std::io::{stdin, BufRead}; +use std::{net::SocketAddr, time::Duration}; + +use anyhow::{Context, Result}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use tokio::net::TcpListener; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio_tungstenite::{tungstenite::Error as WsError, tungstenite::Message}; +use url::Url; + +const USAGE: &str = "\ +Usage: webcat <url>       Connect to a ws:// URL +Usage: webcat -l <port>   Listen to the given port + +You can use FIFOs to create a debugging man-in-the-middle setup: + +    mkfifo client-in server-in +    webcat ws://example.com:3000/ < client-in > server-in +    webcat -l 4000 < server-in > client-in +    echo > server-in # unblock the FIFO deadlock\ +"; + +#[tokio::main] +async fn main() -> Result<()> { +    let args: Vec<_> = std::env::args().skip(1).collect(); +    if args.len() == 1 { +        start_client(args[0].parse().context("failed to parse URL")?).await?; +    } else if args.len() == 2 && args[0] == "-l" { +        start_server(args[1].parse().context("failed to parse port")?).await?; +    } else { +        println!("{}", USAGE); +    } +    Ok(()) +} + +async fn start_client(url: Url) -> Result<()> { +    // stdio is blocking so we do it in a dedicated task +    let (stdin_sender, mut stdin_receiver) = unbounded_channel::<String>(); +    tokio::spawn(send_stdin_to_channel(stdin_sender)); + +    loop { +        match tokio_tungstenite::connect_async(&url).await { +            Ok((stream, _resp)) => { +                eprintln!("connected to server"); +                webcat("client >>>", stream, &mut stdin_receiver).await?; +            } +            Err(err) => { +                eprintln!("failed to connect to server: {}", err); +                let mut reconnect_timer = tokio::time::interval(Duration::from_secs(1)); +                eprint!("reconnecting in "); +                for i in (1..6).rev() { +                    reconnect_timer.tick().await; +                    eprint!("... {}", i); +                } +                eprintln!(); +            } +        } +    } +} + +async fn start_server(port: u16) -> Result<()> { +    let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into(); +    let listener = TcpListener::bind(listen_addr).await?; + +    // stdio is blocking so we do it in a dedicated task +    let (stdin_sender, mut stdin_receiver) = unbounded_channel::<String>(); +    tokio::spawn(send_stdin_to_channel(stdin_sender)); + +    eprintln!("listening for connections on {}", listen_addr); + +    loop { +        let (stream, _client_addr) = listener.accept().await?; +        eprintln!("accepted TCP connection"); +        match tokio_tungstenite::accept_async(stream).await { +            Ok(stream) => { +                webcat("server <<<", stream, &mut stdin_receiver).await?; +            } +            Err(err) => { +                eprintln!("failed to upgrade {}", err); +            } +        } +    } +} + +async fn send_stdin_to_channel(sender: UnboundedSender<String>) -> Result<()> { +    for line in stdin().lock().lines().flatten() { +        sender.send(line)?; +    } +    Ok(()) +} + +async fn webcat<S: Stream<Item = Result<Message, WsError>> + Sink<Message>>( +    name: &str, +    stream: S, +    receiver: &mut UnboundedReceiver<String>, +) -> Result<()> { +    let (mut write, mut read) = stream.split(); +    loop { +        tokio::select! { +            Some(line) = receiver.recv() => { +                if write.send(Message::Text(line)).await.is_err() { +                    eprintln!("failed to send message"); +                } +            } +            msg = read.next() => { +                match msg { +                    Some(Ok(msg)) => { +                        match msg { +                            Message::Text(text) => { +                                println!("{}", text); +                                if !atty::is(atty::Stream::Stdout) { +                                    // if stdout is redirected we also print the +                                    // messages to stderr for convenience +                                    eprintln!("{} {}", name, text); +                                } +                            } +                            other => { +                                eprintln!("received: {:?}", other); +                            } +                        } +                    } +                    Some(Err(err)) => { +                        eprintln!("error: {:?}", err); +                    } +                    None => { +                        return Ok(()); +                    } +                } +            } +        }; +    } +} | 
