Merge pull request #6698 from emqx/dev/e4.2.10

Auto-pull-request-on-2022-01-11
This commit is contained in:
tigercl 2022-01-13 10:37:18 +08:00 committed by GitHub
commit 8cdfb531a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 86 additions and 24 deletions

View File

@ -202,9 +202,11 @@ mqtt.max_packet_size = 1MB
mqtt.max_clientid_len = 65535
## Maximum topic levels allowed. 0 means no limit.
## Depth so big may lead to subscribing performance issues.
##
## Value: Number
mqtt.max_topic_levels = 0
## Value: Number [0-65535]
## Default: 128
mqtt.max_topic_levels = 128
## Maximum QoS allowed.
##
@ -242,7 +244,7 @@ mqtt.ignore_loop_deliver = false
mqtt.strict_mode = false
## Specify the response information returned to the client
##
##
## Value: String
## mqtt.response_information = example

View File

@ -57,8 +57,9 @@ zone.external.force_shutdown_policy = 10000|32MB
## zone.external.max_clientid_len = 1024
## Maximum topic levels allowed. 0 means no limit.
## Depth so big may lead to subscribing performance issues.
##
## Value: Number
## Value: Number [0-65535]
## zone.external.max_topic_levels = 7
## Maximum QoS allowed.
@ -250,7 +251,7 @@ zone.external.ignore_loop_deliver = false
zone.external.strict_mode = false
## Specify the response information returned to the client
##
##
## Value: String
## zone.external.response_information = example
@ -341,7 +342,7 @@ zone.internal.ignore_loop_deliver = false
zone.internal.strict_mode = false
## Specify the response information returned to the client
##
##
## Value: String
## zone.internal.response_information = example

View File

@ -757,7 +757,7 @@ end}.
%% @doc Set the Maximum topic levels.
{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [
{default, 0},
{default, 128},
{datatype, integer}
]}.

View File

@ -6,7 +6,7 @@
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.10"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}

View File

@ -70,7 +70,8 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -89,7 +90,8 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.5">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -108,7 +110,8 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -120,13 +123,22 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.8">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []}
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.9">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
@ -199,7 +211,8 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -218,7 +231,8 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.5">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -237,7 +251,8 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -249,13 +264,22 @@
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.8">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []}
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.9">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]

View File

@ -589,6 +589,11 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
NState = State#state{parse_state = NParseState},
parse_incoming(Rest, [Packet|Packets], NState)
catch
error:proxy_protocol_config_disabled ->
?LOG(error,
"~nMalformed packet, "
"please check proxy_protocol config for specific listeners and zones~n"),
{[{frame_error, proxy_protocol_config_disabled} | Packets], State};
error:Reason:Stk ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
[Reason, Stk, Data]),
@ -812,4 +817,3 @@ stop(Reason, Reply, State) ->
set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
setelement(Pos+1, State, Value).

View File

@ -69,6 +69,13 @@
-define(MULTIPLIER_MAX, 16#200000).
%% proxy_protocol v1 header human readable
-define(PPV1_PROXY, "PROXY ").
-define(PPV1_PROXY_UNKNOWN, "PROXY UNKNOWN").
%% proxy_protocol v2 header signature:
%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A
-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n").
-dialyzer({no_match, [serialize_utf8_string/2]}).
%%--------------------------------------------------------------------
@ -98,6 +105,13 @@ parse(Bin) ->
-spec(parse(binary(), parse_state()) -> parse_result()).
parse(<<>>, {none, Options}) ->
{more, {none, Options}};
parse(<<?PPV1_PROXY, IPVer:5/binary, _Rest/binary>>, {none, _Options})
when IPVer =:= <<"TCP4 ">> orelse IPVer =:= <<"TCP6 ">> ->
error(proxy_protocol_config_disabled);
parse(<<?PPV1_PROXY_UNKNOWN, _Rest/binary>>, {none, _Options}) ->
error(proxy_protocol_config_disabled);
parse(<<?PPV2_HEADER_SIG, _Rest/binary>>, {none, _Options}) ->
error(proxy_protocol_config_disabled);
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
{none, Options = #{strict_mode := StrictMode}}) ->
%% Validate header if strict mode.

View File

@ -23,6 +23,7 @@
, init/4 %% XXX: Compatible with before 4.2 version
, info/1
, check/2
, update_overall_limiter/4
]).
-record(limiter, {
@ -152,3 +153,15 @@ is_message_limiter(conn_messages_in) -> true;
is_message_limiter(conn_messages_routing) -> true;
is_message_limiter(overall_messages_routing) -> true;
is_message_limiter(_) -> false.
update_overall_limiter(Zone, Name, Capacity, Interval) ->
case is_overall_limiter(Name) of
false -> false;
_ ->
try
esockd_limiter:update({Zone, Name}, Capacity, Interval),
true
catch _:_:_ ->
false
end
end.

View File

@ -63,7 +63,7 @@
%% Simulate the active_n opt
active_n :: pos_integer(),
%% MQTT Piggyback
mqtt_piggyback :: single | multiple,
mqtt_piggyback :: single | multiple,
%% Limiter
limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer
@ -486,6 +486,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
NState = State#state{parse_state = NParseState},
parse_incoming(Rest, postpone({incoming, Packet}, NState))
catch
error:proxy_protocol_config_disabled ->
?LOG(error,
"~nMalformed packet, "
"please check proxy_protocol config for specific listeners and zones~n"),
FrameError = {frame_error, proxy_protocol_config_disabled},
postpone({incoming, FrameError} ,State);
error:Reason:Stk ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p",
[Reason, Stk, Data]),
@ -544,7 +550,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT
postpone({check_gc, Stats}, State);
false -> State
end,
{case MQTTPiggyback of
single -> [{binary, IoData}];
multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData)
@ -689,4 +695,3 @@ trigger(Event) -> erlang:send(self(), Event).
set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
setelement(Pos+1, State, Value).

View File

@ -82,7 +82,7 @@ t_chan_caps(_) ->
#{max_clientid_len := 65535,
max_qos_allowed := 2,
max_topic_alias := 65535,
max_topic_levels := 0,
max_topic_levels := 128,
retain_available := true,
shared_subscription := true,
subscription_identifiers := true,
@ -768,4 +768,3 @@ session(InitFields) when is_map(InitFields) ->
quota() ->
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
{overall_messages_routing, {10, 1}}]).