Merge pull request #1366 from HJianBo/ws_proxy
Websocket proxy configuration
This commit is contained in:
commit
a5e06308e5
|
@ -496,6 +496,10 @@ listener.ws.external.max_clients = 64
|
||||||
|
|
||||||
listener.ws.external.access.1 = allow all
|
listener.ws.external.access.1 = allow all
|
||||||
|
|
||||||
|
listener.ws.external.proxy_ipaddress_header = x-forwarded-for
|
||||||
|
|
||||||
|
listener.ws.external.proxy_port_header = x-remote-port
|
||||||
|
|
||||||
## TCP Options
|
## TCP Options
|
||||||
listener.ws.external.backlog = 1024
|
listener.ws.external.backlog = 1024
|
||||||
|
|
||||||
|
@ -518,6 +522,10 @@ listener.wss.external.max_clients = 64
|
||||||
|
|
||||||
listener.wss.external.access.1 = allow all
|
listener.wss.external.access.1 = allow all
|
||||||
|
|
||||||
|
listener.wss.external.proxy_ipaddress_header = x-forwarded-for
|
||||||
|
|
||||||
|
listener.wss.external.proxy_port_header = x-remote-port
|
||||||
|
|
||||||
## SSL Options
|
## SSL Options
|
||||||
listener.wss.external.handshake_timeout = 15s
|
listener.wss.external.handshake_timeout = 15s
|
||||||
|
|
||||||
|
|
|
@ -987,6 +987,16 @@ end}.
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ws.$name.proxy_port_header", "emqttd.listeners", [
|
||||||
|
{datatype, string},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ws.$name.proxy_ipaddress_header", "emqttd.listeners", [
|
||||||
|
{datatype, string},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [
|
{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1050,6 +1060,16 @@ end}.
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.wss.$name.proxy_port_header", "emqttd.listeners", [
|
||||||
|
{datatype, string},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.wss.$name.proxy_ipaddress_header", "emqttd.listeners", [
|
||||||
|
{datatype, string},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [
|
{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1127,6 +1147,13 @@ end}.
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
WsProxyOpts = fun(Prefix) when Prefix =:= "listener.ws.external" orelse
|
||||||
|
Prefix =:= "listener.wss.external" ->
|
||||||
|
Filter([{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
|
||||||
|
{proxy_ipaddress_header, cuttlefish:conf_get(Prefix ++ ".proxy_ipaddress_header", Conf, undefined)}]);
|
||||||
|
(_) -> []
|
||||||
|
end,
|
||||||
|
|
||||||
MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
|
MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
|
||||||
|
|
||||||
ConnOpts = fun(Prefix) ->
|
ConnOpts = fun(Prefix) ->
|
||||||
|
@ -1178,7 +1205,8 @@ end}.
|
||||||
undefined ->
|
undefined ->
|
||||||
[];
|
[];
|
||||||
ListenOn ->
|
ListenOn ->
|
||||||
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
|
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
|
||||||
|
{sockopts, TcpOpts(Prefix)} | LisOpts(Prefix) ++ WsProxyOpts(Prefix)]}]
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -1190,7 +1218,7 @@ end}.
|
||||||
ListenOn ->
|
ListenOn ->
|
||||||
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
|
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
|
||||||
{sockopts, TcpOpts(Prefix)},
|
{sockopts, TcpOpts(Prefix)},
|
||||||
{sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}]
|
{sslopts, SslOpts(Prefix)} | LisOpts(Prefix) ++ WsProxyOpts(Prefix)]}]
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
-include("emqttd_internal.hrl").
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
-import(proplists, [get_value/3]).
|
-import(proplists, [get_value/3, get_value/2]).
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/4]).
|
-export([start_link/4]).
|
||||||
|
@ -93,7 +93,7 @@ init([Env, WsPid, Req, ReplyChannel]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
Conn = Req:get(connection),
|
Conn = Req:get(connection),
|
||||||
true = link(WsPid),
|
true = link(WsPid),
|
||||||
case Req:get(peername) of
|
case peername(Env, Req) of
|
||||||
{ok, Peername} ->
|
{ok, Peername} ->
|
||||||
Headers = mochiweb_headers:to_list(
|
Headers = mochiweb_headers:to_list(
|
||||||
mochiweb_request:get(headers, Req)),
|
mochiweb_request:get(headers, Req)),
|
||||||
|
@ -321,3 +321,32 @@ gc(State) ->
|
||||||
Cb = fun() -> emit_stats(State) end,
|
Cb = fun() -> emit_stats(State) end,
|
||||||
emqttd_gc:maybe_force_gc(#wsclient_state.force_gc_count, State, Cb).
|
emqttd_gc:maybe_force_gc(#wsclient_state.force_gc_count, State, Cb).
|
||||||
|
|
||||||
|
peername(Env, Req) ->
|
||||||
|
Conn = Req:get(connection),
|
||||||
|
case Conn:peername() of
|
||||||
|
{ok, Peername} ->
|
||||||
|
% return original address, if existed
|
||||||
|
case last_forwarded(get_value(Conn:type(), Env, []), Req) of
|
||||||
|
undefined -> {ok, Peername};
|
||||||
|
Forwarded -> {ok, Forwarded}
|
||||||
|
end;
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
last_forwarded([], _) -> undefined;
|
||||||
|
last_forwarded(Conf, Req) ->
|
||||||
|
HostHeader = get_value(proxy_ipaddress_header, Conf),
|
||||||
|
PortHeader = get_value(proxy_port_header, Conf),
|
||||||
|
case tune_host(Req:get_header_value(HostHeader)) of
|
||||||
|
undefined -> undefined;
|
||||||
|
Host -> {Host, tune_port(Req:get_header_value(PortHeader))}
|
||||||
|
end.
|
||||||
|
|
||||||
|
tune_host(undefined) -> undefined;
|
||||||
|
tune_host(Hosts) ->
|
||||||
|
{ok, Last} = inet:parse_address(string:strip(lists:last(string:tokens(Hosts, ",")))),
|
||||||
|
Last.
|
||||||
|
|
||||||
|
tune_port(undefined) -> undefined;
|
||||||
|
tune_port(Port) -> list_to_integer(Port).
|
||||||
|
|
||||||
|
|
|
@ -39,8 +39,25 @@ start_client(WsPid, Req, ReplyChannel) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])),
|
Env = lists:append(emqttd:env(client, []),
|
||||||
|
emqttd:env(protocol, []) ++ forwarded_header()),
|
||||||
{ok, {{simple_one_for_one, 0, 1},
|
{ok, {{simple_one_for_one, 0, 1},
|
||||||
[{ws_client, {emqttd_ws_client, start_link, [Env]},
|
[{ws_client, {emqttd_ws_client, start_link, [Env]},
|
||||||
temporary, 5000, worker, [emqttd_ws_client]}]}}.
|
temporary, 5000, worker, [emqttd_ws_client]}]}}.
|
||||||
|
|
||||||
|
forwarded_header() ->
|
||||||
|
Env = [{Proto, Opts} || {Proto, _, Opts} <- emqttd:env(listeners, []), Proto == ws orelse Proto == wss],
|
||||||
|
lists:foldl(fun({Proto, Opts}, Acc) ->
|
||||||
|
Proto1 = case Proto of
|
||||||
|
ws -> tcp;
|
||||||
|
wss -> ssl
|
||||||
|
end,
|
||||||
|
case {proplists:get_value(proxy_ipaddress_header, Opts),
|
||||||
|
proplists:get_value(proxy_port_header, Opts)} of
|
||||||
|
{undefined, _} -> Acc;
|
||||||
|
{AddrHeader, undefined} -> [{Proto1, [{proxy_ipaddress_header, AddrHeader}]} | Acc];
|
||||||
|
{AddrHeader, PortHeader} -> [{Proto1, [{proxy_ipaddress_header, AddrHeader},
|
||||||
|
{proxy_port_header, PortHeader}]} | Acc]
|
||||||
|
end
|
||||||
|
end, [], Env).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue