Merge pull request #12699 from zmstone/0313-fix-session-count-on-replicant-node
fix: do not crash on replicant node
This commit is contained in:
commit
cb77dea1e9
|
@ -20,7 +20,8 @@
|
|||
|
||||
-export([
|
||||
start_link/0,
|
||||
count/1
|
||||
count/1,
|
||||
purge/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -48,7 +49,10 @@ start_link() ->
|
|||
init(_) ->
|
||||
case mria_config:whoami() =:= replicant of
|
||||
true ->
|
||||
ignore;
|
||||
%% Do not run delete loops on replicant nodes
|
||||
%% because the core nodes will do it anyway
|
||||
%% The process is started to serve the 'count' calls
|
||||
{ok, #{no_deletes => true}};
|
||||
false ->
|
||||
ok = send_delay_start(),
|
||||
{ok, #{next_clientid => undefined}}
|
||||
|
@ -71,6 +75,19 @@ count(Since) ->
|
|||
gen_server:call(?MODULE, {count, Since}, infinity)
|
||||
end.
|
||||
|
||||
%% @doc Delete all retained history. Only for tests.
|
||||
-spec purge() -> ok.
|
||||
purge() ->
|
||||
purge_loop(undefined).
|
||||
|
||||
purge_loop(StartId) ->
|
||||
case cleanup_one_chunk(StartId, _IsPurge = true) of
|
||||
'$end_of_table' ->
|
||||
ok;
|
||||
NextId ->
|
||||
purge_loop(NextId)
|
||||
end.
|
||||
|
||||
handle_call({count, Since}, _From, State) ->
|
||||
{LastCountTime, LastCount} =
|
||||
case State of
|
||||
|
@ -128,10 +145,13 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
{ok, State}.
|
||||
|
||||
cleanup_one_chunk(NextClientId) ->
|
||||
cleanup_one_chunk(NextClientId, false).
|
||||
|
||||
cleanup_one_chunk(NextClientId, IsPurge) ->
|
||||
Retain = retain_duration(),
|
||||
Now = now_ts(),
|
||||
IsExpired = fun(#channel{pid = Ts}) ->
|
||||
is_integer(Ts) andalso (Ts < Now - Retain)
|
||||
IsPurge orelse (is_integer(Ts) andalso (Ts < Now - Retain))
|
||||
end,
|
||||
cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired).
|
||||
|
||||
|
|
|
@ -423,7 +423,10 @@ schema("/sessions_count") ->
|
|||
responses => #{
|
||||
200 => hoconsc:mk(binary(), #{
|
||||
desc => <<"Number of sessions">>
|
||||
})
|
||||
}),
|
||||
400 => emqx_dashboard_swagger:error_codes(
|
||||
['BAD_REQUEST'], <<"Node {name} cannot handle this request.">>
|
||||
)
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
@ -1504,6 +1507,12 @@ message_example() ->
|
|||
}.
|
||||
|
||||
sessions_count(get, #{query_string := QString}) ->
|
||||
try
|
||||
Since = maps:get(<<"since">>, QString, 0),
|
||||
Count = emqx_cm_registry_keeper:count(Since),
|
||||
{200, integer_to_binary(Count)}.
|
||||
{200, integer_to_binary(Count)}
|
||||
catch
|
||||
exit:{noproc, _} ->
|
||||
Msg = io_lib:format("Node (~s) cannot handle this request.", [node()]),
|
||||
{400, 'BAD_REQUEST', iolist_to_binary(Msg)}
|
||||
end.
|
||||
|
|
|
@ -798,6 +798,44 @@ t_client_id_not_found(_Config) ->
|
|||
%% Inflight messages
|
||||
?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))).
|
||||
|
||||
t_sessions_count(_Config) ->
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
Topic = <<"t/test_sessions_count">>,
|
||||
Conf0 = emqx_config:get([broker]),
|
||||
Conf1 = hocon_maps:deep_merge(Conf0, #{session_history_retain => 5}),
|
||||
%% from 1 second ago, which is for sure less than the history retain duration
|
||||
%% hence force a call to the gen_server emqx_cm_registry_keeper
|
||||
Since = erlang:system_time(seconds) - 1,
|
||||
ok = emqx_config:put(#{broker => Conf1}),
|
||||
{ok, Client} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{clientid, ClientId},
|
||||
{clean_start, true}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
{ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["sessions_count"]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
?assertMatch(
|
||||
{ok, "1"},
|
||||
emqx_mgmt_api_test_util:request_api(
|
||||
get, Path, "since=" ++ integer_to_list(Since), AuthHeader
|
||||
)
|
||||
),
|
||||
ok = emqtt:disconnect(Client),
|
||||
%% simulate the situation in which the process is not running
|
||||
ok = supervisor:terminate_child(emqx_cm_sup, emqx_cm_registry_keeper),
|
||||
?assertMatch(
|
||||
{error, {_, 400, _}},
|
||||
emqx_mgmt_api_test_util:request_api(
|
||||
get, Path, "since=" ++ integer_to_list(Since), AuthHeader
|
||||
)
|
||||
),
|
||||
%% restore default value
|
||||
ok = emqx_config:put(#{broker => Conf0}),
|
||||
ok = emqx_cm_registry_keeper:purge(),
|
||||
ok.
|
||||
|
||||
t_mqueue_messages(Config) ->
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
Topic = <<"t/test_mqueue_msgs">>,
|
||||
|
|
|
@ -11,4 +11,4 @@ A new gauge `cluster_sessions` is added to the metrics collection. Exposed to pr
|
|||
emqx_cluster_sessions_count 1234
|
||||
```
|
||||
|
||||
The counter can only be used for an approximate estimation as the collection and calculations are async.
|
||||
NOTE: The counter can only be used for an approximate estimation as the collection and calculations are async.
|
||||
|
|
Loading…
Reference in New Issue