diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index cb9c3ad82..876fe66e0 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -15,7 +15,9 @@ {emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. +{emqx_delayed,2}. {emqx_eviction_agent,1}. +{emqx_eviction_agent,2}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. {emqx_ft_storage_fs,1}. diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index 7376c8069..ab2b9e66a 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -18,6 +18,7 @@ disable/1, status/0, connection_count/0, + all_channels_count/0, session_count/0, session_count/1, evict_connections/1, @@ -27,6 +28,9 @@ evict_session_channel/3 ]). +%% RPC targets +-export([all_local_channels_count/0]). + -behaviour(gen_server). -export([ @@ -240,6 +244,33 @@ channel_with_session_table(RequiredConnState) -> RequiredConnState =:= ConnState ]). +-spec all_channels_count() -> non_neg_integer(). +all_channels_count() -> + Nodes = emqx:running_nodes(), + Timeout = 15_000, + Results = emqx_eviction_agent_proto_v2:all_channels_count(Nodes, Timeout), + NodeResults = lists:zip(Nodes, Results), + Errors = lists:filter( + fun + ({_Node, {ok, _}}) -> false; + ({_Node, _Err}) -> true + end, + NodeResults + ), + Errors =/= [] andalso + ?SLOG( + warning, + #{ + msg => "error_collecting_all_channels_count", + errors => maps:from_list(Errors) + } + ), + lists:sum([N || {ok, N} <- Results]). + +-spec all_local_channels_count() -> non_neg_integer(). +all_local_channels_count() -> + table_count(emqx_cm:all_channels_table(?CONN_MODULES)). + session_count() -> session_count(any). 
@@ -303,7 +334,7 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) -> client_info => ClientInfo } ), - case emqx_eviction_agent_proto_v1:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of + case emqx_eviction_agent_proto_v2:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of {badrpc, Reason} -> ?SLOG( error, diff --git a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl new file mode 100644 index 000000000..2d204079c --- /dev/null +++ b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_eviction_agent_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + evict_session_channel/4, + + %% Introduced in v2: + all_channels_count/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.2.1". + +-spec evict_session_channel( + node(), + emqx_types:clientid(), + emqx_types:conninfo(), + emqx_types:clientinfo() +) -> supervisor:startchild_err() | emqx_rpc:badrpc(). +evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) -> + rpc:call(Node, emqx_eviction_agent, evict_session_channel, [ClientId, ConnInfo, ClientInfo]). + +%% Introduced in v2: calls emqx_eviction_agent:all_local_channels_count/0 on each of Nodes; Timeout is passed to erpc:multicall/5. +-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()). +all_channels_count(Nodes, Timeout) -> + erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout). diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 559648bdd..22d18c180 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -45,18 +45,22 @@ code_change/3 ]). 
-%% gen_server callbacks +%% API -export([ load/0, unload/0, load_or_unload/1, get_conf/1, update_config/1, + delayed_count/0, list/1, get_delayed_message/1, get_delayed_message/2, delete_delayed_message/1, delete_delayed_message/2, + clear_all/0, + %% rpc target + clear_all_local/0, cluster_list/1 ]). @@ -167,6 +171,9 @@ unload() -> load_or_unload(Bool) -> gen_server:call(?SERVER, {do_load_or_unload, Bool}). +-spec delayed_count() -> non_neg_integer(). +delayed_count() -> mnesia:table_info(?TAB, size). + list(Params) -> emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). @@ -243,7 +250,7 @@ get_delayed_message(Id) -> get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Id); get_delayed_message(Node, Id) -> - emqx_delayed_proto_v1:get_delayed_message(Node, Id). + emqx_delayed_proto_v2:get_delayed_message(Node, Id). -spec delete_delayed_message(binary()) -> with_id_return(). delete_delayed_message(Id) -> @@ -258,7 +265,19 @@ delete_delayed_message(Id) -> delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Id); delete_delayed_message(Node, Id) -> - emqx_delayed_proto_v1:delete_delayed_message(Node, Id). + emqx_delayed_proto_v2:delete_delayed_message(Node, Id). + +-spec clear_all() -> ok. +clear_all() -> + Nodes = emqx:running_nodes(), + _ = emqx_delayed_proto_v2:clear_all(Nodes), + ok. + +%% rpc target +-spec clear_all_local() -> ok. +clear_all_local() -> + _ = mria:clear_table(?TAB), + ok. update_config(Config) -> emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). @@ -408,9 +427,6 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> end, do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]). --spec delayed_count() -> non_neg_integer(). -delayed_count() -> mnesia:table_info(?TAB, size). 
- do_load_or_unload(true, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), State; diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl new file mode 100644 index 000000000..6d96dcc66 --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_delayed_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_delayed_message/2, + delete_delayed_message/2, + + %% Introduced in v2: + clear_all/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.2.1". + +-spec get_delayed_message(node(), binary()) -> + emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). +get_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). + +-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +delete_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). + +%% Introduced in v2: + +-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok). 
+clear_all(Nodes) -> + erpc:multicall(Nodes, emqx_delayed, clear_all_local, []). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 56eff8bfc..5085aa2da 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -164,15 +164,15 @@ t_cluster(_) -> ?assertMatch( {ok, _}, - emqx_delayed_proto_v1:get_delayed_message(node(), Id) + emqx_delayed_proto_v2:get_delayed_message(node(), Id) ), %% The 'local' and the 'fake-remote' values should be the same, %% however there is a race condition, so we are just assert that they are both 'ok' tuples ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)), - ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)), + ?assertMatch({ok, _}, emqx_delayed_proto_v2:get_delayed_message(node(), Id)), - ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id), + ok = emqx_delayed_proto_v2:delete_delayed_message(node(), Id), ?assertMatch( {error, _}, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl index 71820266c..81f1bfe03 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl @@ -43,7 +43,7 @@ %% gen_statem states -define(disabled, disabled). -define(purging, purging). --define(cleaning_retained_messages, cleaning_retained_messages). +-define(cleaning_data, cleaning_data). -type start_opts() :: #{ purge_rate => pos_integer() @@ -80,7 +80,7 @@ start_link() -> callback_mode() -> handle_event_function. -%% states: disabled, purging, cleaning_retained_messages +%% states: disabled, purging, cleaning_data init([]) -> {ok, disabled, #{}}. 
@@ -130,35 +130,35 @@ handle_event( purge_rate := PurgeRate } = Data ) -> - case emqx_eviction_agent:status() of - {enabled, #{sessions := Sessions}} when Sessions > 0 -> + case emqx_eviction_agent:all_channels_count() of + Sessions when Sessions > 0 -> ok = purge_sessions(PurgeRate), - ?tp(debug, cluster_purge_evict_session, #{purge_rate => PurgeRate}), - ?SLOG( + ?tp( warning, + "cluster_purge_evict_sessions", #{ - msg => "cluster_purge_evict_sessions", count => Sessions, purge_rate => PurgeRate } ), NewData = Data#{current_sessions => Sessions}, {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]}; - {enabled, #{sessions := 0}} -> + _Sessions = 0 -> NewData = Data#{current_conns => 0}, ?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}), - {next_state, ?cleaning_retained_messages, NewData, [ + {next_state, ?cleaning_data, NewData, [ {state_timeout, 0, clean_retained_messages} ]} end; handle_event( state_timeout, clean_retained_messages, - ?cleaning_retained_messages, + ?cleaning_data, Data ) -> - ?SLOG(warning, #{msg => "cluster_purge_cleaning_retained_messages"}), + ?SLOG(warning, #{msg => "cluster_purge_cleaning_data"}), ok = emqx_retainer:clean(), + ok = emqx_delayed:clear_all(), ?tp(warning, "cluster_purge_done", #{}), ok = disable_purge(), ?tp(warning, "cluster_purge_finished_successfully", #{}), diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl index 363254298..7d0cab0ce 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl @@ -368,14 +368,7 @@ emqx_node_rebalance_cli(Node, Args) -> with_some_sessions(Fn) -> emqx_common_test_helpers:with_mock( emqx_eviction_agent, - status, - fun() -> - case meck:passthrough([]) of - {enabled, Status = #{sessions := _}} -> - {enabled, Status#{sessions := 100}}; - Res -> - Res - end - end, + all_channels_count, + fun() 
-> 100 end, Fn ). diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl index 2dd3ee93e..7cdcc4d71 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl @@ -109,6 +109,10 @@ app_specs() -> #{enable => true} } }}, + {emqx_modules, #{ + config => + #{delayed => #{enable => true}} + }}, emqx_eviction_agent, emqx_node_rebalance ]. @@ -133,15 +137,8 @@ with_some_sessions(Node, Fn) -> erpc:call(Node, fun() -> emqx_common_test_helpers:with_mock( emqx_eviction_agent, - status, - fun() -> - case meck:passthrough([]) of - {enabled, Status = #{sessions := _}} -> - {enabled, Status#{sessions := 100}}; - Res -> - Res - end - end, + all_channels_count, + fun() -> 100 end, Fn ) end). @@ -317,8 +314,13 @@ t_session_purged(Config) -> Port1 = get_mqtt_port(Node1, tcp), Port2 = get_mqtt_port(Node2, tcp), - Node1Clients = emqtt_connect_many(Port1, 20, _StartN1 = 1), - Node2Clients = emqtt_connect_many(Port2, 20, _StartN2 = 21), + %% N.B.: it's important to have an asymmetric number of clients for this test, as + %% otherwise the scenario might happen to finish successfully due to the wrong + %% reasons! 
+ NumClientsNode1 = 5, + NumClientsNode2 = 35, + Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1), + Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21), lists:foreach( fun(C) -> ClientId = proplists:get_value(clientid, emqtt:info(C)), @@ -327,6 +329,8 @@ t_session_purged(Config) -> Payload = ClientId, Opts = [{retain, true}], ok = emqtt:publish(C, Topic, Props, Payload, Opts), + DelayedTopic = emqx_topic:join([<<"$delayed/120">>, Topic]), + ok = emqtt:publish(C, DelayedTopic, Payload), {ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(C, Topic), ok end, @@ -334,12 +338,13 @@ t_session_purged(Config) -> ), ?assertEqual(40, erpc:call(Node2, emqx_retainer, retained_count, [])), + ?assertEqual(NumClientsNode1, erpc:call(Node1, emqx_delayed, delayed_count, [])), + ?assertEqual(NumClientsNode2, erpc:call(Node2, emqx_delayed, delayed_count, [])), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := "cluster_purge_done"}), 15_000 ), - %% ok = rpc:call(Node1, emqx_node_rebalance_purge, start_global, [Nodes, opts(Config)]), ok = rpc:call(Node1, emqx_node_rebalance_purge, start, [opts(Config)]), {ok, _} = snabbkaffe:receive_events(SRef0), @@ -347,6 +352,8 @@ t_session_purged(Config) -> ?assertEqual([], erpc:call(Node2, emqx_cm, all_channels, [])), ?assertEqual(0, erpc:call(Node1, emqx_retainer, retained_count, [])), ?assertEqual(0, erpc:call(Node2, emqx_retainer, retained_count, [])), + ?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])), + ?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])), ok = drain_exits(Node1Clients ++ Node2Clients),