diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 7916a6b58..692c7a62e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -40,11 +40,14 @@ -export([ samplers/0, samplers/2, - current_rate/0, current_rate/1, granularity_adapter/1 ]). +-ifdef(TEST). +-export([current_rate_cluster/0]). +-endif. + %% for rpc -export([do_sample/2]). @@ -112,8 +115,33 @@ granularity_adapter(List) when length(List) > 1000 -> granularity_adapter(List) -> List. +current_rate(all) -> + current_rate_cluster(); +current_rate(Node) when Node == node() -> + try + {ok, Rate} = do_call(current_rate), + {ok, Rate} + catch + _E:R -> + ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}), + %% Rate map 0, ensure api will not crash. + %% When joining cluster, dashboard monitor restart. + Rate0 = [ + {Key, 0} + || Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP) + ], + {ok, maps:merge(maps:from_list(Rate0), non_rate_value())} + end; +current_rate(Node) -> + case emqx_dashboard_proto_v1:current_rate(Node) of + {badrpc, Reason} -> + {badrpc, {Node, Reason}}; + {ok, Rate} -> + {ok, Rate} + end. + %% Get the current rate. Not the current sampler data. -current_rate() -> +current_rate_cluster() -> Fun = fun (Node, Cluster) when is_map(Cluster) -> @@ -133,31 +161,6 @@ current_rate() -> {ok, Rate} end. -current_rate(all) -> - current_rate(); -current_rate(Node) when Node == node() -> - try - {ok, Rate} = do_call(current_rate), - {ok, Rate} - catch - _E:R -> - ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}), - %% Rate map 0, ensure api will not crash. - %% When joining cluster, dashboard monitor restart. - Rate0 = [ - {Key, 0} - || Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP) - ], - {ok, maps:from_list(Rate0)} - end; -current_rate(Node) -> - case emqx_dashboard_proto_v1:current_rate(Node) of - {badrpc, Reason} -> - {badrpc, {Node, Reason}}; - {ok, Rate} -> - {ok, Rate} - end. - %% ------------------------------------------------------------------------------------------------- %% gen_server functions @@ -258,8 +261,13 @@ merge_cluster_sampler_map(M1, M2) -> merge_cluster_rate(Node, Cluster) -> Fun = fun - (topics, Value, NCluster) -> - NCluster#{topics => Value}; + %% cluster-synced values + (topics, V, NCluster) -> + NCluster#{topics => V}; + (retained_msg_count, V, NCluster) -> + NCluster#{retained_msg_count => V}; + (license_quota, V, NCluster) -> + NCluster#{license_quota => V}; (Key, Value, NCluster) -> ClusterValue = maps:get(Key, NCluster, 0), NCluster#{Key => Value + ClusterValue} diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 309137362..fc4b171a4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -151,7 +151,11 @@ monitor_current(get, #{bindings := Bindings}) -> RawNode = maps:get(node, Bindings, <<"all">>), emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1). +-spec current_rate(atom()) -> + {error, term()} + | {ok, Result :: map()}. current_rate(Node) -> + %% Node :: 'all' or `NodeName` case emqx_dashboard_monitor:current_rate(Node) of {badrpc, _} = BadRpc -> {error, BadRpc};