From 8bb9ad7206b49f3b7083a0cd4a90ff9c63d7bd71 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 26 Jan 2022 15:03:00 +0800 Subject: [PATCH 01/13] fix: sys_mem alarm is not triggered after reboot. --- src/emqx.appup.src | 48 +++++++++++++++++++++++++++++++++++--- src/emqx_alarm.erl | 11 +++++---- src/emqx_app.erl | 3 --- src/emqx_os_mon.erl | 23 ++++++++++++++++-- test/emqx_os_mon_SUITE.erl | 21 ++++++++++------- 5 files changed, 86 insertions(+), 20 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index d37dc20c1..80d294f37 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -25,6 +25,7 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ @@ -49,6 +50,7 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ @@ -71,6 +73,7 @@ {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -80,6 +83,7 @@ {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, @@ -91,6 +95,7 @@ {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ @@ -100,6 +105,7 @@ {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, @@ -111,6 +117,7 @@ {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ @@ -124,6 +131,9 @@ {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ @@ -132,13 +142,24 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []} + ]}, + {<<"4.2.10">>, [ + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module,emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -166,6 +187,7 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ @@ -190,6 +212,7 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ @@ -212,6 +235,7 @@ {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -231,6 +255,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, @@ -251,6 +277,8 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, @@ -264,6 +292,9 @@ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module,emqx_alarm, brutal_purge, soft_purge, []}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, @@ -273,13 +304,24 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module,emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module,emqx_alarm, brutal_purge, soft_purge, []} + ]}, + {<<"4.2.10">>, [ + {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module,emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index ced6622bf..d07354105 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -56,9 +56,9 @@ name :: binary() | atom(), details :: map() | list(), - + message :: binary(), - + activate_at :: integer() }). @@ -68,9 +68,9 @@ name :: binary() | atom(), details :: map() | list(), - + message :: binary(), - + deactivate_at :: integer() | infinity }). @@ -165,6 +165,8 @@ init([Opts]) -> Actions = proplists:get_value(actions, Opts), SizeLimit = proplists:get_value(size_limit, Opts), ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)), + emqx_alarm_handler:load(), + process_flag(trap_exit, true), {ok, ensure_delete_timer(#state{actions = Actions, size_limit = SizeLimit, validity_period = ValidityPeriod})}. @@ -228,6 +230,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, _State) -> + emqx_alarm_handler:unload(), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 881a1ee99..077b9c889 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -38,13 +38,11 @@ start(_Type, _Args) -> ok = emqx_plugins:init(), emqx_plugins:load(), register(emqx, self()), - emqx_alarm_handler:load(), print_vsn(), {ok, Sup}. -spec(stop(State :: term()) -> term()). stop(_State) -> - emqx_alarm_handler:unload(), emqx_boot:is_enabled(listeners) andalso emqx_listeners:stop(). @@ -68,4 +66,3 @@ start_autocluster() -> ekka:callback(prepare, fun emqx:shutdown/1), ekka:callback(reboot, fun emqx:reboot/0), ekka:autocluster(?APP). - diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 2852c917b..4e57c59ec 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -81,7 +81,7 @@ get_mem_check_interval() -> set_mem_check_interval(Seconds) when Seconds < 60 -> memsup:set_check_interval(1); -set_mem_check_interval(Seconds) -> +set_mem_check_interval(Seconds) -> memsup:set_check_interval(Seconds div 60). get_sysmem_high_watermark() -> @@ -105,8 +105,10 @@ call(Req) -> init([Opts]) -> set_mem_check_interval(proplists:get_value(mem_check_interval, Opts)), - set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts)), + HW = proplists:get_value(sysmem_high_watermark, Opts), + set_sysmem_high_watermark(HW), set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts)), + ensure_system_memory_alarm(HW), {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts), cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts), cpu_check_interval => proplists:get_value(cpu_check_interval, Opts), @@ -177,3 +179,20 @@ ensure_check_timer(State = #{cpu_check_interval := Interval}) -> "x86_64-pc-linux-musl" -> State; _ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)} end. + +%% At startup, memsup starts first and checks for memory alarms, +%% but emqx_alarm_handler is not yet used instead of alarm_handler, +%% so alarm_handler is used directly for notification (normally emqx_alarm_handler should be used). +%%The internal memsup will no longer trigger events that have been alerted, +%% and there is no exported function to remove the alerted flag, +%% so it can only be checked again at startup. +ensure_system_memory_alarm(HW) -> + case erlang:whereis(memsup) of + undefined -> ok; + _Pid -> + {Allocated, Total, _Worst} = memsup:get_memory_data(), + case Total =/= 0 andalso Allocated/Total * 100 >= HW of + true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW}); + false -> ok + end + end. diff --git a/test/emqx_os_mon_SUITE.erl b/test/emqx_os_mon_SUITE.erl index cb57b900f..07a7a5271 100644 --- a/test/emqx_os_mon_SUITE.erl +++ b/test/emqx_os_mon_SUITE.erl @@ -24,10 +24,23 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([], + fun(emqx) -> + application:set_env(emqx, os_mon, [ + {cpu_check_interval, 1}, + {cpu_high_watermark, 5}, + {cpu_low_watermark, 80}, + {mem_check_interval, 60}, + {sysmem_high_watermark, 70}, + {procmem_high_watermark, 5}]); + (_) -> ok + end), application:ensure_all_started(os_mon), Config. end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]), application:stop(os_mon). % t_set_mem_check_interval(_) -> @@ -40,13 +53,6 @@ end_per_suite(_Config) -> % error('TODO'). t_api(_) -> - gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), - {ok, _} = emqx_os_mon:start_link([{cpu_check_interval, 1}, - {cpu_high_watermark, 5}, - {cpu_low_watermark, 80}, - {mem_check_interval, 60}, - {sysmem_high_watermark, 70}, - {procmem_high_watermark, 5}]), ?assertEqual(1, emqx_os_mon:get_cpu_check_interval()), ?assertEqual(5, emqx_os_mon:get_cpu_high_watermark()), ?assertEqual(80, emqx_os_mon:get_cpu_low_watermark()), @@ -69,4 +75,3 @@ t_api(_) -> emqx_os_mon ! ignored, gen_server:stop(emqx_os_mon), ok. - From 0c6e83f08eb8be1e70d7a7a246602fd47d53d7a6 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 21 Feb 2022 20:13:21 +0800 Subject: [PATCH 02/13] fix(frame): `server_keepalive` only for MQTT v5.0 --- src/emqx_channel.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1b20753aa..6c03a1a04 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1455,11 +1455,13 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps. %%-------------------------------------------------------------------- %% Enrich server keepalive -enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> +enrich_server_keepalive(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) -> case emqx_zone:server_keepalive(Zone) of undefined -> AckProps; Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} - end. + end; + +enrich_server_keepalive(AckProps, _Channel) -> AckProps. %%-------------------------------------------------------------------- %% Enrich response information @@ -1505,7 +1507,7 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. %%-------------------------------------------------------------------- -%% Enrich Keepalive +%% Ensure Keepalive ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive_timer(Interval, Channel); @@ -1671,4 +1673,3 @@ flag(false) -> 0. set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos+1, Channel, Value). - From c13813538e89a6242c72d063e8593bf56ec6974d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 21 Feb 2022 20:19:12 +0800 Subject: [PATCH 03/13] chroe(appup): update appup.src --- src/emqx.appup.src | 144 +++++++++++++++++++++++---------------------- 1 file changed, 74 insertions(+), 70 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 80d294f37..89ca543b8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -22,11 +22,11 @@ {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_misc,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {add_module, emqx_congestion}, @@ -47,11 +47,11 @@ {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {add_module, emqx_congestion}, @@ -69,10 +69,10 @@ {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, @@ -91,10 +91,10 @@ {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, @@ -113,10 +113,10 @@ {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, @@ -127,10 +127,10 @@ {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, @@ -149,6 +149,7 @@ ]}, {<<"4.2.9">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, @@ -157,9 +158,10 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []} ]}, {<<"4.2.10">>, [ - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_alarm, brutal_purge, soft_purge, []} + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -184,11 +186,11 @@ {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -209,11 +211,11 @@ {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -231,11 +233,11 @@ {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -252,12 +254,12 @@ {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ @@ -274,12 +276,12 @@ {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ @@ -289,13 +291,13 @@ {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, - {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_alarm, brutal_purge, soft_purge, []}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ @@ -304,24 +306,26 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_alarm, brutal_purge, soft_purge, []} + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<"4.2.10">>, [ - {load_module,emqx_app, brutal_purge, soft_purge, []}, - {load_module,emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module,emqx_alarm, brutal_purge, soft_purge, []} + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] From 2b17d6e297cb5b16f11294fc7d43c1f54a1aff58 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 13:58:44 +0800 Subject: [PATCH 04/13] feat(frame): utf-8 string check in `strict_mode` --- src/emqx_frame.erl | 296 +++++++++++++++++++++++++++++---------------- 1 file changed, 192 insertions(+), 104 deletions(-) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index f0e2f6e4e..60d8f52f8 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -196,8 +196,9 @@ packet(Header, Variable) -> packet(Header, Variable, Payload) -> #mqtt_packet{header = Header, variable = Variable, payload = Payload}. -parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> - {ProtoName, Rest} = parse_utf8_string(FrameBin), +parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, + #{strict_mode := StrictMode}) -> + {ProtoName, Rest} = parse_utf8_string(FrameBin, StrictMode), <> = Rest, % Note: Crash when reserved flag doesn't equal to 0, there is no strict % compliance with the MQTT5.0. @@ -211,8 +212,8 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> KeepAlive : 16/big, Rest2/binary>> = Rest1, - {Properties, Rest3} = parse_properties(Rest2, ProtoVer), - {ClientId, Rest4} = parse_utf8_string(Rest3), + {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), + {ClientId, Rest4} = parse_utf8_string(Rest3, StrictMode), ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, is_bridge = (BridgeTag =:= 8), @@ -224,14 +225,14 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> properties = Properties, clientid = ClientId }, - {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), - {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), - {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), + {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4, StrictMode), + {Username, Rest6} = parse_utf8_string(Rest5, StrictMode, bool(UsernameFlag)), + {Passsword, <<>>} = parse_utf8_string(Rest6, StrictMode, bool(PasswordFlag)), ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword}; -parse_packet(#mqtt_packet_header{type = ?CONNACK}, - <>, #{version := Ver}) -> - {Properties, <<>>} = parse_properties(Rest, Ver), +parse_packet(#mqtt_packet_header{type = ?CONNACK}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode), #mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode, properties = Properties @@ -239,21 +240,22 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, #{strict_mode := StrictMode, version := Ver}) -> - {TopicName, Rest} = parse_utf8_string(Bin), + {TopicName, Rest} = parse_utf8_string(Bin, StrictMode), {PacketId, Rest1} = case QoS of ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, (PacketId =/= undefined) andalso StrictMode andalso validate_packet_id(PacketId), - {Properties, Payload} = parse_properties(Rest1, Ver), + {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode), Publish = #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Properties }, {Publish, Payload}; -parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode}) +parse_packet(#mqtt_packet_header{type = PubAck}, <>, + #{strict_mode := StrictMode}) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> StrictMode andalso validate_packet_id(PacketId), #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; @@ -262,7 +264,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, < StrictMode andalso validate_packet_id(PacketId), - {Properties, <<>>} = parse_properties(Rest, Ver), + {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode), #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, properties = Properties @@ -271,7 +273,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), TopicFilters = parse_topic_filters(subscribe, Rest1), ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]), #mqtt_packet_subscribe{packet_id = PacketId, @@ -282,7 +284,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_suback{packet_id = PacketId, properties = Properties, @@ -292,7 +294,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), TopicFilters = parse_topic_filters(unsubscribe, Rest1), #mqtt_packet_unsubscribe{packet_id = PacketId, properties = Properties, @@ -307,7 +309,7 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_unsuback{packet_id = PacketId, properties = Properties, @@ -315,115 +317,119 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode), #mqtt_packet_disconnect{reason_code = ReasonCode, properties = Properties }; parse_packet(#mqtt_packet_header{type = ?AUTH}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode), #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}. parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, - proto_ver = Ver}, Bin) -> - {Props, Rest} = parse_properties(Bin, Ver), - {Topic, Rest1} = parse_utf8_string(Rest), + proto_ver = Ver}, + Bin, StrictMode) -> + {Props, Rest} = parse_properties(Bin, Ver, StrictMode), + {Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Payload, Rest2} = parse_binary_data(Rest1), {Packet#mqtt_packet_connect{will_props = Props, will_topic = Topic, will_payload = Payload }, Rest2}; -parse_will_message(Packet, Bin) -> {Packet, Bin}. +parse_will_message(Packet, Bin, _StrictMode) -> {Packet, Bin}. -compile({inline, [parse_packet_id/1]}). parse_packet_id(<>) -> {PacketId, Rest}. -parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> +parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> {#{}, Bin}; %% TODO: version mess? -parse_properties(<<>>, ?MQTT_PROTO_V5) -> +parse_properties(<<>>, ?MQTT_PROTO_V5, _StrictMode) -> {#{}, <<>>}; -parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) -> +parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5, _StrictMode) -> {#{}, Rest}; -parse_properties(Bin, ?MQTT_PROTO_V5) -> +parse_properties(Bin, ?MQTT_PROTO_V5, StrictMode) -> {Len, Rest} = parse_variable_byte_integer(Bin), <> = Rest, - {parse_property(PropsBin, #{}), Rest1}. + {parse_property(PropsBin, #{}, StrictMode), Rest1}. -parse_property(<<>>, Props) -> +parse_property(<<>>, Props, _StrictMode) -> Props; -parse_property(<<16#01, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}); -parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}); -parse_property(<<16#03, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Content-Type' => Val}); -parse_property(<<16#08, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Topic' => Val}); -parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Correlation-Data' => Val}); -parse_property(<<16#0B, Bin/binary>>, Props) -> +parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}, StrictMode); +parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode); +parse_property(<<16#03, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode); +parse_property(<<16#08, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode); +parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode); +parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) -> {Val, Rest} = parse_variable_byte_integer(Bin), - parse_property(Rest, Props#{'Subscription-Identifier' => Val}); -parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); -parse_property(<<16#12, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); -parse_property(<<16#13, Val:16, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); -parse_property(<<16#15, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Authentication-Method' => Val}); -parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Authentication-Data' => Val}); -parse_property(<<16#17, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Problem-Information' => Val}); -parse_property(<<16#18, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Will-Delay-Interval' => Val}); -parse_property(<<16#19, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Response-Information' => Val}); -parse_property(<<16#1A, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Information' => Val}); -parse_property(<<16#1C, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Server-Reference' => Val}); -parse_property(<<16#1F, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Reason-String' => Val}); -parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Receive-Maximum' => Val}); -parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}); -parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias' => Val}); -parse_property(<<16#24, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-QoS' => Val}); -parse_property(<<16#25, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Retain-Available' => Val}); -parse_property(<<16#26, Bin/binary>>, Props) -> - {Pair, Rest} = parse_utf8_pair(Bin), + parse_property(Rest, Props#{'Subscription-Identifier' => Val}, StrictMode); +parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode); +parse_property(<<16#12, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode); +parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode); +parse_property(<<16#15, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode); +parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode); +parse_property(<<16#17, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Request-Problem-Information' => Val}, StrictMode); +parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Will-Delay-Interval' => Val}, StrictMode); +parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode); +parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode); +parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode); +parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode); +parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode); +parse_property(<<16#22, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}, StrictMode); +parse_property(<<16#23, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Topic-Alias' => Val}, StrictMode); +parse_property(<<16#24, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Maximum-QoS' => Val}, StrictMode); +parse_property(<<16#25, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Retain-Available' => Val}, StrictMode); +parse_property(<<16#26, Bin/binary>>, Props, StrictMode) -> + {Pair, Rest} = parse_utf8_pair(Bin, StrictMode), case maps:find('User-Property', Props) of {ok, UserProps} -> UserProps1 = lists:append(UserProps, [Pair]), - parse_property(Rest, Props#{'User-Property' := UserProps1}); + parse_property(Rest, Props#{'User-Property' := UserProps1}, StrictMode); error -> - parse_property(Rest, Props#{'User-Property' => [Pair]}) + parse_property(Rest, Props#{'User-Property' => [Pair]}, StrictMode) end; -parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}); -parse_property(<<16#28, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); -parse_property(<<16#29, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); -parse_property(<<16#2A, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). +parse_property(<<16#27, Val:32, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}, StrictMode); +parse_property(<<16#28, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}, StrictMode); +parse_property(<<16#29, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}, StrictMode); +parse_property(<<16#2A, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}, StrictMode); +parse_property(<>, _Props, _StrictMode) -> + error(#{invalid_property_code => Property}). +%% TODO: invalid property in specific packet. parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). @@ -445,20 +451,53 @@ parse_topic_filters(unsubscribe, Bin) -> parse_reason_codes(Bin) -> [Code || <> <= Bin]. -parse_utf8_pair(<>) -> - {{Key, Val}, Rest}. +%%-------------------- +%% parse utf8 pair +parse_utf8_pair( <> + , true) -> + {{validate_utf8(Key), validate_utf8(Val)}, Rest}; +parse_utf8_pair( <> + , false) -> + {{Key, Val}, Rest}; +parse_utf8_pair(<>, _StrictMode) + when LenK > byte_size(Rest) -> + error(user_property_not_enough_bytes); +parse_utf8_pair(<>, _StrictMode) + when LenV > byte_size(Rest) -> + error(malformed_user_property_value); +parse_utf8_pair(Bin, _StrictMode) + when 4 > byte_size(Bin) -> + error(user_property_not_enough_bytes). -parse_utf8_string(Bin, false) -> +%%-------------------- +%% parse utf8 string +parse_utf8_string(Bin, _StrictMode, false) -> {undefined, Bin}; -parse_utf8_string(Bin, true) -> - parse_utf8_string(Bin). +parse_utf8_string(Bin, StrictMode, true) -> + parse_utf8_string(Bin, StrictMode). -parse_utf8_string(<>) -> - {Str, Rest}. +parse_utf8_string(<>, true) -> + {validate_utf8(Str), Rest}; +parse_utf8_string(<>, false) -> + {Str, Rest}; +parse_utf8_string(<>, _) + when Len > byte_size(Rest) -> + error(malformed_utf8_string); +parse_utf8_string(Bin, _) + when 2 > byte_size(Bin) -> + error(malformed_utf8_string_length). parse_binary_data(<>) -> - {Data, Rest}. + {Data, Rest}; +parse_binary_data(<>) + when Len > byte_size(Rest) -> + error(malformed_binary_data); +parse_binary_data(Bin) + when 2 > byte_size(Bin) -> + error(malformed_binary_data_length). %%-------------------------------------------------------------------- %% Serialize MQTT Packet @@ -796,3 +835,52 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. + +validate_utf8(Bin) -> + case unicode:characters_to_binary(Bin) of + {error, _, _} -> + error(utf8_string_invalid); + {incomplete, _, _} -> + error(utf8_string_invalid); + Bin when is_binary(Bin) -> + case validate_mqtt_utf8_char(Bin) of + true -> Bin; + false -> error(utf8_string_invalid) + end + end. + +%% Is the utf8 string respecting UTF-8 characters defined by MQTT Spec? +%% i.e. contains invalid UTF-8 char or control char +validate_mqtt_utf8_char(<<>>) -> + true; +%% ==== 1-Byte UTF-8 invalid: [[U+0000 .. U+001F] && [U+007F]] +validate_mqtt_utf8_char(<>) + when B1 >= 16#20, B1 =< 16#7E -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<>) + when B1 >= 16#00, B1 =< 16#1F; + B1 =:= 16#7F -> + %% [U+0000 .. U+001F] && [U+007F] + false; +%% ==== 2-Bytes UTF-8 invalid: [U+0080 .. U+009F] +validate_mqtt_utf8_char(<>) + when B1 =:= 16#C2; + B2 >= 16#A0, B2 =< 16#BF; + B1 > 16#C3, B1 =< 16#DE; + B2 >= 16#80, B2 =< 16#BF -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<<16#C2, B2, _Bs/binary>>) + when B2 >= 16#80, B2 =< 16#9F -> + %% [U+0080 .. U+009F] + false; +%% ==== 3-Bytes UTF-8 invalid: [U+D800 .. U+DFFF] +validate_mqtt_utf8_char(<>) + when B1 >= 16#E0, B1 =< 16#EE; + B1 =:= 16#EF -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<<16#ED, _B2, _B3, _Bs/binary>>) -> + false; +%% ==== 4-Bytes UTF-8 +validate_mqtt_utf8_char(<>) + when B1 =:= 16#0F -> + validate_mqtt_utf8_char(Bs). From 3505e286baf6c670f66d14a1ee0ac1816e2282fb Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 14:01:29 +0800 Subject: [PATCH 05/13] test(frame): malformed utf-8 packet --- test/emqx_frame_SUITE.erl | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index aef88cf9f..ee2982611 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -43,7 +43,8 @@ groups() -> [{parse, [parallel], [t_parse_cont, t_parse_frame_too_large, - t_parse_frame_malformed_variable_byte_integer + t_parse_frame_malformed_variable_byte_integer, + t_parse_malformed_utf8_string ]}, {connect, [parallel], [t_serialize_parse_v3_connect, @@ -136,6 +137,24 @@ t_parse_frame_malformed_variable_byte_integer(_) -> ?catch_error(malformed_variable_byte_integer, emqx_frame:parse(MalformedPayload, ParseState)). + +t_parse_malformed_utf8_string(_) -> + MalformedPacket = <<16,31,0,4, + %% Specification name, should be "MQTT" + %% 77,81,84,84, + %% malformed 1-Byte UTF-8 in (U+0000 .. U+001F] && [U+007F]) + 16#00,16#01,16#1F,16#7F, + + 4,194,0,60, + 0,4,101,109, + 113,120,0,5, + 97,100,109,105, + 110,0,6,112, + 117,98,108,105, + 99>>, + ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), + ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). + t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, From c5c3f78c5a1d62d9e3245674475b77baf5bd2492 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 14:01:58 +0800 Subject: [PATCH 06/13] test(frame): variable_byte_integer and proxy_protocol disabled --- src/emqx_frame.erl | 4 ++++ test/emqx_frame_SUITE.erl | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 60d8f52f8..2149833d6 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -78,6 +78,10 @@ -dialyzer({no_match, [serialize_utf8_string/2]}). +-ifdef(TEST). +-export([parse_variable_byte_integer/1]). +-endif. + %%-------------------------------------------------------------------- %% Init Parse State %%-------------------------------------------------------------------- diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index ee2982611..c51ab8d7f 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -44,7 +44,9 @@ groups() -> [t_parse_cont, t_parse_frame_too_large, t_parse_frame_malformed_variable_byte_integer, - t_parse_malformed_utf8_string + t_parse_frame_variable_byte_integer, + t_parse_malformed_utf8_string, + t_parse_frame_proxy_protocol %% proxy_protocol_config_disabled packet. ]}, {connect, [parallel], [t_serialize_parse_v3_connect, @@ -137,6 +139,10 @@ t_parse_frame_malformed_variable_byte_integer(_) -> ?catch_error(malformed_variable_byte_integer, emqx_frame:parse(MalformedPayload, ParseState)). +t_parse_frame_variable_byte_integer(_) -> + Bin = <<2#10010011, 2#10000000, 2#10001000, 2#10011001, 2#10101101, 2#00110010>>, + ?catch_error(malformed_variable_byte_integer, + emqx_frame:parse_variable_byte_integer(Bin)). t_parse_malformed_utf8_string(_) -> MalformedPacket = <<16,31,0,4, @@ -155,6 +161,13 @@ t_parse_malformed_utf8_string(_) -> ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). +t_parse_frame_proxy_protocol(_) -> + BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> + , <<"\r\n\r\n\0\r\nQUIT\n">>], + [?assertError( proxy_protocol_config_disabled + , emqx_frame:parse(Bin)) + || Bin <- BinList]. + t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, @@ -550,4 +563,3 @@ parse_to_packet(Bin, Opts) -> Packet. payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). - From a2f9384dfcb7cc17c2c76e60d9827023b54e87b3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 14:08:58 +0800 Subject: [PATCH 07/13] chore(appup): update appup.src --- .gitignore | 8 ++++++++ src/emqx.appup.src | 2 ++ 2 files changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index 2e19823c3..ded111ff0 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,11 @@ erlang.mk etc/emqx.conf.rendered Mnesia.*/ .stamp +erlang_ls.config +# Emacs Backup files +*~ +# Emacs temporary files +.#* +*# +# For direnv +.envrc diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 89ca543b8..75c35386d 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -159,6 +159,7 @@ ]}, {<<"4.2.10">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []} @@ -323,6 +324,7 @@ ]}, {<<"4.2.10">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []} From ee2423ee618dde17a745e9d4e2ee6e6aed4cdef9 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 11 Apr 2022 15:53:07 +0800 Subject: [PATCH 08/13] fix: usort plugins name --- src/emqx.appup.src | 54 +++++++++++++++++++++++++++++--------------- src/emqx_plugins.erl | 21 +++++++++++++---- 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 75c35386d..18c3fb04d 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -26,7 +26,8 @@ {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {"4.2.1", [ {add_module, emqx_congestion}, @@ -51,7 +52,8 @@ {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[23]">>, [ {add_module, emqx_congestion}, @@ -74,7 +76,8 @@ {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -96,7 +99,8 @@ {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -118,7 +122,8 @@ {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -134,7 +139,8 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -145,7 +151,8 @@ {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, - {load_module, emqx_os_mon, brutal_purge, soft_purge, []} + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -155,14 +162,16 @@ {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, - {load_module, emqx_os_mon, brutal_purge, soft_purge, []} + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.10">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module, emqx_alarm, brutal_purge, soft_purge, []} + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -191,7 +200,8 @@ {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {"4.2.1", [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -216,7 +226,8 @@ {load_module, emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_misc,brutal_purge,soft_purge,[]} + {load_module, emqx_misc,brutal_purge,soft_purge,[]}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[23]">>, [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -239,7 +250,8 @@ {load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -261,7 +273,8 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -283,7 +296,8 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -299,7 +313,8 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -310,7 +325,8 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, - {load_module, emqx_limiter, brutal_purge, soft_purge, []} + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -320,14 +336,16 @@ {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module, emqx_alarm, brutal_purge, soft_purge, []} + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.10">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, - {load_module, emqx_alarm, brutal_purge, soft_purge, []} + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index faa6abae3..e22cd2058 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -103,7 +103,7 @@ unload(PluginName) when is_atom(PluginName) -> ?LOG(error, "Plugin ~s is not started", [PluginName]), {error, not_started}; {_, _} -> - unload_plugin(PluginName, true) + unload_plugin(PluginName, true) end. reload(PluginName) when is_atom(PluginName)-> @@ -204,10 +204,21 @@ with_loaded_file(File, SuccFun) -> end. filter_plugins(Names) -> - lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; - ({Name1, true}) -> {true, Name1}; - ({_Name1, false}) -> false - end, Names). + filter_plugins(Names, []). + +filter_plugins([], Plugins) -> + lists:reverse(Plugins); +filter_plugins([{Name, Load} | Names], Plugins) -> + case {Load, lists:member(Name, Plugins)} of + {true, false} -> + filter_plugins(Names, [Name | Plugins]); + {false, true} -> + filter_plugins(Names, Plugins -- [Name]); + _ -> + filter_plugins(Names, Plugins) + end; +filter_plugins([Name | Names], Plugins) when is_atom(Name) -> + filter_plugins([{Name, true} | Names], Plugins). load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), From 3438570fc92bb39feb97b893d4465b1950056be3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 12 Apr 2022 13:50:24 +0800 Subject: [PATCH 09/13] fix(log): format the message id to hexstring before printing --- src/emqx.appup.src | 18 ++++++++++++++++++ src/emqx_message.erl | 3 ++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 18c3fb04d..fe9c71cb9 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ {VSN, [ {"4.2.0", [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {add_module, emqx_congestion}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -30,6 +31,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {"4.2.1", [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {add_module, emqx_congestion}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -56,6 +58,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[23]">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {add_module, emqx_congestion}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, @@ -80,6 +83,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, @@ -103,6 +107,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, @@ -126,6 +131,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, @@ -143,6 +149,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, @@ -155,6 +162,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, @@ -166,6 +174,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.10">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, @@ -177,6 +186,7 @@ ], [ {"4.2.0", [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, @@ -204,6 +214,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {"4.2.1", [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, @@ -230,6 +241,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[23]">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -254,6 +266,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, @@ -277,6 +290,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, @@ -300,6 +314,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, @@ -317,6 +332,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, @@ -329,6 +345,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, @@ -340,6 +357,7 @@ {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.10">>, [ + {load_module, emqx_message, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, diff --git a/src/emqx_message.erl b/src/emqx_message.erl index decbb0c18..7c626fa94 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -298,7 +298,8 @@ elapsed(Since) -> format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", - [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). + [emqx_guid:to_hexstr(Id), QoS, Topic, From, format(flags, Flags), + format(headers, Headers)]). format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); From 09e33934867936399b5f6d81920b871da192b859 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 13 Apr 2022 10:28:21 +0800 Subject: [PATCH 10/13] fix(frame): prohibit empty topic in strict mode --- src/emqx_frame.erl | 11 +++++++++++ test/emqx_frame_SUITE.erl | 17 +++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 2149833d6..8dd657253 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -252,6 +252,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, (PacketId =/= undefined) andalso StrictMode andalso validate_packet_id(PacketId), {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode), + ok = ensure_topic_name_valid(StrictMode, TopicName, Properties), Publish = #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Properties @@ -338,6 +339,7 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, {Props, Rest} = parse_properties(Bin, Ver, StrictMode), {Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Payload, Rest2} = parse_binary_data(Rest1), + ok = ensure_topic_name_valid(StrictMode, Topic, Props), {Packet#mqtt_packet_connect{will_props = Props, will_topic = Topic, will_payload = Payload @@ -503,6 +505,15 @@ parse_binary_data(Bin) when 2 > byte_size(Bin) -> error(malformed_binary_data_length). +ensure_topic_name_valid(false, _TopicName, _Properties) -> + ok; +ensure_topic_name_valid(true, TopicName, _Properties) when TopicName =/= <<>> -> + ok; +ensure_topic_name_valid(true, <<>>, #{'Topic-Alias' := _}) -> + ok; +ensure_topic_name_valid(true, <<>>, _) -> + error(empty_topic_name). + %%-------------------------------------------------------------------- %% Serialize MQTT Packet %%-------------------------------------------------------------------- diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index c51ab8d7f..eca130bbe 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -46,6 +46,8 @@ groups() -> t_parse_frame_malformed_variable_byte_integer, t_parse_frame_variable_byte_integer, t_parse_malformed_utf8_string, + t_parse_empty_topic_name, + t_parse_empty_topic_name_with_alias, t_parse_frame_proxy_protocol %% proxy_protocol_config_disabled packet. ]}, {connect, [parallel], @@ -161,6 +163,21 @@ t_parse_malformed_utf8_string(_) -> ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). +t_parse_empty_topic_name(_) -> + Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<>>), + ?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})), + ?catch_error(empty_topic_name, parse_serialize(Packet, #{strict_mode => true})). + +t_parse_empty_topic_name_with_alias(_) -> + Props = #{'Topic-Alias' => 16#AB}, + Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, Props, <<>>), + ?assertEqual( + Packet, parse_serialize(Packet, #{strict_mode => false, version => ?MQTT_PROTO_V5}) + ), + ?assertEqual( + Packet, parse_serialize(Packet, #{strict_mode => true, version => ?MQTT_PROTO_V5}) + ). + t_parse_frame_proxy_protocol(_) -> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> , <<"\r\n\r\n\0\r\nQUIT\n">>], From f62795d9197adf9ed8d874b86813281c91cf281a Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 13 Apr 2022 14:46:10 +0800 Subject: [PATCH 11/13] fix: add update_conf function for emqx_conf. --- priv/emqx.schema | 34 +++++++++++++++++++++++++++++----- src/emqx.appup.src | 20 ++++++++++++++++++++ src/emqx_limiter.erl | 27 ++++++++++++++++----------- src/emqx_listeners.erl | 15 +++++++++++++++ 4 files changed, 80 insertions(+), 16 deletions(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index 0ff956869..10daac600 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -255,6 +255,27 @@ end}. {validator, "range4ports", "must be 1024 to 134217727", fun(X) -> X >= 1024 andalso X =< 134217727 end}. +{validator, "range:0-2", "must be 0 to 2", + fun(X) -> X >= 0 andalso X =< 2 end}. + +{validator, "range:0-128", "must be 0 to 128", + fun(X) -> X >= 0 andalso X =< 128 end}. + +{validator, "range:0-65535", "must be 0 to 65535", + fun(X) -> X >= 0 andalso X =< 65535 end}. + +{validator, "range:1-65535", "must be 1 to 65535", + fun(X) -> X >= 1 andalso X =< 65535 end}. + +{validator, "range:1-9", "must be 1 to 9", + fun(X) -> X >= 1 andalso X =< 9 end}. + +{validator, "range:8-15", "must be 8 to 15", + fun(X) -> X >= 8 andalso X =< 15 end}. + +{validator, "range:0-1024", "must be 0 to 1024", + fun(X) -> X >= 0 andalso X =< 1024 end}. + %% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl {mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ {datatype, bytesize}, @@ -290,10 +311,10 @@ end}. {default, 1000}, {datatype, integer}, hidden, - {validators, ["positive_integer"]} + {validators, ["range:0-inf"]} ]}. -{validator, "positive_integer", "must be a positive integer", +{validator, "range:0-inf", "must be non neg_integer", fun(X) -> X >= 0 end}. %% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, @@ -758,7 +779,8 @@ end}. %% @doc Set the Maximum topic levels. {mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [ {default, 128}, - {datatype, integer} + {datatype, integer}, + {validators, ["range:0-inf"]} ]}. %% @doc Set the Maximum QoS allowed. @@ -771,7 +793,8 @@ end}. %% @doc Set the Maximum Topic Alias. {mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [ {default, 65535}, - {datatype, integer} + {datatype, integer}, + {validators, ["range:0-65535"]} ]}. %% @doc Whether the server supports MQTT retained messages. @@ -868,7 +891,8 @@ end}. %% @doc Set the Maximum topic levels. {mapping, "zone.$name.max_topic_levels", "emqx.zones", [ - {datatype, integer} + {datatype, integer}, + {validators, ["range:0-128"]} ]}. %% @doc Set the Maximum QoS allowed. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index fe9c71cb9..2adb72986 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -10,6 +10,7 @@ {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -37,6 +38,7 @@ {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -80,6 +82,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -104,6 +107,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ @@ -128,6 +132,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ @@ -146,6 +151,7 @@ {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ @@ -156,6 +162,7 @@ {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, @@ -168,6 +175,7 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, @@ -180,6 +188,8 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} @@ -192,6 +202,7 @@ {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -218,6 +229,7 @@ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, @@ -263,6 +275,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -287,6 +300,7 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ @@ -311,6 +325,7 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ @@ -329,6 +344,7 @@ {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ @@ -342,6 +358,7 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ @@ -351,6 +368,7 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, @@ -363,6 +381,8 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_limiter, brutal_purge, soft_purge, []}, + {load_module, emqx_listeners, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index 8acd50d32..4fe3a98e1 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -23,7 +23,8 @@ , init/4 %% XXX: Compatible with before 4.2 version , info/1 , check/2 - , update_overall_limiter/4 + , update_overall_limiter/3 + , delete_overall_limiter/1 ]). -record(limiter, { @@ -154,14 +155,18 @@ is_message_limiter(conn_messages_routing) -> true; is_message_limiter(overall_messages_routing) -> true; is_message_limiter(_) -> false. -update_overall_limiter(Zone, Name, Capacity, Interval) -> - case is_overall_limiter(Name) of - false -> false; - _ -> - try - esockd_limiter:update({Zone, Name}, Capacity, Interval), - true - catch _:_:_ -> - false - end +update_overall_limiter(Zone, Capacity, Interval) -> + try + esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval), + true + catch _:_:_ -> + false + end. + +delete_overall_limiter(Zone) -> + try + esockd_limiter:delete({Zone, overall_messages_routing}), + true + catch _:_:_ -> + false end. diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index ce256d147..7dbd2c37d 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -29,6 +29,7 @@ , start_listener/3 , stop_listener/1 , stop_listener/3 + , update_listeners_env/2 , restart_listener/1 , restart_listener/3 ]). @@ -114,6 +115,20 @@ with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) -> restart() -> lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). +update_listeners_env(Action, NewConf = #{name := NewName, proto := NewProto}) -> + Listener = emqx:get_env(listeners, []), + Listener1 = lists:filter( + fun(#{name := Name, proto := Proto}) -> + not (Name =:= NewName andalso Proto =:= NewProto) + end, Listener), + Listener2 = + case Action of + update -> [NewConf | Listener1]; + delete -> Listener1 + end, + application:set_env(emqx, listeners, Listener2), + ok. + -spec(restart_listener(listener()) -> any()). restart_listener({Proto, ListenOn, Options}) -> restart_listener(Proto, ListenOn, Options). From 596ec24c3a5a59e5055506a1f474c0c9cab1d0a3 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 13 Apr 2022 15:01:05 +0800 Subject: [PATCH 12/13] fix: uptime sometimes crash --- src/emqx.appup.src | 17 +++++++++++++++++ src/emqx_sys.erl | 33 ++++++++++----------------------- test/emqx_sys_SUITE.erl | 24 +++++++++++++++++++----- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 2adb72986..dbaeae143 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -11,6 +11,7 @@ {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -39,6 +40,7 @@ {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -83,6 +85,7 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -108,6 +111,7 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ @@ -152,6 +156,7 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ @@ -163,6 +168,7 @@ {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, @@ -176,6 +182,7 @@ {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, @@ -190,6 +197,7 @@ {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} @@ -203,6 +211,7 @@ {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, @@ -230,6 +239,7 @@ {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, @@ -276,6 +286,7 @@ {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.4">>, [ @@ -301,6 +312,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ @@ -326,6 +338,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.[6-7]">>, [ @@ -345,6 +358,7 @@ {load_module, emqx_misc,brutal_purge,soft_purge,[]}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.8">>, [ @@ -359,6 +373,7 @@ {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<"4.2.9">>, [ @@ -369,6 +384,7 @@ {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, @@ -383,6 +399,7 @@ {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_listeners, brutal_purge, soft_purge, []}, + {load_module, emqx_sys, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 38387b3ce..4bfe20a89 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -96,7 +96,15 @@ sysdescr() -> %% @doc Get sys uptime -spec(uptime() -> string()). uptime() -> - gen_server:call(?SYS, uptime). + {TotalWallClock, _} = erlang:statistics(wall_clock), + uptime(TotalWallClock div 1000). + +uptime(Seconds) -> + {D, {H, M, S}} = calendar:seconds_to_daystime(Seconds), + L0 = [{D, " days"}, {H, " hours"}, {M, " minutes"}, {S, " seconds"}], + L1 = lists:dropwhile(fun({K, _}) -> K =:= 0 end, L0), + L2 = lists:map(fun({Time, Unit}) -> [integer_to_list(Time), Unit] end, L1), + lists:flatten(lists:join(", ", L2)). %% @doc Get sys datetime -spec(datetime() -> string()). @@ -139,9 +147,6 @@ heartbeat(State) -> tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. -handle_call(uptime, _From, State) -> - {reply, uptime(State), State}; - handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -151,7 +156,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> - publish(uptime, iolist_to_binary(uptime(State))), + publish(uptime, iolist_to_binary(uptime())), publish(datetime, iolist_to_binary(datetime())), {noreply, heartbeat(State)}; @@ -174,24 +179,6 @@ terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> %% Internal functions %%----------------------------------------------------------------------------- -uptime(#state{start_time = Ts}) -> - Secs = timer:now_diff(erlang:timestamp(), Ts) div 1000000, - lists:flatten(uptime(seconds, Secs)). -uptime(seconds, Secs) when Secs < 60 -> - [integer_to_list(Secs), " seconds"]; -uptime(seconds, Secs) -> - [uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"]; -uptime(minutes, M) when M < 60 -> - [integer_to_list(M), " minutes, "]; -uptime(minutes, M) -> - [uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "]; -uptime(hours, H) when H < 24 -> - [integer_to_list(H), " hours, "]; -uptime(hours, H) -> - [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "]; -uptime(days, D) -> - [integer_to_list(D), " days, "]. - publish(uptime, Uptime) -> safe_publish(systop(uptime), Uptime); publish(datetime, Datetime) -> diff --git a/test/emqx_sys_SUITE.erl b/test/emqx_sys_SUITE.erl index 0814f8b00..c6d95b8ac 100644 --- a/test/emqx_sys_SUITE.erl +++ b/test/emqx_sys_SUITE.erl @@ -34,7 +34,7 @@ end_per_suite(_Config) -> application:unload(emqx), ok = emqx_logger:set_log_level(error), ok. - + % t_version(_) -> % error('TODO'). @@ -42,10 +42,24 @@ end_per_suite(_Config) -> % error('TODO'). t_uptime(_) -> - ?assertEqual(<<"1 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 1))), - ?assertEqual(<<"1 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 60))), - ?assertEqual(<<"1 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 3600))), - ?assertEqual(<<"1 days, 0 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 86400))). + ?assert(is_list(emqx_sys:uptime())), + ?assertEqual(<<"1 seconds">>, iolist_to_binary(emqx_sys:uptime(1))), + ?assertEqual(<<"1 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(60))), + ?assertEqual(<<"1 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(3600))), + ?assertEqual(<<"1 hours, 1 minutes, 1 seconds">>, iolist_to_binary(emqx_sys:uptime(3661))), + ?assertEqual(<<"1 days, 0 hours, 0 minutes, 0 seconds">>, + iolist_to_binary(emqx_sys:uptime(86400))), + lists:map(fun({D, H, M, S}) -> + Expect = << + (integer_to_binary(D))/binary, " days, ", + (integer_to_binary(H))/binary, " hours, ", + (integer_to_binary(M))/binary, " minutes, ", + (integer_to_binary(S))/binary, " seconds" + >>, + Actual = iolist_to_binary(emqx_sys:uptime(D * 86400 + H * 3600 + M * 60 + S)), + ?assertEqual(Expect, Actual) + end, + [{1, 2, 3, 4}, {10, 20, 30, 40}, {2222, 3, 56, 59}, {59, 23, 59, 59}]). % t_datetime(_) -> % error('TODO'). From dd76bdd2fe84d4f24362463aeadb14f2ba602124 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 13 Apr 2022 16:40:44 +0800 Subject: [PATCH 13/13] chore: Update src/emqx_os_mon.erl --- src/emqx_os_mon.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 4e57c59ec..5e580e032 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -190,7 +190,7 @@ ensure_system_memory_alarm(HW) -> case erlang:whereis(memsup) of undefined -> ok; _Pid -> - {Allocated, Total, _Worst} = memsup:get_memory_data(), + {Total, Allocated, _Worst} = memsup:get_memory_data(), case Total =/= 0 andalso Allocated/Total * 100 >= HW of true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW}); false -> ok