From 13f3dafe2297df37aab0d48511889275fb13350f Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Sun, 24 Apr 2022 14:10:51 +0800 Subject: [PATCH 01/23] refactor: enhance the flapping detect accuracy Count the `flapping` event whenever a client tries to connect to the server, whether the attempt succeeds or fails. This helps improve stability. --- src/emqx_channel.erl | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index b8f3c5b2c..38cc8f989 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -288,7 +288,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun enrich_client/2, fun set_log_meta/2, fun check_banned/2, - fun auth_connect/2 + fun auth_connect/2, + fun flapping_detect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> NChannel1 = NChannel#channel{ @@ -1022,11 +1023,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> shutdown(Reason, Channel); -handle_info({sock_closed, Reason}, Channel = - #channel{conn_state = connected, - clientinfo = ClientInfo = #{zone := Zone}}) -> - emqx_zone:enable_flapping_detect(Zone) - andalso emqx_flapping:detect(ClientInfo), +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) -> Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; @@ -1335,6 +1332,13 @@ auth_connect(#mqtt_packet_connect{password = Password}, {error, emqx_reason_codes:connack_error(Reason)} end. 
+%%-------------------------------------------------------------------- +%% Flapping + +flapping_detect(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> + _ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Enhanced Authentication From 165842ded4822ef49cf7d6f6743816cc7539a816 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 26 Sep 2022 14:08:28 +0800 Subject: [PATCH 02/23] chore: update changes.md --- CHANGES-4.3.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 713f89f37..62df98f2d 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,6 +32,8 @@ File format: - Added a test to prevent a last will testament message to be published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) +- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045) + ### Bug fixes - Fix delayed publish inaccurate caused by os time change. 
[#8908](https://github.com/emqx/emqx/pull/8908) From e2b0048e88850389f42e3b6c4abbae60f183dbf5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 26 Sep 2022 15:03:30 +0800 Subject: [PATCH 03/23] refactor(flapping): count flapping event if connecting failed --- src/emqx_channel.erl | 6 +++--- test/emqx_channel_SUITE.erl | 38 +++++++++++++++++++++++++++++------- test/emqx_flapping_SUITE.erl | 2 +- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 38cc8f989..1c0c6ee72 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -288,8 +288,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun enrich_client/2, fun set_log_meta/2, fun check_banned/2, - fun auth_connect/2, - fun flapping_detect/2 + fun count_flapping_event/2, + fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> NChannel1 = NChannel#channel{ @@ -1335,7 +1335,7 @@ auth_connect(#mqtt_packet_connect{password = Password}, %%-------------------------------------------------------------------- %% Flapping -flapping_detect(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> +count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> _ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), {ok, Channel}. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 00edde5b1..4f250bd4a 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -33,11 +33,6 @@ all() -> emqx_ct:all(?MODULE). 
init_per_suite(Config) -> %% CM Meck ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), - %% Access Control Meck - ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_access_control, authenticate, - fun(_) -> {ok, #{auth_result => success}} end), - ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), %% Broker Meck ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), %% Hooks Meck @@ -53,8 +48,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - meck:unload([emqx_access_control, - emqx_metrics, + meck:unload([emqx_metrics, emqx_session, emqx_broker, emqx_hooks, @@ -63,10 +57,16 @@ end_per_suite(_Config) -> init_per_testcase(_TestCase, Config) -> meck:new(emqx_zone, [passthrough, no_history, no_link]), + %% Access Control Meck + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authenticate, + fun(_) -> {ok, #{auth_result => success}} end), + ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), Config. end_per_testcase(_TestCase, Config) -> meck:unload([emqx_zone]), + meck:unload([emqx_access_control]), Config. %%-------------------------------------------------------------------- @@ -853,6 +853,30 @@ t_ws_cookie_init(_) -> Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). 
+%%-------------------------------------------------------------------- +%% Test cases for other mechnisms +%%-------------------------------------------------------------------- + +t_flapping_detect(_) -> + Parent = self(), + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end), + ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), + ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end), + IdleChannel = channel(#{conn_state => idle}), + {shutdown, not_authorized, _ConnAck, _Channel} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel), + receive + flapping_detect -> ok + after 2000 -> + ?assert(false, "Flapping detect should be exected in connecting progress") + end, + meck:unload([emqx_flapping]). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index eadd89192..8074a8607 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -72,4 +72,4 @@ t_expired_detecting(_) -> (_) -> false end, ets:tab2list(emqx_flapping))), timer:sleep(200), ?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; - (_) -> true end, ets:tab2list(emqx_flapping))). \ No newline at end of file + (_) -> true end, ets:tab2list(emqx_flapping))). 
From e5a673376feba0a4313cf616b001e717f92a1f58 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 9 Oct 2022 17:25:48 +0800 Subject: [PATCH 04/23] refactor: support the retry option --- .../emqx_auth_http/include/emqx_auth_http.hrl | 19 +++++++++++++++++++ apps/emqx_auth_http/src/emqx_acl_http.erl | 8 +++++--- apps/emqx_auth_http/src/emqx_auth_http.erl | 14 +++++++++----- .../emqx_auth_http/src/emqx_auth_http_cli.erl | 14 +++++++++----- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index 0eaa59daf..4e659293f 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -1 +1,20 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -define(APP, emqx_auth_http). + +%% equals to the default value of ehttpc +-define(DEFAULT_RETRY_TIMES, 2). diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index d4fd96a95..73cf6ce11 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -24,7 +24,7 @@ -logger_header("[ACL http]"). 
-import(emqx_auth_http_cli, - [ request/6 + [ request/7 , feedvar/2 ]). @@ -56,13 +56,15 @@ description() -> "ACL with HTTP API". %% Internal functions %%-------------------------------------------------------------------- -check_acl_request(#{pool_name := PoolName, +check_acl_request(Params = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). + Retry = maps:get(retry_times, Params, ?DEFAULT_RETRY_TIMES), + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). access(subscribe) -> 1; access(publish) -> 2. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.erl b/apps/emqx_auth_http/src/emqx_auth_http.erl index 98a897a8c..0c8c46670 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http.erl @@ -25,7 +25,7 @@ -logger_header("[Auth http]"). -import(emqx_auth_http_cli, - [ request/6 + [ request/7 , feedvar/2 ]). @@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API". %% Requests %%-------------------------------------------------------------------- -authenticate(#{pool_name := PoolName, +authenticate(Params = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). + Retry = maps:get(retry_times, Params, ?DEFAULT_RETRY_TIMES), + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). -spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). 
is_superuser(undefined, _ClientInfo) -> false; -is_superuser(#{pool_name := PoolName, +is_superuser(Params = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of + Retry = maps:get(retry_times, Params, ?DEFAULT_RETRY_TIMES), + case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of {ok, 200, _Body} -> true; {ok, _Code, _Body} -> false; {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), diff --git a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl index 3c7efd9c9..16c2c8574 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl @@ -19,6 +19,7 @@ -include("emqx_auth_http.hrl"). -export([ request/6 + , request/7 , feedvar/2 , feedvar/3 ]). @@ -27,18 +28,21 @@ %% HTTP Request %%-------------------------------------------------------------------- -request(PoolName, get, Path, Headers, Params, Timeout) -> - NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), - reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout)); +request(PoolName, Method, Path, Headers, Params, Timeout) -> + request(PoolName, Method, Path, Headers, Params, ?DEFAULT_RETRY_TIMES). -request(PoolName, post, Path, Headers, Params, Timeout) -> +request(PoolName, get, Path, Headers, Params, Timeout, Retry) -> + NewPath = Path ++ "?" 
++ binary_to_list(cow_qs:qs(bin_kw(Params))), + reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry)); + +request(PoolName, post, Path, Headers, Params, Timeout, Retry) -> Body = case proplists:get_value(<<"content-type">>, Headers) of "application/x-www-form-urlencoded" -> cow_qs:qs(bin_kw(Params)); "application/json" -> emqx_json:encode(bin_kw(Params)) end, - reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)). + reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)). reply({ok, StatusCode, _Headers}) -> {ok, StatusCode, <<>>}; From 8e8ff08973c06951aea98a05c3523ff1db5d4792 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 9 Oct 2022 11:31:54 +0200 Subject: [PATCH 05/23] fix(shared): check sticky if sticky pid is still a member Prior to this fix, if a subscriber unsubscribed without disconnecting, the sticky dispatch strategy would continue to pick the old member. This commit fixes it by checking whether the member is still in the group. --- src/emqx_shared_sub.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index cc57e001f..2d85a834a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -301,7 +301,8 @@ fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), - case is_active_sub(Sub0, FailedSubs) of + All = subscribers(Group, Topic), + case is_active_sub(Sub0, FailedSubs, All) of true -> %% the old subscriber is still alive %% keep using it for sticky strategy @@ -471,8 +472,10 @@ update_stats(State) -> State. %% Return 'true' if the subscriber process is alive AND not in the failed list -is_active_sub(Pid, FailedSubs) -> - not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid). 
+is_active_sub(Pid, FailedSubs, All) -> + lists:member(Pid, All) andalso + (not maps:is_key(Pid, FailedSubs)) andalso + is_alive_sub(Pid). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> From 68dd29420ddc024b96852f5eb35c678c4a12c827 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 9 Oct 2022 17:43:30 +0800 Subject: [PATCH 06/23] chore: fix duplicated variable name --- apps/emqx_auth_http/src/emqx_acl_http.erl | 4 ++-- apps/emqx_auth_http/src/emqx_auth_http.app.src | 2 +- apps/emqx_auth_http/src/emqx_auth_http.erl | 8 ++++---- apps/emqx_auth_http/src/emqx_auth_http_cli.erl | 2 +- apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index 73cf6ce11..51bf9c303 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -56,14 +56,14 @@ description() -> "ACL with HTTP API". %% Internal functions %%-------------------------------------------------------------------- -check_acl_request(Params = +check_acl_request(ACLParams = #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - Retry = maps:get(retry_times, Params, ?DEFAULT_RETRY_TIMES), + Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES), request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). access(subscribe) -> 1; diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index e943317f8..87d087bae 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_http, [{description, "EMQ X Authentication/ACL with HTTP API"}, - {vsn, "4.3.7"}, % strict semver, bump manually! 
+ {vsn, "4.3.8"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_http_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_auth_http/src/emqx_auth_http.erl b/apps/emqx_auth_http/src/emqx_auth_http.erl index 0c8c46670..620750bd0 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http.erl @@ -63,27 +63,27 @@ description() -> "Authentication by HTTP API". %% Requests %%-------------------------------------------------------------------- -authenticate(Params = +authenticate(AuthParams = #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - Retry = maps:get(retry_times, Params, ?DEFAULT_RETRY_TIMES), + Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES), request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). -spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). is_superuser(undefined, _ClientInfo) -> false; -is_superuser(Params = +is_superuser(SuperParams = #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - Retry = maps:get(retry_times, Params, ?DEFAULT_RETRY_TIMES), + Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES), case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of {ok, 200, _Body} -> true; {ok, _Code, _Body} -> false; diff --git a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl index 16c2c8574..c747b778a 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl @@ -29,7 +29,7 @@ %%-------------------------------------------------------------------- request(PoolName, Method, Path, Headers, Params, Timeout) -> - request(PoolName, Method, Path, Headers, Params, ?DEFAULT_RETRY_TIMES). 
+ request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES). request(PoolName, get, Path, Headers, Params, Timeout, Retry) -> NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), diff --git a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl index f7071bc17..8529fb143 100644 --- a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl @@ -408,7 +408,7 @@ t_password_hash(_) -> ok = application:start(emqx_auth_mnesia). t_will_message_connection_denied(Config) when is_list(Config) -> - ClientId = Username = <<"subscriber">>, + ClientId = <<"subscriber">>, Password = <<"p">>, application:stop(emqx_auth_mnesia), ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]), From 761283f61639de11411d650e115b1d8a30383f54 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 9 Oct 2022 11:49:11 +0200 Subject: [PATCH 07/23] docs: update change log --- CHANGES-4.3.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 26dd529a2..c15ac220b 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -58,6 +58,12 @@ File format: - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, but not the subscribing topic), caused messages to be lost when dispatching. +- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used. + Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect) + the message is still dispatched to it. + This issue only occurs when unsubscribe with the session kept. 
+ Fixed in [#9119](https://github.com/emqx/emqx/pull/9119) + ## v4.3.20 ### Bug fixes From bc68f60bb5f3372deaa6fcd5ff6b86dd61660b76 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 9 Oct 2022 17:52:59 +0800 Subject: [PATCH 08/23] chore: update appup.src --- .../src/emqx_auth_http.appup.src | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/apps/emqx_auth_http/src/emqx_auth_http.appup.src b/apps/emqx_auth_http/src/emqx_auth_http.appup.src index f5c2bfe42..01d756b9e 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.appup.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.appup.src @@ -1,17 +1,27 @@ %% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.6", - [ %% There are only changes to the schema file, so we don't need any - %% commands here - ]}, + [{"4.3.7", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.6", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + 
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -20,21 +30,29 @@ {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, - [{restart_application,emqx_auth_http}]}, + {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}], - [{"4.3.6", - [ %% There are only changes to the schema file, so we don't need any - %% commands here - ]}, + [{"4.3.7", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.6", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, 
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -43,6 +61,5 @@ {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, - [{restart_application,emqx_auth_http}]}, + {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}]}. From 4f8a7349bfc7ae114e8fc9daa09d10fe95beb320 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 9 Oct 2022 20:13:52 +0200 Subject: [PATCH 09/23] fix(shared): ensure sticky strategy sticks to the first pick Prior to this fix, the alive pids were never inserted due to a missing insert when handling remote pids from the mnesia event. --- CHANGES-4.3.md | 7 +++++++ src/emqx_shared_sub.erl | 1 + 2 files changed, 8 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index c15ac220b..22a909780 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -64,6 +64,13 @@ File format: This issue only occurs when unsubscribe with the session kept. Fixed in [#9119](https://github.com/emqx/emqx/pull/9119) +- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all. + Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber + is hit (and then start sticking to it). + After this fix, it will start sticking to whichever randomly picked member even when it is a + subscriber from another node in the cluster. 
+ Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) + ## v4.3.20 ### Bug fixes diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 65645c86a..9c051c62a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -456,6 +456,7 @@ handle_cast(Msg, State) -> handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> #emqx_shared_subscription{subpid = SubPid} = NewRecord, + ok = maybe_insert_alive_tab(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until From 338b11ab95f879116a90dcd70415c06dcec00ed0 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 10 Oct 2022 17:11:57 +0800 Subject: [PATCH 10/23] fix: cannot reset metrics for fallback actions --- .../src/emqx_rule_metrics.erl | 13 ++- .../test/emqx_rule_engine_SUITE.erl | 79 +++++++++++++++---- 2 files changed, 73 insertions(+), 19 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 624032056..2e30f40dc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -131,16 +131,15 @@ clear_metrics(Id) -> -spec(reset_metrics(rule_id()) -> ok). reset_metrics(Id) -> reset_speeds(Id), - reset_metrics(Id, rule_metrics()), + do_reset_metrics(Id, rule_metrics()), case emqx_rule_registry:get_rule(Id) of not_found -> ok; {ok, #rule{actions = Actions}} -> - [ reset_metrics(ActionId, action_metrics()) - || #action_instance{ id = ActionId} <- Actions], + reset_action_metrics(Actions), ok end. -reset_metrics(Id, Metrics) -> +do_reset_metrics(Id, Metrics) -> case couters_ref(Id) of not_found -> ok; Ref -> [counters:put(Ref, metrics_idx(Idx), 0) @@ -148,6 +147,12 @@ reset_metrics(Id, Metrics) -> ok end. 
+reset_action_metrics(Actions) -> + lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) -> + do_reset_metrics(ActionId, action_metrics()), + reset_action_metrics(FallbackActions) + end, Actions). + reset_speeds(Id) -> gen_server:call(?MODULE, {reset_speeds, Id}). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c8f66ea00..ec9717a75 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -50,6 +50,7 @@ groups() -> t_unregister_provider, t_create_rule, t_reset_metrics, + t_reset_metrics_fallbacks, t_create_resource ]}, {actions, [], @@ -379,18 +380,14 @@ t_inspect_action(_Config) -> t_reset_metrics(_Config) -> ok = emqx_rule_engine:load_providers(), - {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( - #{type => built_in, - config => #{}, - description => <<"debug resource">>}), - {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( - #{rawsql => "select clientid as c, username as u " - "from \"t1\" ", - actions => [#{name => 'inspect', - args => #{'$resource' => ResId, a=>1, b=>2}}], - type => built_in, - description => <<"Inspect rule">> - }), + {ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} = + emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'inspect', args => #{a=>1, b=>2}}], + type => built_in, + description => <<"Inspect rule">> + }), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), [ begin @@ -398,16 +395,68 @@ t_reset_metrics(_Config) -> timer:sleep(100) end || _ <- lists:seq(1,10)], + ?assertMatch(#{exception := 0, failed := 0, + matched := 10, no_result := 0, passed := 10}, + emqx_rule_metrics:get_rule_metrics(Id)), + ?assertMatch(#{failed := 0, success := 10, taken := 10}, + emqx_rule_metrics:get_action_metrics(ActId0)), 
emqx_rule_metrics:reset_metrics(Id), ?assertEqual(#{exception => 0,failed => 0, matched => 0,no_result => 0,passed => 0, speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, emqx_rule_metrics:get_rule_metrics(Id)), - ?assertEqual(#{failed => 0,success => 0,taken => 0}, - emqx_rule_metrics:get_action_metrics(ResId)), + ?assertEqual(#{failed => 0, success => 0, taken => 0}, + emqx_rule_metrics:get_action_metrics(ActId0)), emqtt:stop(Client), emqx_rule_registry:remove_rule(Id), - emqx_rule_registry:remove_resource(ResId), + ok. + +t_reset_metrics_fallbacks(_Config) -> + ok = emqx_rule_engine:load_providers(), + ok = emqx_rule_registry:add_action( + #action{name = 'crash_action', app = ?APP, + module = ?MODULE, on_create = crash_action, + types=[], params_spec = #{}, + title = #{en => <<"Crash Action">>}, + description = #{en => <<"This action will always fail!">>}}), + {ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [ + #action_instance{id = ActId1}, + #action_instance{id = ActId2} + ]}]}} = + emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [ + #{name => 'inspect', args => #{}, fallbacks => []}, + #{name => 'inspect', args => #{}, fallbacks => []} + ]}], + type => built_in, + description => <<"Inspect rule">> + }), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), + [ begin + emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0), + timer:sleep(100) + end + || _ <- lists:seq(1,10)], + ?assertMatch(#{exception := 0, failed := 0, + matched := 10, no_result := 0, passed := 10}, + emqx_rule_metrics:get_rule_metrics(Id)), + [?assertMatch(#{failed := 10, success := 0, taken := 10}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]], + [?assertMatch(#{failed := 0, success := 10, taken := 10}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ 
ActId1, ActId2]], + emqx_rule_metrics:reset_metrics(Id), + ?assertEqual(#{exception => 0,failed => 0, + matched => 0,no_result => 0,passed => 0, + speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, + emqx_rule_metrics:get_rule_metrics(Id)), + [?assertEqual(#{failed => 0, success => 0, taken => 0}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]], + emqtt:stop(Client), + emqx_rule_registry:remove_rule(Id), + ok = emqx_rule_registry:remove_action('crash_action'), ok. t_republish_action(_Config) -> From 6d52f908d1689a2fe2a4636cec4e7e19a05ed5bc Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 10 Oct 2022 17:17:21 +0800 Subject: [PATCH 11/23] chore: update emqx_rule_engine.appup.src --- CHANGES-4.3.md | 2 ++ .../src/emqx_rule_engine.appup.src | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 22a909780..7762f5eb1 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -71,6 +71,8 @@ File format: subscriber from another node in the cluster. Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) +- Fix cannot reset metrics for fallback actions. 
[#9125](https://github.com/emqx/emqx/pull/9125) + ## v4.3.20 ### Bug fixes diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 6aa35d2d6..130e5d4d4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -5,12 +5,14 @@ [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -19,6 +21,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -27,6 +30,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -36,6 +40,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, 
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -45,6 +50,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -56,6 +62,7 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, @@ -202,12 +209,14 @@ [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -216,6 +225,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, 
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -224,6 +234,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -233,6 +244,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -242,6 +254,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -254,6 +267,7 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", From edf69cee885b1d1976bc0183a750fd960c0819b5 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 
Oct 2022 16:01:36 +0200 Subject: [PATCH 12/23] feat: mute emqx shutdown log in rpc calls --- src/emqx.erl | 1 + src/emqx_misc.erl | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/emqx.erl b/src/emqx.erl index ae78e5795..4f5f1e3f2 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -227,6 +227,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> + ok = emqx_misc:maybe_mute_rpc_log(), ?LOG(critical, "emqx shutdown for ~s", [Reason]), on_shutdown(Reason), _ = emqx_plugins:unload(), diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d256569fb..021596957 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -23,6 +23,7 @@ -export([ merge_opts/2 , maybe_apply/2 + , maybe_mute_rpc_log/0 , compose/1 , compose/2 , run_fold/3 @@ -444,6 +445,27 @@ do_parallel_map(Fun, List) -> PidList ). +%% @doc Call this function to avoid logs printed to RPC caller node. +-spec maybe_mute_rpc_log() -> ok. +maybe_mute_rpc_log() -> + GlNode = node(group_leader()), + maybe_mute_rpc_log(GlNode). + +maybe_mute_rpc_log(Node) when Node =:= node() -> + %% do nothing, this is a local call + ok; +maybe_mute_rpc_log(Node) -> + case atom_to_list(Node) of + "remsh_" ++ _ -> + %% this is either an upgrade script or nodetool + %% do nothing, the log may go to the 'emqx' command line console + ok; + _ -> + %% otherwise set group leader to local node + _ = group_leader(whereis(init), self()), + ok + end. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). From e72e1567a1c8581f108da685587b1c6b145bcd68 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Oct 2022 09:19:50 -0300 Subject: [PATCH 13/23] ci(test): stop containers at the beginning of the run An attempt to prevent updated container definitions clashing in CI GH runners between different branches. A self-hosted runner only runs a single job at a time. If a container is already running there, an updated docker compose file might fail to recreate that container, failing the run. 
--- .github/workflows/run_cts_tests.yaml | 21 +++++++++++++++++++++ .github/workflows/run_test_cases.yaml | 5 +++++ 2 files changed, 26 insertions(+) diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index 6b05a014e..269b8bc65 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -24,6 +24,11 @@ jobs: steps: - uses: actions/checkout@v1 + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker compose up env: LDAP_TAG: ${{ matrix.ldap_tag }} @@ -79,6 +84,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ @@ -150,6 +159,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up timeout-minutes: 5 run: | @@ -236,6 +249,10 @@ jobs: - tcp steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ @@ -317,6 +334,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index def12cb16..8b16e47db 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -42,6 +42,11 @@ jobs: use-self-hosted: false steps: - uses: actions/checkout@v2 + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f 
$(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker compose up if: endsWith(github.repository, 'emqx') env: From cfd1d7eea1f69f07c05d7d95ef3657432c707fec Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 11 Oct 2022 17:41:47 +0200 Subject: [PATCH 14/23] chore(bin/emqx): no need to disable SC2086 --- bin/emqx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/emqx b/bin/emqx index 82eb90e16..89fc564b1 100755 --- a/bin/emqx +++ b/bin/emqx @@ -483,7 +483,7 @@ esac if [ "$IS_BOOT_COMMAND" = 'no' ]; then # for non-boot commands, inspect vm.