diff --git a/etc/emqx.conf b/etc/emqx.conf index 81b5b6460..11997a750 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1608,6 +1608,18 @@ listener.ws.external.access.1 = allow all ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## listener.ws.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 +## Specify which HTTP header for real source IP if the EMQ X cluster is +## deployed behind NGINX or HAProxy. +## +## Default: X-Forwarded-For +## listener.ws.external.proxy_address_header = X-Forwarded-For + +## Specify which HTTP header for real source port if the EMQ X cluster is +## deployed behind NGINX or HAProxy. +## +## Default: X-Forwarded-Port +## listener.ws.external.proxy_address_header = X-Forwarded-Port + ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## @@ -1851,6 +1863,18 @@ listener.wss.external.access.1 = allow all ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## listener.wss.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 +## Specify which HTTP header for real source IP if the EMQ X cluster is +## deployed behind NGINX or HAProxy. +## +## Default: X-Forwarded-For +## listener.wss.external.proxy_address_header = X-Forwarded-For + +## Specify which HTTP header for real source port if the EMQ X cluster is +## deployed behind NGINX or HAProxy. +## +## Default: X-Forwarded-Port +## listener.wss.external.proxy_port_header = X-Forwarded-Port + ## Enable the Proxy Protocol V1/2 support. ## ## See: listener.tcp.$name.proxy_protocol diff --git a/priv/emqx.schema b/priv/emqx.schema index c9dc938cf..65902328e 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1531,6 +1531,16 @@ end}. {datatype, string} ]}. +{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [ + {default, "X-Forwarded-For"}, + {datatype, string} +]}. + +{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [ + {default, "X-Forwarded-Port"}, + {datatype, string} +]}. + {mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [ {datatype, flag} ]}. @@ -1715,6 +1725,16 @@ end}. {datatype, string} ]}. +{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [ + {default, "X-Forwarded-For"}, + {datatype, string} +]}. + +{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [ + {default, "X-Forwarded-Port"}, + {datatype, string} +]}. + {mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [ {datatype, flag} ]}. @@ -1967,6 +1987,8 @@ end}. {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, {rate_limit, RateLimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))}, {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_address_header, list_to_binary(string:lowercase(cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, "")))}, + {proxy_port_header, list_to_binary(string:lowercase(cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, "")))}, {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, {fail_if_no_subprotocol, cuttlefish:conf_get(Prefix ++ ".fail_if_no_subprotocol", Conf, undefined)}, {supported_subprotocols, string:tokens(cuttlefish:conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")}, diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 01f7b5e2b..647e16727 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -253,7 +253,7 @@ websocket_init([Req, Opts]) -> #{src_address := SrcAddr, src_port := SrcPort} -> {SrcAddr, SrcPort}; _ -> - cowboy_req:peer(Req) + get_peer(Req, Opts) end, Sockname = cowboy_req:sock(Req), Peercert = cowboy_req:cert(Req), @@ -725,6 +725,34 @@ classify([Event|More], Packets, Cmds, Events) -> trigger(Event) -> erlang:send(self(), Event). +get_peer(Req, Opts) -> + {PeerAddr, PeerPort} = cowboy_req:peer(Req), + AddrHeader = cowboy_req:header(proplists:get_value(proxy_address_header, Opts), Req, <<>>), + ClientAddr = case string:tokens(binary_to_list(AddrHeader), ", ") of + [] -> + undefined; + AddrList -> + hd(AddrList) + end, + Addr = case inet:parse_address(ClientAddr) of + {ok, A} -> + A; + _ -> + PeerAddr + end, + PortHeader = cowboy_req:header(proplists:get_value(proxy_port_header, Opts), Req, <<>>), + ClientPort = case string:tokens(binary_to_list(PortHeader), ", ") of + [] -> + undefined; + PortList -> + hd(PortList) + end, + try + {Addr, list_to_integer(ClientPort)} + catch + _:_ -> {Addr, PeerPort} + end. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 04018b141..15f0656a5 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -50,6 +50,7 @@ init_per_testcase(TestCase, Config) when -> %% Mock cowboy_req ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), + ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end), ok = meck:expect(cowboy_req, peer, fun(_) -> {{127,0,0,1}, 3456} end), ok = meck:expect(cowboy_req, sock, fun(_) -> {{127,0,0,1}, 18083} end), ok = meck:expect(cowboy_req, cert, fun(_) -> undefined end), @@ -123,6 +124,22 @@ t_info(_) -> sockstate := running } = SockInfo. +t_header(_) -> + ok = meck:expect(cowboy_req, header, fun(<<"x-forwarded-for">>, _, _) -> <<"100.100.100.100, 99.99.99.99">>; + (<<"x-forwarded-port">>, _, _) -> <<"1000">> end), + {ok, St, _} = ?ws_conn:websocket_init([req, [{zone, external}, + {proxy_address_header, <<"x-forwarded-for">>}, + {proxy_port_header, <<"x-forwarded-port">>}]]), + WsPid = spawn(fun() -> + receive {call, From, info} -> + gen_server:reply(From, ?ws_conn:info(St)) + end end), + #{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info), + #{socktype := ws, + peername := {{100,100,100,100}, 1000}, + sockstate := running + } = SockInfo. + t_info_limiter(_) -> St = st(#{limiter => emqx_limiter:init(external, [])}), ?assertEqual(undefined, ?ws_conn:info(limiter, St)).