diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 55c882f94..4d49f22c8 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -38,7 +38,8 @@ maybe_enable_modules() -> emqx_event_message:enable(), emqx_conf_cli:load(), ok = emqx_rewrite:enable(), - emqx_topic_metrics:enable(). + emqx_topic_metrics:enable(), + emqx_modules_conf:load(). maybe_disable_modules() -> emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(), @@ -47,4 +48,5 @@ maybe_disable_modules() -> emqx_event_message:disable(), emqx_rewrite:disable(), emqx_conf_cli:unload(), - emqx_topic_metrics:disable(). + emqx_topic_metrics:disable(), + emqx_modules_conf:unload(). diff --git a/apps/emqx_modules/src/emqx_modules_conf.erl b/apps/emqx_modules/src/emqx_modules_conf.erl new file mode 100644 index 000000000..a757e0878 --- /dev/null +++ b/apps/emqx_modules/src/emqx_modules_conf.erl @@ -0,0 +1,131 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The emqx-modules configration interoperable interfaces +-module(emqx_modules_conf). + +-behaviour(emqx_config_handler). + +%% Load/Unload +-export([ load/0 + , unload/0 + ]). + +%% topci-metrics +-export([ topic_metrics/0 + , add_topic_metrics/1 + , remove_topic_metrics/1 + ]). + +%% config handlers +-export([ pre_config_update/3 + , post_config_update/5 + ]). + +%%-------------------------------------------------------------------- +%% Load/Unload + +-spec load() -> ok. +load() -> + emqx_conf:add_handler([topic_metrics], ?MODULE). + +-spec unload() -> ok. +unload() -> + emqx_conf:remove_handler([topic_metrics]). + +%%-------------------------------------------------------------------- +%% Topic-Metrics + +-spec topic_metrics() -> [emqx_types:topic()]. +topic_metrics() -> + lists:map( + fun(#{topic := Topic}) -> Topic end, + emqx:get_config([topic_metrics]) + ). + +-spec add_topic_metrics(emqx_types:topic()) + -> {ok, emqx_types:topic()} + | {error, term()}. +add_topic_metrics(Topic) -> + case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of + {ok, _} -> {ok, Topic}; + {error, Reason} -> {error, Reason} + end. + +-spec remove_topic_metrics(emqx_types:topic()) + -> ok + | {error, term()}. +remove_topic_metrics(Topic) -> + case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of + {ok, _} -> ok; + {error, Reason} -> {error, Reason} + end. + +cfg_update(topic_metrics, Action, Params) -> + res(emqx_conf:update( + [topic_metrics], + {Action, Params}, + #{override_to => cluster})). + +res({ok, Result}) -> {ok, Result}; +res({error, {pre_config_update,?MODULE,Reason}}) -> {error, Reason}; +res({error, {post_config_update,?MODULE,Reason}}) -> {error, Reason}; +res({error, Reason}) -> {error, Reason}. + +%%-------------------------------------------------------------------- +%% Config Handler +%%-------------------------------------------------------------------- + +-spec pre_config_update(list(atom()), + emqx_config:update_request(), + emqx_config:raw_config()) -> + {ok, emqx_config:update_request()} | {error, term()}. +pre_config_update(_, {add_topic_metrics, Topic0}, RawConf) -> + Topic = #{<<"topic">> => Topic0}, + case lists:member(Topic, RawConf) of + true -> + {error, already_exist}; + _ -> + {ok, RawConf ++ [Topic]} + end; +pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) -> + Topic = #{<<"topic">> => Topic0}, + case lists:member(Topic, RawConf) of + true -> + {ok, RawConf -- [Topic]}; + _ -> + {error, not_found} + end. + +-spec post_config_update(list(atom()), + emqx_config:update_request(), + emqx_config:config(), + emqx_config:config(), emqx_config:app_envs()) + -> ok | {ok, Result::any()} | {error, Reason::term()}. + +post_config_update(_, {add_topic_metrics, Topic}, + _NewConfig, _OldConfig, _AppEnvs) -> + case emqx_topic_metrics:register(Topic) of + ok -> ok; + {error, Reason} -> {error, Reason} + end; + +post_config_update(_, {remove_topic_metrics, Topic}, + _NewConfig, _OldConfig, _AppEnvs) -> + case emqx_topic_metrics:deregister(Topic) of + ok -> ok; + {error, Reason} -> {error, Reason} + end. diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index 7ca14b921..e4a746139 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -220,7 +220,6 @@ handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) -> handle_call({deregister, all}, _From, State) -> true = ets:delete_all_objects(?TAB), - update_config([]), {reply, ok, State#state{speeds = #{}}}; handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) -> @@ -232,7 +231,6 @@ handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) -> NSpeeds = lists:foldl(fun(Metric, Acc) -> maps:remove({Topic, Metric}, Acc) end, Speeds, ?TOPIC_METRICS), - remove_topic_config(Topic), {reply, ok, State#state{speeds = NSpeeds}} end; @@ -316,7 +314,6 @@ do_register(Topic, Speeds) -> NSpeeds = lists:foldl(fun(Metric, Acc) -> maps:put({Topic, Metric}, #speed{}, Acc) end, Speeds, ?TOPIC_METRICS), - add_topic_config(Topic), {ok, NSpeeds}; {true, true} -> {error, bad_topic}; @@ -351,18 +348,6 @@ format({Topic, Data}) -> TopicMetrics#{reset_time => ResetTime} end. -remove_topic_config(Topic) when is_binary(Topic) -> - Topics = emqx_config:get_raw([<<"topic_metrics">>], []) -- [#{<<"topic">> => Topic}], - update_config(Topics). - -add_topic_config(Topic) when is_binary(Topic) -> - Topics = emqx_config:get_raw([<<"topic_metrics">>], []) ++ [#{<<"topic">> => Topic}], - update_config(lists:usort(Topics)). - -update_config(Topics) when is_list(Topics) -> - {ok, _} = emqx:update_config([topic_metrics], Topics), - ok. - try_inc(Topic, Metric) -> _ = inc(Topic, Metric), ok. diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index e8be39c47..a45a8fd33 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -73,6 +73,7 @@ properties() -> topic_metrics_api() -> MetaData = #{ + %% Get all nodes metrics and accumulate all of these get => #{ description => <<"List topic metrics">>, responses => #{ @@ -133,87 +134,160 @@ topic_param() -> }. %%-------------------------------------------------------------------- -%% api callback +%% HTTP Callbacks +%%-------------------------------------------------------------------- + topic_metrics(get, _) -> - list_metrics(); + case cluster_accumulation_metrics() of + {error, Reason} -> + {500, Reason}; + {ok, Metrics} -> + {200, Metrics} + end; + topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> - reset(Topic); + case reset(Topic) of + ok -> {200}; + {error, Reason} -> reason2httpresp(Reason) + end; topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> - reset(); + case reset() of + ok -> {200}; + {error, Reason} -> reason2httpresp(Reason) + end; + topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> {400, 'BAD_REQUEST', <<"Topic can not be empty">>}; topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> - register(Topic). - -operate_topic_metrics(Method, #{bindings := #{topic := Topic0}}) -> - Topic = decode_topic(Topic0), - case Method of - get -> - get_metrics(Topic); - put -> - register(Topic); - delete -> - deregister(Topic) + case emqx_modules_conf:add_topic_metrics(Topic) of + {ok, Topic} -> + {200}; + {error, Reason} -> + reason2httpresp(Reason) end. -decode_topic(Topic) -> - uri_string:percent_decode(Topic). +operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) -> + case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of + {ok, Metrics} -> + {200, Metrics}; + {error, Reason} -> + reason2httpresp(Reason) + end; + +operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> + case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of + ok -> {200}; + {error, Reason} -> + reason2httpresp(Reason) + end. %%-------------------------------------------------------------------- -%% api apply -list_metrics() -> - {200, emqx_topic_metrics:metrics()}. +%% Internal funcs +%%-------------------------------------------------------------------- -register(Topic) -> - case emqx_topic_metrics:register(Topic) of - {error, quota_exceeded} -> - Message = list_to_binary(io_lib:format("Max topic metrics count is ~p", - [emqx_topic_metrics:max_limit()])), - {409, #{code => ?EXCEED_LIMIT, message => Message}}; - {error, bad_topic} -> - Message = list_to_binary(io_lib:format("Bad Topic, topic cannot have wildcard ~p", - [Topic])), - {400, #{code => ?BAD_TOPIC, message => Message}}; - {error, {quota_exceeded, bad_topic}} -> - Message = list_to_binary( - io_lib:format( - "Max topic metrics count is ~p, and topic cannot have wildcard ~p", - [emqx_topic_metrics:max_limit(), Topic])), - {400, #{code => ?BAD_REQUEST, message => Message}}; - {error, already_existed} -> - Message = list_to_binary(io_lib:format("Topic ~p already registered", [Topic])), - {400, #{code => ?BAD_TOPIC, message => Message}}; - ok -> - {200} +cluster_accumulation_metrics() -> + case multicall(emqx_topic_metrics, metrics, []) of + {SuccResList, []} -> + {ok, accumulate_nodes_metrics(SuccResList)}; + {_, FailedNodes} -> + {error, {badrpc, FailedNodes}} end. -deregister(Topic) -> - case emqx_topic_metrics:deregister(Topic) of - {error, topic_not_found} -> - Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), - {404, #{code => ?ERROR_TOPIC, message => Message}}; - ok -> - {200} +cluster_accumulation_metrics(Topic) -> + case multicall(emqx_topic_metrics, metrics, [Topic]) of + {SuccResList, []} -> + case lists:filter(fun({error, _}) -> false; (_) -> true + end, SuccResList) of + [] -> {error, topic_not_found}; + TopicMetrics -> + NTopicMetrics = [ [T] || T <- TopicMetrics], + [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), + {ok, AccMetrics} + end; + {_, FailedNodes} -> + {error, {badrpc, FailedNodes}} end. -get_metrics(Topic) -> - case emqx_topic_metrics:metrics(Topic) of - {error, topic_not_found} -> - Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), - {404, #{code => ?ERROR_TOPIC, message => Message}}; - Metrics -> - {200, Metrics} - end. +accumulate_nodes_metrics(NodesTopicMetrics) -> + AccMap = lists:foldl(fun(TopicMetrics, ExAcc) -> + MetricsMap = lists:foldl( + fun(#{topic := Topic, + metrics := Metrics, + create_time := CreateTime}, Acc) -> + Acc#{Topic => {Metrics, CreateTime}} + end, #{}, TopicMetrics), + accumulate_metrics(MetricsMap, ExAcc) + end, #{}, NodesTopicMetrics), + maps:fold(fun(Topic, {Metrics, CreateTime1}, Acc1) -> + [#{topic => Topic, + metrics => Metrics, + create_time => CreateTime1} | Acc1] + end, [], AccMap). + +%% @doc TopicMetricsIn :: #{<<"topic">> := {Metrics, CreateTime}} +accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) -> + Topics = maps:keys(TopicMetricsIn), + lists:foldl(fun(Topic, Acc) -> + {Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn), + NMetrics = do_accumulation_metrics( + Metrics, + maps:get(Topic, TopicMetricsAcc, undefined) + ), + maps:put(Topic, {NMetrics, CreateTime}, Acc) + end, #{}, Topics). + +%% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...} +do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn; +do_accumulation_metrics(MetricsIn, MetricsAcc) -> + Keys = maps:keys(MetricsIn), + lists:foldl(fun(Key, Acc) -> + InVal = maps:get(Key, MetricsIn), + NVal = InVal + maps:get(Key, MetricsAcc, 0), + maps:put(Key, NVal, Acc) + end, #{}, Keys). reset() -> - ok = emqx_topic_metrics:reset(), - {200}. + _ = multicall(emqx_topic_metrics, reset, []), + ok. reset(Topic) -> - case emqx_topic_metrics:reset(Topic) of - {error, topic_not_found} -> - Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), - {404, #{code => ?ERROR_TOPIC, message => Message}}; - ok -> - {200} + case multicall(emqx_topic_metrics, reset, [Topic]) of + {SuccResList, []} -> + case lists:filter(fun({error, _}) -> true; (_) -> false + end, SuccResList) of + [{error, Reason} | _] -> + {error, Reason}; + [] -> + ok + end end. + +%%-------------------------------------------------------------------- +%% utils + +multicall(M, F, A) -> + emqx_rpc:multicall(mria_mnesia:running_nodes(), M, F, A). + +reason2httpresp(quota_exceeded) -> + Msg = list_to_binary( + io_lib:format("Max topic metrics count is ~p", + [emqx_topic_metrics:max_limit()])), + {409, #{code => ?EXCEED_LIMIT, message => Msg}}; +reason2httpresp(bad_topic) -> + Msg = <<"Bad Topic, topic cannot have wildcard">>, + {400, #{code => ?BAD_TOPIC, message => Msg}}; +reason2httpresp({quota_exceeded, bad_topic}) -> + Msg = list_to_binary( + io_lib:format( + "Max topic metrics count is ~p, and topic cannot have wildcard", + [emqx_topic_metrics:max_limit()])), + {400, #{code => ?BAD_REQUEST, message => Msg}}; +reason2httpresp(already_existed) -> + Msg = <<"Topic already registered">>, + {400, #{code => ?BAD_TOPIC, message => Msg}}; +reason2httpresp(not_found) -> + Msg = <<"Topic not found">>, + {404, #{code => ?ERROR_TOPIC, message => Msg}}; +reason2httpresp(topic_not_found) -> + Msg = <<"Topic not found">>, + {404, #{code => ?ERROR_TOPIC, message => Msg}}. diff --git a/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl b/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl new file mode 100644 index 000000000..95ebb5711 --- /dev/null +++ b/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl @@ -0,0 +1,51 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_modules_conf_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:init_load(emqx_modules_schema, <<"gateway {}">>), + emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]), + Conf. + +end_per_suite(_Conf) -> + emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]). + +init_per_testcase(_CaseName, Conf) -> + Conf. + +%%-------------------------------------------------------------------- +%% Cases +%%-------------------------------------------------------------------- + +t_topic_metrics_list(_) -> + ok. + +t_topic_metrics_add_remove(_) -> + ok. +