refactor(topic-metrics): cluster supported

This commit is contained in:
JianBo He 2021-12-29 10:10:48 +08:00
parent 888e7fd727
commit ebbb473d7a
5 changed files with 324 additions and 81 deletions

View File

@ -38,7 +38,8 @@ maybe_enable_modules() ->
emqx_event_message:enable(),
emqx_conf_cli:load(),
ok = emqx_rewrite:enable(),
emqx_topic_metrics:enable().
emqx_topic_metrics:enable(),
emqx_modules_conf:load().
maybe_disable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
@ -47,4 +48,5 @@ maybe_disable_modules() ->
emqx_event_message:disable(),
emqx_rewrite:disable(),
emqx_conf_cli:unload(),
emqx_topic_metrics:disable().
emqx_topic_metrics:disable(),
emqx_modules_conf:unload().

View File

@ -0,0 +1,131 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The emqx-modules configration interoperable interfaces
-module(emqx_modules_conf).
-behaviour(emqx_config_handler).
%% Load/Unload
-export([ load/0
, unload/0
]).
%% topci-metrics
-export([ topic_metrics/0
, add_topic_metrics/1
, remove_topic_metrics/1
]).
%% config handlers
-export([ pre_config_update/3
, post_config_update/5
]).
%%--------------------------------------------------------------------
%% Load/Unload
-spec load() -> ok.
load() ->
emqx_conf:add_handler([topic_metrics], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_conf:remove_handler([topic_metrics]).
%%--------------------------------------------------------------------
%% Topic-Metrics
-spec topic_metrics() -> [emqx_types:topic()].
topic_metrics() ->
lists:map(
fun(#{topic := Topic}) -> Topic end,
emqx:get_config([topic_metrics])
).
-spec add_topic_metrics(emqx_types:topic())
-> {ok, emqx_types:topic()}
| {error, term()}.
add_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
{ok, _} -> {ok, Topic};
{error, Reason} -> {error, Reason}
end.
-spec remove_topic_metrics(emqx_types:topic())
-> ok
| {error, term()}.
remove_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
{ok, _} -> ok;
{error, Reason} -> {error, Reason}
end.
cfg_update(topic_metrics, Action, Params) ->
res(emqx_conf:update(
[topic_metrics],
{Action, Params},
#{override_to => cluster})).
res({ok, Result}) -> {ok, Result};
res({error, {pre_config_update,?MODULE,Reason}}) -> {error, Reason};
res({error, {post_config_update,?MODULE,Reason}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}.
%%--------------------------------------------------------------------
%% Config Handler
%%--------------------------------------------------------------------
-spec pre_config_update(list(atom()),
emqx_config:update_request(),
emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, {add_topic_metrics, Topic0}, RawConf) ->
Topic = #{<<"topic">> => Topic0},
case lists:member(Topic, RawConf) of
true ->
{error, already_exist};
_ ->
{ok, RawConf ++ [Topic]}
end;
pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
Topic = #{<<"topic">> => Topic0},
case lists:member(Topic, RawConf) of
true ->
{ok, RawConf -- [Topic]};
_ ->
{error, not_found}
end.
-spec post_config_update(list(atom()),
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update(_, {add_topic_metrics, Topic},
_NewConfig, _OldConfig, _AppEnvs) ->
case emqx_topic_metrics:register(Topic) of
ok -> ok;
{error, Reason} -> {error, Reason}
end;
post_config_update(_, {remove_topic_metrics, Topic},
_NewConfig, _OldConfig, _AppEnvs) ->
case emqx_topic_metrics:deregister(Topic) of
ok -> ok;
{error, Reason} -> {error, Reason}
end.

View File

@ -220,7 +220,6 @@ handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
handle_call({deregister, all}, _From, State) ->
true = ets:delete_all_objects(?TAB),
update_config([]),
{reply, ok, State#state{speeds = #{}}};
handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
@ -232,7 +231,6 @@ handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
NSpeeds = lists:foldl(fun(Metric, Acc) ->
maps:remove({Topic, Metric}, Acc)
end, Speeds, ?TOPIC_METRICS),
remove_topic_config(Topic),
{reply, ok, State#state{speeds = NSpeeds}}
end;
@ -316,7 +314,6 @@ do_register(Topic, Speeds) ->
NSpeeds = lists:foldl(fun(Metric, Acc) ->
maps:put({Topic, Metric}, #speed{}, Acc)
end, Speeds, ?TOPIC_METRICS),
add_topic_config(Topic),
{ok, NSpeeds};
{true, true} ->
{error, bad_topic};
@ -351,18 +348,6 @@ format({Topic, Data}) ->
TopicMetrics#{reset_time => ResetTime}
end.
remove_topic_config(Topic) when is_binary(Topic) ->
Topics = emqx_config:get_raw([<<"topic_metrics">>], []) -- [#{<<"topic">> => Topic}],
update_config(Topics).
add_topic_config(Topic) when is_binary(Topic) ->
Topics = emqx_config:get_raw([<<"topic_metrics">>], []) ++ [#{<<"topic">> => Topic}],
update_config(lists:usort(Topics)).
update_config(Topics) when is_list(Topics) ->
{ok, _} = emqx:update_config([topic_metrics], Topics),
ok.
try_inc(Topic, Metric) ->
_ = inc(Topic, Metric),
ok.

View File

@ -73,6 +73,7 @@ properties() ->
topic_metrics_api() ->
MetaData = #{
%% Get all nodes metrics and accumulate all of these
get => #{
description => <<"List topic metrics">>,
responses => #{
@ -133,87 +134,160 @@ topic_param() ->
}.
%%--------------------------------------------------------------------
%% api callback
%% HTTP Callbacks
%%--------------------------------------------------------------------
topic_metrics(get, _) ->
list_metrics();
case cluster_accumulation_metrics() of
{error, Reason} ->
{500, Reason};
{ok, Metrics} ->
{200, Metrics}
end;
topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
reset(Topic);
case reset(Topic) of
ok -> {200};
{error, Reason} -> reason2httpresp(Reason)
end;
topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
reset();
case reset() of
ok -> {200};
{error, Reason} -> reason2httpresp(Reason)
end;
topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
{400, 'BAD_REQUEST', <<"Topic can not be empty">>};
topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
register(Topic).
operate_topic_metrics(Method, #{bindings := #{topic := Topic0}}) ->
Topic = decode_topic(Topic0),
case Method of
get ->
get_metrics(Topic);
put ->
register(Topic);
delete ->
deregister(Topic)
case emqx_modules_conf:add_topic_metrics(Topic) of
{ok, Topic} ->
{200};
{error, Reason} ->
reason2httpresp(Reason)
end.
decode_topic(Topic) ->
uri_string:percent_decode(Topic).
operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) ->
case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of
{ok, Metrics} ->
{200, Metrics};
{error, Reason} ->
reason2httpresp(Reason)
end;
operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) ->
case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of
ok -> {200};
{error, Reason} ->
reason2httpresp(Reason)
end.
%%--------------------------------------------------------------------
%% api apply
list_metrics() ->
{200, emqx_topic_metrics:metrics()}.
%% Internal funcs
%%--------------------------------------------------------------------
register(Topic) ->
case emqx_topic_metrics:register(Topic) of
{error, quota_exceeded} ->
Message = list_to_binary(io_lib:format("Max topic metrics count is ~p",
[emqx_topic_metrics:max_limit()])),
{409, #{code => ?EXCEED_LIMIT, message => Message}};
{error, bad_topic} ->
Message = list_to_binary(io_lib:format("Bad Topic, topic cannot have wildcard ~p",
[Topic])),
{400, #{code => ?BAD_TOPIC, message => Message}};
{error, {quota_exceeded, bad_topic}} ->
Message = list_to_binary(
io_lib:format(
"Max topic metrics count is ~p, and topic cannot have wildcard ~p",
[emqx_topic_metrics:max_limit(), Topic])),
{400, #{code => ?BAD_REQUEST, message => Message}};
{error, already_existed} ->
Message = list_to_binary(io_lib:format("Topic ~p already registered", [Topic])),
{400, #{code => ?BAD_TOPIC, message => Message}};
ok ->
{200}
cluster_accumulation_metrics() ->
case multicall(emqx_topic_metrics, metrics, []) of
{SuccResList, []} ->
{ok, accumulate_nodes_metrics(SuccResList)};
{_, FailedNodes} ->
{error, {badrpc, FailedNodes}}
end.
deregister(Topic) ->
case emqx_topic_metrics:deregister(Topic) of
{error, topic_not_found} ->
Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])),
{404, #{code => ?ERROR_TOPIC, message => Message}};
ok ->
{200}
cluster_accumulation_metrics(Topic) ->
case multicall(emqx_topic_metrics, metrics, [Topic]) of
{SuccResList, []} ->
case lists:filter(fun({error, _}) -> false; (_) -> true
end, SuccResList) of
[] -> {error, topic_not_found};
TopicMetrics ->
NTopicMetrics = [ [T] || T <- TopicMetrics],
[AccMetrics] = accumulate_nodes_metrics(NTopicMetrics),
{ok, AccMetrics}
end;
{_, FailedNodes} ->
{error, {badrpc, FailedNodes}}
end.
get_metrics(Topic) ->
case emqx_topic_metrics:metrics(Topic) of
{error, topic_not_found} ->
Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])),
{404, #{code => ?ERROR_TOPIC, message => Message}};
Metrics ->
{200, Metrics}
end.
accumulate_nodes_metrics(NodesTopicMetrics) ->
AccMap = lists:foldl(fun(TopicMetrics, ExAcc) ->
MetricsMap = lists:foldl(
fun(#{topic := Topic,
metrics := Metrics,
create_time := CreateTime}, Acc) ->
Acc#{Topic => {Metrics, CreateTime}}
end, #{}, TopicMetrics),
accumulate_metrics(MetricsMap, ExAcc)
end, #{}, NodesTopicMetrics),
maps:fold(fun(Topic, {Metrics, CreateTime1}, Acc1) ->
[#{topic => Topic,
metrics => Metrics,
create_time => CreateTime1} | Acc1]
end, [], AccMap).
%% @doc TopicMetricsIn :: #{<<"topic">> := {Metrics, CreateTime}}
accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) ->
Topics = maps:keys(TopicMetricsIn),
lists:foldl(fun(Topic, Acc) ->
{Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn),
NMetrics = do_accumulation_metrics(
Metrics,
maps:get(Topic, TopicMetricsAcc, undefined)
),
maps:put(Topic, {NMetrics, CreateTime}, Acc)
end, #{}, Topics).
%% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...}
do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn;
do_accumulation_metrics(MetricsIn, MetricsAcc) ->
Keys = maps:keys(MetricsIn),
lists:foldl(fun(Key, Acc) ->
InVal = maps:get(Key, MetricsIn),
NVal = InVal + maps:get(Key, MetricsAcc, 0),
maps:put(Key, NVal, Acc)
end, #{}, Keys).
reset() ->
ok = emqx_topic_metrics:reset(),
{200}.
_ = multicall(emqx_topic_metrics, reset, []),
ok.
reset(Topic) ->
case emqx_topic_metrics:reset(Topic) of
{error, topic_not_found} ->
Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])),
{404, #{code => ?ERROR_TOPIC, message => Message}};
ok ->
{200}
case multicall(emqx_topic_metrics, reset, [Topic]) of
{SuccResList, []} ->
case lists:filter(fun({error, _}) -> true; (_) -> false
end, SuccResList) of
[{error, Reason} | _] ->
{error, Reason};
[] ->
ok
end
end.
%%--------------------------------------------------------------------
%% utils
multicall(M, F, A) ->
emqx_rpc:multicall(mria_mnesia:running_nodes(), M, F, A).
reason2httpresp(quota_exceeded) ->
Msg = list_to_binary(
io_lib:format("Max topic metrics count is ~p",
[emqx_topic_metrics:max_limit()])),
{409, #{code => ?EXCEED_LIMIT, message => Msg}};
reason2httpresp(bad_topic) ->
Msg = <<"Bad Topic, topic cannot have wildcard">>,
{400, #{code => ?BAD_TOPIC, message => Msg}};
reason2httpresp({quota_exceeded, bad_topic}) ->
Msg = list_to_binary(
io_lib:format(
"Max topic metrics count is ~p, and topic cannot have wildcard",
[emqx_topic_metrics:max_limit()])),
{400, #{code => ?BAD_REQUEST, message => Msg}};
reason2httpresp(already_existed) ->
Msg = <<"Topic already registered">>,
{400, #{code => ?BAD_TOPIC, message => Msg}};
reason2httpresp(not_found) ->
Msg = <<"Topic not found">>,
{404, #{code => ?ERROR_TOPIC, message => Msg}};
reason2httpresp(topic_not_found) ->
Msg = <<"Topic not found">>,
{404, #{code => ?ERROR_TOPIC, message => Msg}}.

View File

@ -0,0 +1,51 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_conf_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:init_load(emqx_modules_schema, <<"gateway {}">>),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
Conf.
end_per_suite(_Conf) ->
emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
init_per_testcase(_CaseName, Conf) ->
Conf.
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_topic_metrics_list(_) ->
ok.
t_topic_metrics_add_remove(_) ->
ok.