websockify/other/websockify.clj

233 lines
8.1 KiB
Clojure

(ns websockify
;(:use ring.adapter.jetty)
(:require [clojure.tools.cli :as cli]
[clojure.string :as string])
(:import
;; Netty TCP Client
[java.util.concurrent Executors]
[java.net InetSocketAddress]
[org.jboss.netty.channel
Channels SimpleChannelHandler ChannelPipelineFactory]
[org.jboss.netty.buffer ChannelBuffers]
[org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory]
[org.jboss.netty.bootstrap ClientBootstrap]
[org.jboss.netty.handler.codec.base64 Base64]
[org.jboss.netty.util CharsetUtil]
;; Jetty WebSocket Server
[org.eclipse.jetty.server Server]
[org.eclipse.jetty.server.nio BlockingChannelConnector]
[org.eclipse.jetty.servlet
ServletContextHandler ServletHolder DefaultServlet]
[org.eclipse.jetty.websocket
WebSocket WebSocket$OnTextMessage
WebSocketClientFactory WebSocketClient WebSocketServlet]))
;; TCP / NIO
;; (defn tcp-channel [host port]
;; (try
;; (let [address (InetSocketAddress. host port)
;; channel (doto (SocketChannel/open)
;; (.connect address))]
;; channel)
;; (catch Exception e
;; (println (str "Failed to connect to'" host ":" port "':" e))
;; nil)))
;; http://docs.jboss.org/netty/3.2/guide/html/start.html#d0e51
;; http://stackoverflow.com/questions/5453602/highly-concurrent-http-with-netty-and-nio
;; https://github.com/datskos/ring-netty-adapter/blob/master/src/ring/adapter/netty.clj
(defn netty-client [host port open close message]
(let [handler (proxy [SimpleChannelHandler] []
(channelConnected [ctx e] (open ctx e))
(channelDisconnected [ctx e] (close ctx e))
(messageReceived [ctx e] (message ctx e))
(exceptionCaught [ctx e]
(println "exceptionCaught:" e)))
pipeline (proxy [ChannelPipelineFactory] []
(getPipeline []
(doto (Channels/pipeline)
(.addLast "handler" handler))))
bootstrap (doto (ClientBootstrap.
(NioClientSocketChannelFactory.
(Executors/newCachedThreadPool)
(Executors/newCachedThreadPool)))
(.setPipelineFactory pipeline)
(.setOption "tcpNoDelay" true)
(.setOption "keepAlive" true))
channel-future (.connect bootstrap (InetSocketAddress. host port))
channel (.. channel-future (awaitUninterruptibly) (getChannel))]
channel))
;; WebSockets
;; http://wiki.eclipse.org/Jetty/Feature/WebSockets
(defn make-websocket-servlet [open close message]
(proxy [WebSocketServlet] []
(doGet [request response]
;;(println "doGet" request)
(.. (proxy-super getServletContext)
(getNamedDispatcher (proxy-super getServletName))
(forward request response)))
(doWebSocketConnect [request response]
(println "doWebSocketConnect")
(reify WebSocket$OnTextMessage
(onOpen [this connection] (open this connection))
(onClose [this code message] (close this code message))
(onMessage [this data] (message this data))))))
(defn websocket-server
[port & {:keys [open close message ws-path web]
:or {open (fn [_ conn]
(println "New websocket client:" conn))
close (fn [_ code reason]
(println "Websocket client closed:" code reason))
message (fn [_ data]
(println "Websocket message:" data))
ws-path "/websocket"}}]
(let [http-servlet (doto (ServletHolder. (DefaultServlet.))
(.setInitParameter "dirAllowed" "true")
(.setInitParameter "resourceBase" web))
ws-servlet (ServletHolder.
(make-websocket-servlet open close message))
context (doto (ServletContextHandler.)
(.setContextPath "/")
(.addServlet ws-servlet ws-path))
connector (doto (BlockingChannelConnector.)
(.setPort port)
(.setMaxIdleTime Integer/MAX_VALUE))
server (doto (Server.)
(.setHandler context)
(.addConnector connector))]
(when web (.addServlet context http-servlet "/"))
server))
;; Websockify
(defonce settings (atom {}))
;; WebSocket client to TCP target mappings
(defonce clients (atom {}))
(defonce targets (atom {}))
(defn target-open [ctx e]
(println "Connected to target")
#_(println "channelConnected:" e))
(defn target-close [ctx e]
#_(println "channelDisconnected:" e)
(println "Target closed")
(when-let [channel (get @targets (.getChannel ctx))]
(.disconnect channel)))
(defn target-message [ctx e]
(let [channel (.getChannel ctx)
client (get @targets channel)
msg (.getMessage e)
len (.readableBytes msg)
b64 (Base64/encode msg false)
blen (.readableBytes b64)]
#_(println "received" len "bytes from target")
#_(println "target receive:" (.toString msg 0 len CharsetUtil/UTF_8))
#_(println "sending to client:" (.toString b64 0 blen CharsetUtil/UTF_8))
(.sendMessage client (.toString b64 0 blen CharsetUtil/UTF_8))))
(defn client-open [this connection]
#_(println "Got WebSocket connection:" connection)
(println "New client")
(let [target (netty-client
(:target-host @settings)
(:target-port @settings)
target-open target-close target-message)]
(swap! clients assoc this {:client connection
:target target})
(swap! targets assoc target connection)))
(defn client-close [this code message]
(println "WebSocket connection closed")
(when-let [target (:target (get @clients this))]
(println "Closing target")
(.close target)
(println "Target closed")
(swap! targets dissoc target))
(swap! clients dissoc this))
(defn client-message [this data]
#_(println "WebSocket onMessage:" data)
(let [target (:target (get @clients this))
cbuf (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8)
decbuf (Base64/decode cbuf)
rlen (.readableBytes decbuf)]
#_(println "Sending" rlen "bytes to target")
#_(println "Sending to target:" (.toString decbuf 0 rlen CharsetUtil/UTF_8))
(.write target decbuf)))
(defn start-websockify
[& {:keys [listen-port target-host target-port web]
:or {listen-port 6080
target-host "localhost"
target-port 5900
}}]
(reset! clients {})
(reset! targets {})
(reset! settings {:target-host target-host
:target-port target-port})
(let [server (websocket-server listen-port
:web web
:ws-path "/websockify"
:open client-open
:close client-close
:message client-message)]
(.start server)
(if web
(println "Serving web requests from:" web)
(println "Not serving web requests"))
(defn stop-websockify []
(doseq [client (vals @clients)]
(.disconnect (:client client))
(.close (:target client)))
(.stop server)
(reset! clients {})
(reset! targets {})
nil)))
(defn -main [& args]
(let [[options args banner]
(cli/cli
args
["-v" "--[no-]verbose" "Verbose output"]
["--web" "Run webserver with root at given location"]
["-h" "--help" "Show help" :default false :flag true]
)]
(when (or (:help options)
(not= 2 (count args)))
(println banner)
(System/exit 0))
(println options)
(println args)
(let [target (second args)
[target-host target-port] (string/split target #":")]
(start-websockify :listen-port (Integer/parseInt (first args))
:target-host target-host
:target-port (Integer/parseInt target-port)
:web (:web options))))
nil)