diff --git a/etc/emqx.conf b/etc/emqx.conf index 7454f32f4..b559f28a3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -202,9 +202,11 @@ mqtt.max_packet_size = 1MB mqtt.max_clientid_len = 65535 ## Maximum topic levels allowed. 0 means no limit. +## Depth so big may lead to subscribing performance issues. ## -## Value: Number -mqtt.max_topic_levels = 0 +## Value: Number [0-65535] +## Default: 128 +mqtt.max_topic_levels = 128 ## Maximum QoS allowed. ## @@ -242,7 +244,7 @@ mqtt.ignore_loop_deliver = false mqtt.strict_mode = false ## Specify the response information returned to the client -## +## ## Value: String ## mqtt.response_information = example diff --git a/etc/zones.conf b/etc/zones.conf index a290eaa4d..31845e1e0 100644 --- a/etc/zones.conf +++ b/etc/zones.conf @@ -57,8 +57,9 @@ zone.external.force_shutdown_policy = 10000|32MB ## zone.external.max_clientid_len = 1024 ## Maximum topic levels allowed. 0 means no limit. +## Depth so big may lead to subscribing performance issues. ## -## Value: Number +## Value: Number [0-65535] ## zone.external.max_topic_levels = 7 ## Maximum QoS allowed. @@ -250,7 +251,7 @@ zone.external.ignore_loop_deliver = false zone.external.strict_mode = false ## Specify the response information returned to the client -## +## ## Value: String ## zone.external.response_information = example @@ -341,7 +342,7 @@ zone.internal.ignore_loop_deliver = false zone.internal.strict_mode = false ## Specify the response information returned to the client -## +## ## Value: String ## zone.internal.response_information = example diff --git a/priv/emqx.schema b/priv/emqx.schema index 4bfa44c6a..0ff956869 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -757,7 +757,7 @@ end}. %% @doc Set the Maximum topic levels. {mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [ - {default, 0}, + {default, 128}, {datatype, integer} ]}. diff --git a/rebar.config b/rebar.config index fe785724a..ee58d3dac 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.10"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 2c0d887f8..d37dc20c1 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -70,7 +70,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -89,7 +90,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -108,7 +110,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -120,13 +123,22 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_cm, brutal_purge, soft_purge, []} + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} + ]}, + {<<"4.2.9">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -199,7 +211,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -218,7 +231,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -237,7 +251,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -249,13 +264,22 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_cm, brutal_purge, soft_purge, []} + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} + ]}, + {<<"4.2.9">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ffec9055e..4054c27c4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -589,6 +589,11 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet|Packets], NState) catch + error:proxy_protocol_config_disabled -> + ?LOG(error, + "~nMalformed packet, " + "please check proxy_protocol config for specific listeners and zones~n"), + {[{frame_error, proxy_protocol_config_disabled} | Packets], State}; error:Reason:Stk -> ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", [Reason, Stk, Data]), @@ -812,4 +817,3 @@ stop(Reason, Reply, State) -> set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7caa92ea8..f0e2f6e4e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -69,6 +69,13 @@ -define(MULTIPLIER_MAX, 16#200000). +%% proxy_protocol v1 header human readable +-define(PPV1_PROXY, "PROXY "). +-define(PPV1_PROXY_UNKNOWN, "PROXY UNKNOWN"). +%% proxy_protocol v2 header signature: +%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A +-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n"). + -dialyzer({no_match, [serialize_utf8_string/2]}). %%-------------------------------------------------------------------- @@ -98,6 +105,13 @@ parse(Bin) -> -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> {more, {none, Options}}; +parse(<>, {none, _Options}) + when IPVer =:= <<"TCP4 ">> orelse IPVer =:= <<"TCP6 ">> -> + error(proxy_protocol_config_disabled); +parse(<>, {none, _Options}) -> + error(proxy_protocol_config_disabled); +parse(<>, {none, _Options}) -> + error(proxy_protocol_config_disabled); parse(<>, {none, Options = #{strict_mode := StrictMode}}) -> %% Validate header if strict mode. diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index 447e04fea..8acd50d32 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -23,6 +23,7 @@ , init/4 %% XXX: Compatible with before 4.2 version , info/1 , check/2 + , update_overall_limiter/4 ]). -record(limiter, { @@ -152,3 +153,15 @@ is_message_limiter(conn_messages_in) -> true; is_message_limiter(conn_messages_routing) -> true; is_message_limiter(overall_messages_routing) -> true; is_message_limiter(_) -> false. + +update_overall_limiter(Zone, Name, Capacity, Interval) -> + case is_overall_limiter(Name) of + false -> false; + _ -> + try + esockd_limiter:update({Zone, Name}, Capacity, Interval), + true + catch _:_:_ -> + false + end + end. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ac975850e..ff7dccd32 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -63,7 +63,7 @@ %% Simulate the active_n opt active_n :: pos_integer(), %% MQTT Piggyback - mqtt_piggyback :: single | multiple, + mqtt_piggyback :: single | multiple, %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -486,6 +486,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, postpone({incoming, Packet}, NState)) catch + error:proxy_protocol_config_disabled -> + ?LOG(error, + "~nMalformed packet, " + "please check proxy_protocol config for specific listeners and zones~n"), + FrameError = {frame_error, proxy_protocol_config_disabled}, + postpone({incoming, FrameError} ,State); error:Reason:Stk -> ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p", [Reason, Stk, Data]), @@ -544,7 +550,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT postpone({check_gc, Stats}, State); false -> State end, - + {case MQTTPiggyback of single -> [{binary, IoData}]; multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData) @@ -689,4 +695,3 @@ trigger(Event) -> erlang:send(self(), Event). set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). - diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 613da40ac..12c024775 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -82,7 +82,7 @@ t_chan_caps(_) -> #{max_clientid_len := 65535, max_qos_allowed := 2, max_topic_alias := 65535, - max_topic_levels := 0, + max_topic_levels := 128, retain_available := true, shared_subscription := true, subscription_identifiers := true, @@ -768,4 +768,3 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, {overall_messages_routing, {10, 1}}]). -