diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl index e96fcdd7d..c78731dea 100644 --- a/apps/emqx/src/emqx_cm_registry_keeper.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -20,7 +20,8 @@ -export([ start_link/0, - count/1 + count/1, + purge/0 ]). %% gen_server callbacks @@ -48,7 +49,10 @@ start_link() -> init(_) -> case mria_config:whoami() =:= replicant of true -> - ignore; + %% Do not run delete loops on replicant nodes + %% because the core nodes will do it anyway + %% The process is started to serve the 'count' calls + {ok, #{no_deletes => true}}; false -> ok = send_delay_start(), {ok, #{next_clientid => undefined}} @@ -71,6 +75,19 @@ count(Since) -> gen_server:call(?MODULE, {count, Since}, infinity) end. +%% @doc Delete all retained history. Only for tests. +-spec purge() -> ok. +purge() -> + purge_loop(undefined). + +purge_loop(StartId) -> + case cleanup_one_chunk(StartId, _IsPurge = true) of + '$end_of_table' -> + ok; + NextId -> + purge_loop(NextId) + end. + handle_call({count, Since}, _From, State) -> {LastCountTime, LastCount} = case State of @@ -128,10 +145,13 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. cleanup_one_chunk(NextClientId) -> + cleanup_one_chunk(NextClientId, false). + +cleanup_one_chunk(NextClientId, IsPurge) -> Retain = retain_duration(), Now = now_ts(), IsExpired = fun(#channel{pid = Ts}) -> - is_integer(Ts) andalso (Ts < Now - Retain) + IsPurge orelse (is_integer(Ts) andalso (Ts < Now - Retain)) end, cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index cf80f7e73..44ede7fed 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -423,7 +423,10 @@ schema("/sessions_count") -> responses => #{ 200 => hoconsc:mk(binary(), #{ desc => <<"Number of sessions">> - }) + }), + 400 => emqx_dashboard_swagger:error_codes( + ['BAD_REQUEST'], <<"Node {name} cannot handle this request.">> + ) } } }. @@ -1498,6 +1501,12 @@ message_example() -> }. sessions_count(get, #{query_string := QString}) -> - Since = maps:get(<<"since">>, QString, 0), - Count = emqx_cm_registry_keeper:count(Since), - {200, integer_to_binary(Count)}. + try + Since = maps:get(<<"since">>, QString, 0), + Count = emqx_cm_registry_keeper:count(Since), + {200, integer_to_binary(Count)} + catch + exit:{noproc, _} -> + Msg = io_lib:format("Node (~s) cannot handle this request.", [node()]), + {400, 'BAD_REQUEST', iolist_to_binary(Msg)} + end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index a007de829..3527af464 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -798,6 +798,44 @@ t_client_id_not_found(_Config) -> %% Inflight messages ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))). +t_sessions_count(_Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/test_sessions_count">>, + Conf0 = emqx_config:get([broker]), + Conf1 = hocon_maps:deep_merge(Conf0, #{session_history_retain => 5}), + %% from 1 seconds ago, which is for sure less than histry retain duration + %% hence force a call to the gen_server emqx_cm_registry_keeper + Since = erlang:system_time(seconds) - 1, + ok = emqx_config:put(#{broker => Conf1}), + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, true} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + Path = emqx_mgmt_api_test_util:api_path(["sessions_count"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ?assertMatch( + {ok, "1"}, + emqx_mgmt_api_test_util:request_api( + get, Path, "since=" ++ integer_to_list(Since), AuthHeader + ) + ), + ok = emqtt:disconnect(Client), + %% simulate the situation in which the process is not running + ok = supervisor:terminate_child(emqx_cm_sup, emqx_cm_registry_keeper), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "since=" ++ integer_to_list(Since), AuthHeader + ) + ), + %% restore default value + ok = emqx_config:put(#{broker => Conf0}), + ok = emqx_cm_registry_keeper:purge(), + ok. + t_mqueue_messages(Config) -> ClientId = atom_to_binary(?FUNCTION_NAME), Topic = <<"t/test_mqueue_msgs">>, diff --git a/changes/ce/feat-12326.en.md b/changes/ce/feat-12326.en.md index bfef51eb8..b2f94d709 100644 --- a/changes/ce/feat-12326.en.md +++ b/changes/ce/feat-12326.en.md @@ -11,4 +11,4 @@ A new gauge `cluster_sessions` is added to the metrics collection. Exposed to pr emqx_cluster_sessions_count 1234 ``` -The counter can only be used for an approximate estimation as the collection and calculations are async. +NOTE: The counter can only be used for an approximate estimation as the collection and calculations are async.