|
378 | 378 | (.setConnectTimeoutMillis ^ProxyHandler handler connection-timeout))
|
379 | 379 | handler))
|
380 | 380 |
|
381 |
| -(defn pending-proxy-connection-handler [early-response-d] |
| 381 | +(defn pending-proxy-connection-handler [proxy-connected] |
382 | 382 | (netty/channel-inbound-handler
|
383 | 383 | :exception-caught
|
384 | 384 | ([_ ctx cause]
|
|
388 | 388 | headers (when (instance? HttpProxyHandler$HttpProxyConnectException cause)
|
389 | 389 | (.headers ^HttpProxyHandler$HttpProxyConnectException cause))
|
390 | 390 | response (cond
|
391 |
| - (= "timeout" message) |
| 391 | + (.contains message "timeout") |
392 | 392 | (ProxyConnectionTimeoutException. ^Throwable cause)
|
393 | 393 |
|
394 | 394 | (some? headers)
|
395 |
| - (ex-info message {:headers (http1/headers->map headers)}) |
| 395 | + (ex-info message {:headers (http1/headers->map headers)} cause) |
396 | 396 |
|
397 | 397 | :else
|
398 | 398 | cause)]
|
399 |
| - (d/error! early-response-d response) |
| 399 | + (d/error! proxy-connected response) |
400 | 400 | ;; client handler should take care of the rest
|
401 | 401 | (netty/close ctx))))
|
402 | 402 |
|
403 | 403 | :user-event-triggered
|
404 | 404 | ([this ctx evt]
|
405 | 405 | (when (instance? ProxyConnectionEvent evt)
|
406 |
| - (.remove (.pipeline ctx) this)) |
| 406 | + (.remove (.pipeline ctx) this) |
| 407 | + (d/success! proxy-connected true)) |
407 | 408 | (.fireUserEventTriggered ^ChannelHandlerContext ctx evt))))
|
408 | 409 |
|
409 | 410 | (defn- add-proxy-handlers
|
410 | 411 | "Inserts handlers for proxying through a server"
|
411 |
| - [^ChannelPipeline p early-response-d proxy-options ssl?] |
412 |
| - (when (some? proxy-options) |
| 412 | + [^ChannelPipeline p proxy-connected proxy-options ssl?] |
| 413 | + (if (some? proxy-options) |
413 | 414 | (let [proxy (proxy-handler (assoc proxy-options :ssl? ssl?))]
|
414 | 415 | (.addFirst p "proxy" ^ChannelHandler proxy)
|
415 | 416 | ;; well, we need to wait before the proxy responded with
|
|
420 | 421 | "proxy"
|
421 | 422 | "pending-proxy-connection"
|
422 | 423 | ^ChannelHandler
|
423 |
| - (pending-proxy-connection-handler early-response-d))))) |
| 424 | + (pending-proxy-connection-handler proxy-connected)))) |
| 425 | + (d/success! proxy-connected false)) |
424 | 426 | p)
|
425 | 427 |
|
426 | 428 | (defn- setup-http1-pipeline
|
|
472 | 474 |
|
473 | 475 | Can't use an ApnHandler/ApplicationProtocolNegotiationHandler here,
|
474 | 476 | because it's tricky to run Manifold code on Netty threads."
|
475 |
| - [{:keys [ssl? remote-address ssl-context ssl-endpoint-id-alg proxy-options early-response-d]}] |
| 477 | + [{:keys [ssl? remote-address ssl-context ssl-endpoint-id-alg proxy-options proxy-connected]}] |
476 | 478 | (fn pipeline-builder*
|
477 | 479 | [^ChannelPipeline pipeline]
|
478 |
| - (add-proxy-handlers pipeline early-response-d proxy-options ssl?) |
| 480 | + (add-proxy-handlers pipeline proxy-connected proxy-options ssl?) |
479 | 481 | (when ssl?
|
480 | 482 | (do
|
481 | 483 | (.addLast pipeline
|
|
803 | 805 | (some? log-activity) (netty/activity-logger "aleph-client" log-activity)
|
804 | 806 | :else nil)
|
805 | 807 |
|
806 |
| - early-response-d (d/deferred) |
| 808 | + proxy-connected (d/deferred) |
807 | 809 | pipeline-builder (make-pipeline-builder
|
808 | 810 | (assoc opts
|
809 |
| - :early-response-d early-response-d |
| 811 | + :proxy-connected proxy-connected |
810 | 812 | :ssl? ssl?
|
811 | 813 | :ssl-context ssl-context
|
812 | 814 | :ssl-endpoint-id-alg ssl-endpoint-id-alg
|
|
827 | 829 |
|
828 | 830 | (attach-on-close-handler ch-d on-closed)
|
829 | 831 |
|
830 |
| - (d/alt' |
831 |
| - early-response-d |
832 |
| - (d/chain' ch-d |
833 |
| - (fn setup-client |
834 |
| - [^Channel ch] |
835 |
| - (log/debug "Channel:" ch) |
836 |
| - |
837 |
| - ;; We know the SSL handshake must be complete because create-client wraps the |
838 |
| - ;; future with maybe-ssl-handshake-future, so we can get the negotiated |
839 |
| - ;; protocol, falling back to HTTP/1.1 by default. |
840 |
| - (let [pipeline (.pipeline ch) |
841 |
| - protocol (cond |
842 |
| - ssl? |
843 |
| - (or (-> pipeline |
844 |
| - ^SslHandler (.get ^Class SslHandler) |
845 |
| - (.applicationProtocol)) |
846 |
| - ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed |
847 |
| - |
848 |
| - force-h2c? |
849 |
| - (do |
850 |
| - (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") |
851 |
| - ApplicationProtocolNames/HTTP_2) |
852 |
| - |
853 |
| - :else |
854 |
| - ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested |
855 |
| - setup-opts (assoc opts |
856 |
| - :authority authority |
857 |
| - :ch ch |
858 |
| - :server? false |
859 |
| - :keep-alive? keep-alive? |
860 |
| - :keep-alive?' keep-alive?' |
861 |
| - :logger logger |
862 |
| - :non-tun-proxy? non-tun-proxy? |
863 |
| - :pipeline pipeline |
864 |
| - :pipeline-transform pipeline-transform |
865 |
| - :raw-stream? raw-stream? |
866 |
| - :remote-address remote-address |
867 |
| - :response-buffer-size response-buffer-size |
868 |
| - :ssl-context ssl-context |
869 |
| - :ssl? ssl?)] |
870 |
| - |
871 |
| - (log/debug (str "Using HTTP protocol: " protocol) |
872 |
| - {:authority authority |
873 |
| - :ssl? ssl? |
874 |
| - :force-h2c? force-h2c?}) |
875 |
| - |
876 |
| - ;; can't use ApnHandler, because we need to coordinate with Manifold code |
877 |
| - (let [http-req-handler |
878 |
| - (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) |
879 |
| - (setup-http1-client setup-opts) |
880 |
| - |
881 |
| - (.equals ApplicationProtocolNames/HTTP_2 protocol) |
882 |
| - (do |
883 |
| - (http2/setup-conn-pipeline setup-opts) |
884 |
| - (http2-req-handler setup-opts)) |
885 |
| - |
886 |
| - :else |
887 |
| - (do |
888 |
| - (let [msg (str "Unknown protocol: " protocol) |
889 |
| - e (IllegalStateException. msg)] |
890 |
| - (log/error e msg) |
891 |
| - (netty/close ch) |
892 |
| - (throw e))))] |
893 |
| - |
894 |
| - ;; Both Netty and Aleph are set up, unpause the pipeline |
895 |
| - (when (.get pipeline "pause-handler") |
896 |
| - (log/debug "Unpausing pipeline") |
897 |
| - (.remove pipeline "pause-handler")) |
898 |
| - |
899 |
| - (fn http-req-fn |
900 |
| - [req] |
901 |
| - (log/trace "http-req-fn fired") |
902 |
| - (log/debug "client request:" (pr-str req)) |
903 |
| - |
904 |
| - ;; If :aleph/close is set in the req, closes the channel and |
905 |
| - ;; returns a deferred containing the result. |
906 |
| - (if (or (contains? req :aleph/close) |
907 |
| - (contains? req ::close)) |
908 |
| - (-> ch (netty/close) (netty/wrap-future)) |
909 |
| - |
910 |
| - (let [t0 (System/nanoTime) |
911 |
| - ;; I suspect the below is an error for http1 |
912 |
| - ;; since the shared handler might not match. |
913 |
| - ;; Should work for HTTP2, though |
914 |
| - raw-stream? (get req :raw-stream? raw-stream?)] |
915 |
| - |
916 |
| - (if (or (not (.isActive ch)) |
917 |
| - (not (.isOpen ch))) |
918 |
| - |
919 |
| - (d/error-deferred |
920 |
| - (ex-info "Channel is inactive/closed." |
921 |
| - {:req req |
922 |
| - :ch ch |
923 |
| - :open? (.isOpen ch) |
924 |
| - :active? (.isActive ch)})) |
925 |
| - |
926 |
| - (-> (http-req-handler req) |
927 |
| - (d/chain' (rsp-handler |
928 |
| - {:ch ch |
929 |
| - :keep-alive? keep-alive? ; why not keep-alive?' |
930 |
| - :raw-stream? raw-stream? |
931 |
| - :req req |
932 |
| - :response-buffer-size response-buffer-size |
933 |
| - :t0 t0})))))))))))))) |
| 832 | + (d/chain' |
| 833 | + proxy-connected |
| 834 | + (fn [_] |
| 835 | + (d/chain' ch-d |
| 836 | + (fn setup-client |
| 837 | + [^Channel ch] |
| 838 | + (log/debug "Channel:" ch) |
| 839 | + |
| 840 | + ;; We know the SSL handshake must be complete because create-client wraps the |
| 841 | + ;; future with maybe-ssl-handshake-future, so we can get the negotiated |
| 842 | + ;; protocol, falling back to HTTP/1.1 by default. |
| 843 | + (let [pipeline (.pipeline ch) |
| 844 | + protocol (cond |
| 845 | + ssl? |
| 846 | + (or (-> pipeline |
| 847 | + ^SslHandler (.get ^Class SslHandler) |
| 848 | + (.applicationProtocol)) |
| 849 | + ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed |
| 850 | + |
| 851 | + force-h2c? |
| 852 | + (do |
| 853 | + (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") |
| 854 | + ApplicationProtocolNames/HTTP_2) |
| 855 | + |
| 856 | + :else |
| 857 | + ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested |
| 858 | + setup-opts (assoc opts |
| 859 | + :authority authority |
| 860 | + :ch ch |
| 861 | + :server? false |
| 862 | + :keep-alive? keep-alive? |
| 863 | + :keep-alive?' keep-alive?' |
| 864 | + :logger logger |
| 865 | + :non-tun-proxy? non-tun-proxy? |
| 866 | + :pipeline pipeline |
| 867 | + :pipeline-transform pipeline-transform |
| 868 | + :raw-stream? raw-stream? |
| 869 | + :remote-address remote-address |
| 870 | + :response-buffer-size response-buffer-size |
| 871 | + :ssl-context ssl-context |
| 872 | + :ssl? ssl?)] |
| 873 | + |
| 874 | + (log/debug (str "Using HTTP protocol: " protocol) |
| 875 | + {:authority authority |
| 876 | + :ssl? ssl? |
| 877 | + :force-h2c? force-h2c?}) |
| 878 | + |
| 879 | + ;; can't use ApnHandler, because we need to coordinate with Manifold code |
| 880 | + (let [http-req-handler |
| 881 | + (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) |
| 882 | + (setup-http1-client setup-opts) |
| 883 | + |
| 884 | + (.equals ApplicationProtocolNames/HTTP_2 protocol) |
| 885 | + (do |
| 886 | + (http2/setup-conn-pipeline setup-opts) |
| 887 | + (http2-req-handler setup-opts)) |
| 888 | + |
| 889 | + :else |
| 890 | + (do |
| 891 | + (let [msg (str "Unknown protocol: " protocol) |
| 892 | + e (IllegalStateException. msg)] |
| 893 | + (log/error e msg) |
| 894 | + (netty/close ch) |
| 895 | + (throw e))))] |
| 896 | + |
| 897 | + ;; Both Netty and Aleph are set up, unpause the pipeline |
| 898 | + (when (.get pipeline "pause-handler") |
| 899 | + (log/debug "Unpausing pipeline") |
| 900 | + (.remove pipeline "pause-handler")) |
| 901 | + |
| 902 | + (fn http-req-fn |
| 903 | + [req] |
| 904 | + (log/trace "http-req-fn fired") |
| 905 | + (log/debug "client request:" (pr-str req)) |
| 906 | + |
| 907 | + ;; If :aleph/close is set in the req, closes the channel and |
| 908 | + ;; returns a deferred containing the result. |
| 909 | + (if (or (contains? req :aleph/close) |
| 910 | + (contains? req ::close)) |
| 911 | + (-> ch (netty/close) (netty/wrap-future)) |
| 912 | + |
| 913 | + (let [t0 (System/nanoTime) |
| 914 | + ;; I suspect the below is an error for http1 |
| 915 | + ;; since the shared handler might not match. |
| 916 | + ;; Should work for HTTP2, though |
| 917 | + raw-stream? (get req :raw-stream? raw-stream?)] |
| 918 | + |
| 919 | + (if (or (not (.isActive ch)) |
| 920 | + (not (.isOpen ch))) |
| 921 | + |
| 922 | + (d/error-deferred |
| 923 | + (ex-info "Channel is inactive/closed." |
| 924 | + {:req req |
| 925 | + :ch ch |
| 926 | + :open? (.isOpen ch) |
| 927 | + :active? (.isActive ch)})) |
| 928 | + |
| 929 | + (-> (http-req-handler req) |
| 930 | + (d/chain' (rsp-handler |
| 931 | + {:ch ch |
| 932 | + :keep-alive? keep-alive? ; why not keep-alive?' |
| 933 | + :raw-stream? raw-stream? |
| 934 | + :req req |
| 935 | + :response-buffer-size response-buffer-size |
| 936 | + :t0 t0}))))))))))))))) |
934 | 937 |
|
935 | 938 |
|
936 | 939 |
|
|
0 commit comments