diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ffec9055e..4054c27c4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -589,6 +589,11 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet|Packets], NState) catch + error:proxy_protocol_config_disabled -> + ?LOG(error, + "~nMalformed packet, " + "please check proxy_protocol config for specific listeners and zones~n"), + {[{frame_error, proxy_protocol_config_disabled} | Packets], State}; error:Reason:Stk -> ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", [Reason, Stk, Data]), @@ -812,4 +817,3 @@ stop(Reason, Reply, State) -> set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7caa92ea8..f0e2f6e4e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -69,6 +69,13 @@ -define(MULTIPLIER_MAX, 16#200000). +%% proxy_protocol v1 header human readable +-define(PPV1_PROXY, "PROXY "). +-define(PPV1_PROXY_UNKNOWN, "PROXY UNKNOWN"). +%% proxy_protocol v2 header signature: +%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A +-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n"). + -dialyzer({no_match, [serialize_utf8_string/2]}). %%-------------------------------------------------------------------- @@ -98,6 +105,13 @@ parse(Bin) -> -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> {more, {none, Options}}; +parse(<>, {none, _Options}) + when IPVer =:= <<"TCP4 ">> orelse IPVer =:= <<"TCP6 ">> -> + error(proxy_protocol_config_disabled); +parse(<>, {none, _Options}) -> + error(proxy_protocol_config_disabled); +parse(<>, {none, _Options}) -> + error(proxy_protocol_config_disabled); parse(<>, {none, Options = #{strict_mode := StrictMode}}) -> %% Validate header if strict mode. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ac975850e..ff7dccd32 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -63,7 +63,7 @@ %% Simulate the active_n opt active_n :: pos_integer(), %% MQTT Piggyback - mqtt_piggyback :: single | multiple, + mqtt_piggyback :: single | multiple, %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -486,6 +486,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, postpone({incoming, Packet}, NState)) catch + error:proxy_protocol_config_disabled -> + ?LOG(error, + "~nMalformed packet, " + "please check proxy_protocol config for specific listeners and zones~n"), + FrameError = {frame_error, proxy_protocol_config_disabled}, + postpone({incoming, FrameError} ,State); error:Reason:Stk -> ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p", [Reason, Stk, Data]), @@ -544,7 +550,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT postpone({check_gc, Stats}, State); false -> State end, - + {case MQTTPiggyback of single -> [{binary, IoData}]; multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData) @@ -689,4 +695,3 @@ trigger(Event) -> erlang:send(self(), Event). set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). -