|
447 | 447 | (pipeline-transform p)
|
448 | 448 | p)
|
449 | 449 |
|
450 |
| -(let [stream-frame->http-object-codec (delay (Http2StreamFrameToHttpObjectCodec. false))] |
451 |
| - (defn- h2-stream-chan-initializer |
452 |
| - "The multiplex handler creates a channel per HTTP2 stream, this |
453 |
| - sets up each new stream channel" |
454 |
| - [response-stream proxy-options ssl? logger pipeline-transform handler] |
455 |
| - (println "h2-stream-chan-initializer called") (flush) |
456 |
| - (netty/pipeline-initializer |
457 |
| - (fn [^ChannelPipeline p] |
458 |
| - (log/trace "h2-stream-chan-initializer initChannel called") |
459 |
| - |
460 |
| - (.addLast p |
461 |
| - "stream-frame-to-http-object" |
462 |
| - ^Http2StreamFrameToHttpObjectCodec @stream-frame->http-object-codec) |
463 |
| - (.addLast p |
464 |
| - "handler" |
465 |
| - ^ChannelHandler handler) |
| 450 | +;; TODO: is the delay actually helping? It still creates a Delay and a new fn... |
| 451 | +(def ^:no-doc stream-frame->http-object-codec (delay (Http2StreamFrameToHttpObjectCodec. false))) |
466 | 452 |
|
467 |
| - (add-non-http-handlers |
468 |
| - p |
469 |
| - response-stream |
470 |
| - proxy-options |
471 |
| - ssl? |
472 |
| - logger |
473 |
| - pipeline-transform) |
474 |
| - (log/trace "added all stream-chan handlers") |
| 453 | +(defn- h2-stream-chan-initializer |
| 454 | + "The multiplex handler creates a channel per HTTP2 stream, this |
| 455 | + sets up each new stream channel" |
| 456 | + [response-stream proxy-options ssl? logger pipeline-transform handler] |
| 457 | + (log/trace "h2-stream-chan-initializer") |
475 | 458 |
|
476 |
| - (log/debug (str "Stream chan pipeline:" (prn-str p))))))) |
| 459 | + (netty/pipeline-initializer |
| 460 | + (fn [^ChannelPipeline p] |
| 461 | + (log/trace "h2-stream-chan-initializer initChannel called") |
| 462 | + |
| 463 | + ;; necessary for multipart support in HTTP/2 |
| 464 | + (.addLast p |
| 465 | + "stream-frame-to-http-object" |
| 466 | + ^ChannelHandler @stream-frame->http-object-codec) |
477 | 467 | (.addLast p
|
478 | 468 | "streamer"
|
479 | 469 | ^ChannelHandler (ChunkedWriteHandler.))
|
| 470 | + (.addLast p |
| 471 | + "handler" |
| 472 | + ^ChannelHandler handler) |
| 473 | + |
| 474 | + (add-non-http-handlers |
| 475 | + p |
| 476 | + response-stream |
| 477 | + proxy-options |
| 478 | + ssl? |
| 479 | + logger |
| 480 | + pipeline-transform) |
| 481 | + |
| 482 | + (log/trace "added all stream-chan handlers") |
| 483 | + (log/debug (str "Stream chan pipeline:" (prn-str p)))))) |
480 | 484 |
|
481 | 485 |
|
482 | 486 | (defn- setup-http-pipeline
|
|
684 | 688 | :response-buffer-size response-buffer-size
|
685 | 689 | :t0 t0})))))))
|
686 | 690 |
|
| 691 | +(defn- make-http1-req-preprocessor |
| 692 | + "Returns a fn that handles a Ring req map using the HTTP/1 objects. |
| 693 | +
|
| 694 | + Used for HTTP/1, and for HTTP/2 with multipart requests (Netty HTTP/2 |
| 695 | + code doesn't support multipart)." |
| 696 | + [{:keys [authority ch keep-alive?' non-tun-proxy? responses ssl?]}] |
| 697 | + (fn http1-req-preprocess [req] |
| 698 | + (try |
| 699 | + (let [out-ch (or (:ch req) ch) ; for HTTP/2 multiplex chans |
| 700 | + ^HttpRequest req' (http1/ring-request->netty-request |
| 701 | + (if non-tun-proxy? |
| 702 | + (assoc req :uri (req->proxy-url req)) |
| 703 | + req))] |
| 704 | + (when-not (.get (.headers req') "Host") |
| 705 | + (.set (.headers req') HttpHeaderNames/HOST authority)) |
| 706 | + (when-not (.get (.headers req') "Connection") |
| 707 | + (HttpUtil/setKeepAlive req' keep-alive?')) |
| 708 | + |
| 709 | + (let [body (:body req) |
| 710 | + parts (:multipart req) |
| 711 | + multipart? (some? parts) |
| 712 | + [req' body] (cond |
| 713 | + ;; RFC #7231 4.3.8. TRACE |
| 714 | + ;; A client MUST NOT send a message body... |
| 715 | + (= :trace (:request-method req)) |
| 716 | + (do |
| 717 | + (when (or (some? body) multipart?) |
| 718 | + (log/warn "TRACE request body was omitted")) |
| 719 | + [req' nil]) |
| 720 | + |
| 721 | + (not multipart?) |
| 722 | + [req' body] |
| 723 | + |
| 724 | + :else |
| 725 | + (multipart/encode-request req' parts))] |
| 726 | + |
| 727 | + (when-let [save-message (get req :aleph/save-request-message)] |
| 728 | + ;; debug purpose only |
| 729 | + ;; note, that req' is effectively mutable, so |
| 730 | + ;; it will "capture" all changes made during "send-message" |
| 731 | + ;; execution |
| 732 | + (reset! save-message req')) |
| 733 | + |
| 734 | + (when-let [save-body (get req :aleph/save-request-body)] |
| 735 | + ;; might be different in case we use :multipart |
| 736 | + (reset! save-body body)) |
| 737 | + |
| 738 | + (-> (netty/safe-execute out-ch |
| 739 | + (http1/send-message out-ch true ssl? req' body)) |
| 740 | + (d/catch' (fn [e] |
| 741 | + (log/error e "Error in http1-req-preprocess") |
| 742 | + (s/put! responses (d/error-deferred e)) |
| 743 | + (netty/close out-ch) |
| 744 | + (when-not (= ch out-ch) |
| 745 | + (netty/close ch))))))) |
| 746 | + |
| 747 | + ;; this will usually happen because of a malformed request |
| 748 | + (catch Throwable e |
| 749 | + (log/error e "Error in http1-req-preprocess") |
| 750 | + (s/put! responses (d/error-deferred e)) |
| 751 | + (netty/close ch))))) |
| 752 | + |
| 753 | +(defn- make-http2-req-preprocessor |
| 754 | + "Returns a fn that handles a Ring req map using the HTTP/2 objects. |
| 755 | +
|
| 756 | + Used for HTTP/2, but falls back to HTTP1 objects for multipart requests |
| 757 | + (Netty HTTP/2 code doesn't support multipart)." |
| 758 | + [{:keys [authority ch handler logger pipeline-transform proxy-options responses ssl?] :as opts}] |
| 759 | + (let [h2-bootstrap (Http2StreamChannelBootstrap. ch) |
| 760 | + ;; TODO: is the delay actually helping? It still creates a Delay and a new fn... |
| 761 | + multipart-req-preprocess (delay (make-http1-req-preprocessor opts))] |
| 762 | + |
| 763 | + ;; when you create an HTTP2 outbound stream, you have to supply it with a |
| 764 | + ;; handler for the response |
| 765 | + (.handler h2-bootstrap |
| 766 | + (h2-stream-chan-initializer |
| 767 | + responses proxy-options ssl? logger pipeline-transform handler)) |
| 768 | + |
| 769 | + (fn http2-req-preprocess-init [req] |
| 770 | + (log/trace "http2-req-preprocess-init fired") |
| 771 | + |
| 772 | + (let [req' (cond-> req |
| 773 | + ;; http2 uses :authority, not host |
| 774 | + (nil? (:authority req)) |
| 775 | + (assoc :authority authority) |
| 776 | + |
| 777 | + ;; http2 cannot leave the path empty |
| 778 | + (nil? (:uri req)) |
| 779 | + (assoc :uri "/") |
| 780 | + |
| 781 | + (nil? (:scheme req)) |
| 782 | + (assoc :scheme (if ssl? :https :http)))] |
| 783 | + |
| 784 | + ;; create a new outbound HTTP2 stream/channel |
| 785 | + (-> (.open h2-bootstrap) |
| 786 | + netty/wrap-future |
| 787 | + (d/chain' (fn [^Http2StreamChannel chan] |
| 788 | + (if (multipart/is-multipart? req) |
| 789 | + (@multipart-req-preprocess (assoc req :ch chan)) ; switch to HTTP1 code for multipart |
| 790 | + (http2/req-preprocess chan req' responses)))) |
| 791 | + (d/catch' (fn [^Throwable t] |
| 792 | + (log/error t "Unable to open outbound HTTP/2 stream channel") |
| 793 | + (s/put! responses (d/error-deferred t)) |
| 794 | + (netty/close ch)))))))) |
687 | 795 |
|
688 | 796 | (defn- req-preprocesser
|
689 | 797 | "Returns a fn that preprocesses Ring reqs off the requests stream, and sends
|
|
694 | 802 | [{:keys [ch protocol responses ssl? authority] :as opts}]
|
695 | 803 | (cond
|
696 | 804 | (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
|
697 |
| - (let [{:keys [keep-alive?' non-tun-proxy?]} opts] |
698 |
| - (fn [req] |
699 |
| - (try |
700 |
| - (let [^HttpRequest req' (http/ring-request->netty-request |
701 |
| - (if non-tun-proxy? |
702 |
| - (assoc req :uri (req->proxy-url req)) |
703 |
| - req))] |
704 |
| - (when-not (.get (.headers req') "Host") |
705 |
| - (.set (.headers req') HttpHeaderNames/HOST authority)) |
706 |
| - (when-not (.get (.headers req') "Connection") |
707 |
| - (HttpUtil/setKeepAlive req' keep-alive?')) |
708 |
| - |
709 |
| - (let [body (:body req) |
710 |
| - parts (:multipart req) |
711 |
| - multipart? (some? parts) |
712 |
| - [req' body] (cond |
713 |
| - ;; RFC #7231 4.3.8. TRACE |
714 |
| - ;; A client MUST NOT send a message body... |
715 |
| - (= :trace (:request-method req)) |
716 |
| - (do |
717 |
| - (when (or (some? body) multipart?) |
718 |
| - (log/warn "TRACE request body was omitted")) |
719 |
| - [req' nil]) |
720 |
| - |
721 |
| - (not multipart?) |
722 |
| - [req' body] |
723 |
| - |
724 |
| - :else |
725 |
| - (multipart/encode-request req' parts))] |
726 |
| - |
727 |
| - (when-let [save-message (get req :aleph/save-request-message)] |
728 |
| - ;; debug purpose only |
729 |
| - ;; note, that req' is effectively mutable, so |
730 |
| - ;; it will "capture" all changes made during "send-message" |
731 |
| - ;; execution |
732 |
| - (reset! save-message req')) |
733 |
| - |
734 |
| - (when-let [save-body (get req :aleph/save-request-body)] |
735 |
| - ;; might be different in case we use :multipart |
736 |
| - (reset! save-body body)) |
737 |
| - |
738 |
| - (-> (netty/safe-execute ch |
739 |
| - (http/send-message ch true ssl? req' body)) |
740 |
| - (d/catch' (fn [e] |
741 |
| - (s/put! responses (d/error-deferred e)) |
742 |
| - (netty/close ch)))))) |
743 |
| - |
744 |
| - ;; this will usually happen because of a malformed request |
745 |
| - (catch Throwable e |
746 |
| - (s/put! responses (d/error-deferred e)) |
747 |
| - (netty/close ch))))) |
| 805 | + (make-http1-req-preprocessor opts) |
748 | 806 |
|
749 | 807 | (.equals ApplicationProtocolNames/HTTP_2 protocol)
|
750 |
| - (let [h2-bootstrap (Http2StreamChannelBootstrap. ch) |
751 |
| - {:keys [proxy-options logger pipeline-transform handler]} opts] |
752 |
| - |
753 |
| - ;; when you create an HTTP2 outbound stream, you have to supply it with a |
754 |
| - ;; handler for the response |
755 |
| - (.handler h2-bootstrap |
756 |
| - (h2-stream-chan-initializer |
757 |
| - responses proxy-options ssl? logger pipeline-transform handler)) |
758 |
| - |
759 |
| - (fn [req] |
760 |
| - (println "req-preprocesser h2 fired") |
761 |
| - |
762 |
| - (let [req' (cond-> req |
763 |
| - (nil? (:authority req)) |
764 |
| - (assoc :authority authority) |
765 |
| - |
766 |
| - (nil? (:uri req)) |
767 |
| - (assoc :uri "/") |
768 |
| - |
769 |
| - (nil? (:scheme req)) |
770 |
| - (assoc :scheme (if ssl? :https :http)))] |
771 |
| - (-> (.open h2-bootstrap) |
772 |
| - netty/wrap-future |
773 |
| - (d/chain' (fn [^Http2StreamChannel chan] |
774 |
| - (println "Got outbound h2 stream.") |
775 |
| - |
776 |
| - (-> chan |
777 |
| - .pipeline |
778 |
| - (.addLast "debug" |
779 |
| - (netty/channel-inbound-handler |
780 |
| - :channel-read ([_ ctx msg] |
781 |
| - (println "received msg of class" (class msg)) |
782 |
| - (println "msg:" msg))))) |
783 |
| - (http2/req-preprocess chan req' responses))) |
784 |
| - (d/catch' (fn [^Throwable t] |
785 |
| - (log/error t "Unable to open outbound HTTP/2 stream channel") |
786 |
| - (println "Unable to open outbound HTTP/2 stream channel") |
787 |
| - (.printStackTrace t) |
788 |
| - (s/put! responses (d/error-deferred t)) |
789 |
| - (netty/close ch))))))) |
| 808 | + (make-http2-req-preprocessor opts) |
790 | 809 |
|
791 | 810 | :else
|
792 | 811 | (do
|
793 | 812 | (let [msg (str "Unknown protocol: " protocol)
|
794 |
| - e (SSLHandshakeException. msg)] |
| 813 | + e (IllegalStateException. msg)] |
795 | 814 | (log/error e msg)
|
796 |
| - (println msg protocol) |
797 | 815 | (netty/close ch)
|
798 | 816 | (s/put! responses (d/error-deferred e))))))
|
799 | 817 |
|
|
0 commit comments