summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs132
1 files changed, 132 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..cc4163d
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,132 @@
+use std::io::{stdin, BufRead};
+use std::{net::SocketAddr, time::Duration};
+
+use anyhow::{Context, Result};
+use futures::{Sink, SinkExt, Stream, StreamExt};
+use tokio::net::TcpListener;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio_tungstenite::{tungstenite::Error as WsError, tungstenite::Message};
+use url::Url;
+
+const USAGE: &str = "\
+Usage: webcat <url> Connect to a ws:// URL
+Usage: webcat -l <port> Listen to the given port
+
+You can use FIFOs to create a debugging man-in-the-middle setup:
+
+ mkfifo client-in server-in
+ webcat ws://example.com:3000/ < client-in > server-in
+ webcat -l 4000 < server-in > client-in
+ echo > server-in # unblock the FIFO deadlock\
+";
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args: Vec<_> = std::env::args().skip(1).collect();
+ if args.len() == 1 {
+ start_client(args[0].parse().context("failed to parse URL")?).await?;
+ } else if args.len() == 2 && args[0] == "-l" {
+ start_server(args[1].parse().context("failed to parse port")?).await?;
+ } else {
+ println!("{}", USAGE);
+ }
+ Ok(())
+}
+
+async fn start_client(url: Url) -> Result<()> {
+ // stdio is blocking so we do it in a dedicated task
+ let (stdin_sender, mut stdin_receiver) = unbounded_channel::<String>();
+ tokio::spawn(send_stdin_to_channel(stdin_sender));
+
+ loop {
+ match tokio_tungstenite::connect_async(&url).await {
+ Ok((stream, _resp)) => {
+ eprintln!("connected to server");
+ webcat("client >>>", stream, &mut stdin_receiver).await?;
+ }
+ Err(err) => {
+ eprintln!("failed to connect to server: {}", err);
+ let mut reconnect_timer = tokio::time::interval(Duration::from_secs(1));
+ eprint!("reconnecting in ");
+ for i in (1..6).rev() {
+ reconnect_timer.tick().await;
+ eprint!("... {}", i);
+ }
+ eprintln!();
+ }
+ }
+ }
+}
+
+async fn start_server(port: u16) -> Result<()> {
+ let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into();
+ let listener = TcpListener::bind(listen_addr).await?;
+
+ // stdio is blocking so we do it in a dedicated task
+ let (stdin_sender, mut stdin_receiver) = unbounded_channel::<String>();
+ tokio::spawn(send_stdin_to_channel(stdin_sender));
+
+ eprintln!("listening for connections on {}", listen_addr);
+
+ loop {
+ let (stream, _client_addr) = listener.accept().await?;
+ eprintln!("accepted TCP connection");
+ match tokio_tungstenite::accept_async(stream).await {
+ Ok(stream) => {
+ webcat("server <<<", stream, &mut stdin_receiver).await?;
+ }
+ Err(err) => {
+ eprintln!("failed to upgrade {}", err);
+ }
+ }
+ }
+}
+
+async fn send_stdin_to_channel(sender: UnboundedSender<String>) -> Result<()> {
+ for line in stdin().lock().lines().flatten() {
+ sender.send(line)?;
+ }
+ Ok(())
+}
+
+async fn webcat<S: Stream<Item = Result<Message, WsError>> + Sink<Message>>(
+ name: &str,
+ stream: S,
+ receiver: &mut UnboundedReceiver<String>,
+) -> Result<()> {
+ let (mut write, mut read) = stream.split();
+ loop {
+ tokio::select! {
+ Some(line) = receiver.recv() => {
+ if write.send(Message::Text(line)).await.is_err() {
+ eprintln!("failed to send message");
+ }
+ }
+ msg = read.next() => {
+ match msg {
+ Some(Ok(msg)) => {
+ match msg {
+ Message::Text(text) => {
+ println!("{}", text);
+ if !atty::is(atty::Stream::Stdout) {
+ // if stdout is redirected we also print the
+ // messages to stderr for convenience
+ eprintln!("{} {}", name, text);
+ }
+ }
+ other => {
+ eprintln!("received: {:?}", other);
+ }
+ }
+ }
+ Some(Err(err)) => {
+ eprintln!("error: {:?}", err);
+ }
+ None => {
+ return Ok(());
+ }
+ }
+ }
+ };
+ }
+}