diff --git a/etc/zones.conf b/etc/zones.conf index 47d3e2232..31845e1e0 100644 --- a/etc/zones.conf +++ b/etc/zones.conf @@ -60,8 +60,7 @@ zone.external.force_shutdown_policy = 10000|32MB ## Depth so big may lead to subscribing performance issues. ## ## Value: Number [0-65535] -## Default 7 -zone.external.max_topic_levels = 7 +## zone.external.max_topic_levels = 7 ## Maximum QoS allowed. ## diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 74f3135e0..30089180f 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -126,9 +126,13 @@ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_zone, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} @@ -258,9 +262,13 @@ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_zone, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ffec9055e..4054c27c4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -589,6 +589,11 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet|Packets], NState) catch + error:proxy_protocol_config_disabled -> + ?LOG(error, + "~nMalformed packet, " + "please check proxy_protocol config for specific listeners and zones~n"), + {[{frame_error, proxy_protocol_config_disabled} | Packets], State}; error:Reason:Stk -> ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", [Reason, Stk, Data]), @@ -812,4 +817,3 @@ stop(Reason, Reply, State) -> set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7caa92ea8..f0e2f6e4e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -69,6 +69,13 @@ -define(MULTIPLIER_MAX, 16#200000). +%% proxy_protocol v1 header human readable +-define(PPV1_PROXY, "PROXY "). +-define(PPV1_PROXY_UNKNOWN, "PROXY UNKNOWN"). +%% proxy_protocol v2 header signature: +%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A +-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n"). + -dialyzer({no_match, [serialize_utf8_string/2]}). %%-------------------------------------------------------------------- @@ -98,6 +105,13 @@ parse(Bin) -> -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> {more, {none, Options}}; +parse(<>, {none, _Options}) + when IPVer =:= <<"TCP4 ">> orelse IPVer =:= <<"TCP6 ">> -> + error(proxy_protocol_config_disabled); +parse(<>, {none, _Options}) -> + error(proxy_protocol_config_disabled); +parse(<>, {none, _Options}) -> + error(proxy_protocol_config_disabled); parse(<>, {none, Options = #{strict_mode := StrictMode}}) -> %% Validate header if strict mode. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ac975850e..ff7dccd32 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -63,7 +63,7 @@ %% Simulate the active_n opt active_n :: pos_integer(), %% MQTT Piggyback - mqtt_piggyback :: single | multiple, + mqtt_piggyback :: single | multiple, %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -486,6 +486,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, postpone({incoming, Packet}, NState)) catch + error:proxy_protocol_config_disabled -> + ?LOG(error, + "~nMalformed packet, " + "please check proxy_protocol config for specific listeners and zones~n"), + FrameError = {frame_error, proxy_protocol_config_disabled}, + postpone({incoming, FrameError} ,State); error:Reason:Stk -> ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p", [Reason, Stk, Data]), @@ -544,7 +550,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT postpone({check_gc, Stats}, State); false -> State end, - + {case MQTTPiggyback of single -> [{binary, IoData}]; multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData) @@ -689,4 +695,3 @@ trigger(Event) -> erlang:send(self(), Event). set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). -