Merge pull request #6581 from HJianBo/integrate_emqx_conf_into_modules

This commit is contained in:
JianBo He 2021-12-31 14:03:37 +08:00 committed by GitHub
commit 5e349e9d1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 322 additions and 82 deletions

View File

@ -38,7 +38,8 @@ maybe_enable_modules() ->
emqx_event_message:enable(), emqx_event_message:enable(),
emqx_conf_cli:load(), emqx_conf_cli:load(),
ok = emqx_rewrite:enable(), ok = emqx_rewrite:enable(),
emqx_topic_metrics:enable(). emqx_topic_metrics:enable(),
emqx_modules_conf:load().
maybe_disable_modules() -> maybe_disable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(), emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
@ -47,4 +48,5 @@ maybe_disable_modules() ->
emqx_event_message:disable(), emqx_event_message:disable(),
emqx_rewrite:disable(), emqx_rewrite:disable(),
emqx_conf_cli:unload(), emqx_conf_cli:unload(),
emqx_topic_metrics:disable(). emqx_topic_metrics:disable(),
emqx_modules_conf:unload().

View File

@ -0,0 +1,131 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The emqx-modules configration interoperable interfaces
-module(emqx_modules_conf).
-behaviour(emqx_config_handler).
%% Load/Unload
-export([ load/0
, unload/0
]).
%% topci-metrics
-export([ topic_metrics/0
, add_topic_metrics/1
, remove_topic_metrics/1
]).
%% config handlers
-export([ pre_config_update/3
, post_config_update/5
]).
%%--------------------------------------------------------------------
%% Load/Unload
-spec load() -> ok.
load() ->
emqx_conf:add_handler([topic_metrics], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_conf:remove_handler([topic_metrics]).
%%--------------------------------------------------------------------
%% Topic-Metrics
-spec topic_metrics() -> [emqx_types:topic()].
topic_metrics() ->
lists:map(
fun(#{topic := Topic}) -> Topic end,
emqx:get_config([topic_metrics])
).
-spec add_topic_metrics(emqx_types:topic())
-> {ok, emqx_types:topic()}
| {error, term()}.
add_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
{ok, _} -> {ok, Topic};
{error, Reason} -> {error, Reason}
end.
-spec remove_topic_metrics(emqx_types:topic())
-> ok
| {error, term()}.
remove_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
{ok, _} -> ok;
{error, Reason} -> {error, Reason}
end.
cfg_update(topic_metrics, Action, Params) ->
res(emqx_conf:update(
[topic_metrics],
{Action, Params},
#{override_to => cluster})).
res({ok, Result}) -> {ok, Result};
res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason};
res({error, {post_config_update, ?MODULE, Reason}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}.
%%--------------------------------------------------------------------
%% Config Handler
%%--------------------------------------------------------------------
-spec pre_config_update(list(atom()),
emqx_config:update_request(),
emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, {add_topic_metrics, Topic0}, RawConf) ->
Topic = #{<<"topic">> => Topic0},
case lists:member(Topic, RawConf) of
true ->
{error, already_existed};
_ ->
{ok, RawConf ++ [Topic]}
end;
pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
Topic = #{<<"topic">> => Topic0},
case lists:member(Topic, RawConf) of
true ->
{ok, RawConf -- [Topic]};
_ ->
{error, not_found}
end.
-spec post_config_update(list(atom()),
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update(_, {add_topic_metrics, Topic},
_NewConfig, _OldConfig, _AppEnvs) ->
case emqx_topic_metrics:register(Topic) of
ok -> ok;
{error, Reason} -> {error, Reason}
end;
post_config_update(_, {remove_topic_metrics, Topic},
_NewConfig, _OldConfig, _AppEnvs) ->
case emqx_topic_metrics:deregister(Topic) of
ok -> ok;
{error, Reason} -> {error, Reason}
end.

View File

@ -220,7 +220,6 @@ handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
handle_call({deregister, all}, _From, State) -> handle_call({deregister, all}, _From, State) ->
true = ets:delete_all_objects(?TAB), true = ets:delete_all_objects(?TAB),
update_config([]),
{reply, ok, State#state{speeds = #{}}}; {reply, ok, State#state{speeds = #{}}};
handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) -> handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
@ -232,7 +231,6 @@ handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
NSpeeds = lists:foldl(fun(Metric, Acc) -> NSpeeds = lists:foldl(fun(Metric, Acc) ->
maps:remove({Topic, Metric}, Acc) maps:remove({Topic, Metric}, Acc)
end, Speeds, ?TOPIC_METRICS), end, Speeds, ?TOPIC_METRICS),
remove_topic_config(Topic),
{reply, ok, State#state{speeds = NSpeeds}} {reply, ok, State#state{speeds = NSpeeds}}
end; end;
@ -316,7 +314,6 @@ do_register(Topic, Speeds) ->
NSpeeds = lists:foldl(fun(Metric, Acc) -> NSpeeds = lists:foldl(fun(Metric, Acc) ->
maps:put({Topic, Metric}, #speed{}, Acc) maps:put({Topic, Metric}, #speed{}, Acc)
end, Speeds, ?TOPIC_METRICS), end, Speeds, ?TOPIC_METRICS),
add_topic_config(Topic),
{ok, NSpeeds}; {ok, NSpeeds};
{true, true} -> {true, true} ->
{error, bad_topic}; {error, bad_topic};
@ -351,18 +348,6 @@ format({Topic, Data}) ->
TopicMetrics#{reset_time => ResetTime} TopicMetrics#{reset_time => ResetTime}
end. end.
remove_topic_config(Topic) when is_binary(Topic) ->
Topics = emqx_config:get_raw([<<"topic_metrics">>], []) -- [#{<<"topic">> => Topic}],
update_config(Topics).
add_topic_config(Topic) when is_binary(Topic) ->
Topics = emqx_config:get_raw([<<"topic_metrics">>], []) ++ [#{<<"topic">> => Topic}],
update_config(lists:usort(Topics)).
update_config(Topics) when is_list(Topics) ->
{ok, _} = emqx:update_config([topic_metrics], Topics),
ok.
try_inc(Topic, Metric) -> try_inc(Topic, Metric) ->
_ = inc(Topic, Metric), _ = inc(Topic, Metric),
ok. ok.

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% TODO: refactor uri path
-module(emqx_topic_metrics_api). -module(emqx_topic_metrics_api).
-behaviour(minirest_api). -behaviour(minirest_api).
@ -73,6 +73,7 @@ properties() ->
topic_metrics_api() -> topic_metrics_api() ->
MetaData = #{ MetaData = #{
%% Get all nodes metrics and accumulate all of these
get => #{ get => #{
description => <<"List topic metrics">>, description => <<"List topic metrics">>,
responses => #{ responses => #{
@ -133,87 +134,157 @@ topic_param() ->
}. }.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% api callback %% HTTP Callbacks
%%--------------------------------------------------------------------
topic_metrics(get, _) -> topic_metrics(get, _) ->
list_metrics(); case cluster_accumulation_metrics() of
{error, Reason} ->
{500, Reason};
{ok, Metrics} ->
{200, Metrics}
end;
topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
reset(Topic); case reset(Topic) of
ok -> {200};
{error, Reason} -> reason2httpresp(Reason)
end;
topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
reset(); reset(),
{200};
topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
{400, 'BAD_REQUEST', <<"Topic can not be empty">>}; {400, 'BAD_REQUEST', <<"Topic can not be empty">>};
topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
register(Topic). case emqx_modules_conf:add_topic_metrics(Topic) of
{ok, Topic} ->
operate_topic_metrics(Method, #{bindings := #{topic := Topic0}}) -> {200};
Topic = decode_topic(Topic0), {error, Reason} ->
case Method of reason2httpresp(Reason)
get ->
get_metrics(Topic);
put ->
register(Topic);
delete ->
deregister(Topic)
end. end.
decode_topic(Topic) -> operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) ->
uri_string:percent_decode(Topic). case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of
{ok, Metrics} ->
{200, Metrics};
{error, Reason} ->
reason2httpresp(Reason)
end;
operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) ->
case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of
ok -> {200};
{error, Reason} -> reason2httpresp(Reason)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% api apply %% Internal funcs
list_metrics() -> %%--------------------------------------------------------------------
{200, emqx_topic_metrics:metrics()}.
register(Topic) -> cluster_accumulation_metrics() ->
case emqx_topic_metrics:register(Topic) of case multicall(emqx_topic_metrics, metrics, []) of
{error, quota_exceeded} -> {SuccResList, []} ->
Message = list_to_binary(io_lib:format("Max topic metrics count is ~p", {ok, accumulate_nodes_metrics(SuccResList)};
[emqx_topic_metrics:max_limit()])), {_, FailedNodes} ->
{409, #{code => ?EXCEED_LIMIT, message => Message}}; {error, {badrpc, FailedNodes}}
{error, bad_topic} ->
Message = list_to_binary(io_lib:format("Bad Topic, topic cannot have wildcard ~p",
[Topic])),
{400, #{code => ?BAD_TOPIC, message => Message}};
{error, {quota_exceeded, bad_topic}} ->
Message = list_to_binary(
io_lib:format(
"Max topic metrics count is ~p, and topic cannot have wildcard ~p",
[emqx_topic_metrics:max_limit(), Topic])),
{400, #{code => ?BAD_REQUEST, message => Message}};
{error, already_existed} ->
Message = list_to_binary(io_lib:format("Topic ~p already registered", [Topic])),
{400, #{code => ?BAD_TOPIC, message => Message}};
ok ->
{200}
end. end.
deregister(Topic) -> cluster_accumulation_metrics(Topic) ->
case emqx_topic_metrics:deregister(Topic) of case multicall(emqx_topic_metrics, metrics, [Topic]) of
{error, topic_not_found} -> {SuccResList, []} ->
Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), case lists:filter(fun({error, _}) -> false; (_) -> true
{404, #{code => ?ERROR_TOPIC, message => Message}}; end, SuccResList) of
ok -> [] -> {error, topic_not_found};
{200} TopicMetrics ->
NTopicMetrics = [ [T] || T <- TopicMetrics],
[AccMetrics] = accumulate_nodes_metrics(NTopicMetrics),
{ok, AccMetrics}
end;
{_, FailedNodes} ->
{error, {badrpc, FailedNodes}}
end. end.
get_metrics(Topic) -> accumulate_nodes_metrics(NodesTopicMetrics) ->
case emqx_topic_metrics:metrics(Topic) of AccMap = lists:foldl(fun(TopicMetrics, ExAcc) ->
{error, topic_not_found} -> MetricsMap = lists:foldl(
Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), fun(#{topic := Topic,
{404, #{code => ?ERROR_TOPIC, message => Message}}; metrics := Metrics,
Metrics -> create_time := CreateTime}, Acc) ->
{200, Metrics} Acc#{Topic => {Metrics, CreateTime}}
end. end, #{}, TopicMetrics),
accumulate_metrics(MetricsMap, ExAcc)
end, #{}, NodesTopicMetrics),
maps:fold(fun(Topic, {Metrics, CreateTime1}, Acc1) ->
[#{topic => Topic,
metrics => Metrics,
create_time => CreateTime1} | Acc1]
end, [], AccMap).
%% @doc TopicMetricsIn :: #{<<"topic">> := {Metrics, CreateTime}}
accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) ->
Topics = maps:keys(TopicMetricsIn),
lists:foldl(fun(Topic, Acc) ->
{Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn),
NMetrics = do_accumulation_metrics(
Metrics,
maps:get(Topic, TopicMetricsAcc, undefined)
),
maps:put(Topic, {NMetrics, CreateTime}, Acc)
end, #{}, Topics).
%% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...}
do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn;
do_accumulation_metrics(MetricsIn, MetricsAcc) ->
Keys = maps:keys(MetricsIn),
lists:foldl(fun(Key, Acc) ->
InVal = maps:get(Key, MetricsIn),
NVal = InVal + maps:get(Key, MetricsAcc, 0),
maps:put(Key, NVal, Acc)
end, #{}, Keys).
reset() -> reset() ->
ok = emqx_topic_metrics:reset(), _ = multicall(emqx_topic_metrics, reset, []),
{200}. ok.
reset(Topic) -> reset(Topic) ->
case emqx_topic_metrics:reset(Topic) of case multicall(emqx_topic_metrics, reset, [Topic]) of
{error, topic_not_found} -> {SuccResList, []} ->
Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), case lists:filter(fun({error, _}) -> true; (_) -> false
{404, #{code => ?ERROR_TOPIC, message => Message}}; end, SuccResList) of
ok -> [{error, Reason} | _] ->
{200} {error, Reason};
[] ->
ok
end
end. end.
%%--------------------------------------------------------------------
%% utils
multicall(M, F, A) ->
emqx_rpc:multicall(mria_mnesia:running_nodes(), M, F, A).
reason2httpresp(quota_exceeded) ->
Msg = list_to_binary(
io_lib:format("Max topic metrics count is ~p",
[emqx_topic_metrics:max_limit()])),
{409, #{code => ?EXCEED_LIMIT, message => Msg}};
reason2httpresp(bad_topic) ->
Msg = <<"Bad Topic, topic cannot have wildcard">>,
{400, #{code => ?BAD_TOPIC, message => Msg}};
reason2httpresp({quota_exceeded, bad_topic}) ->
Msg = list_to_binary(
io_lib:format(
"Max topic metrics count is ~p, and topic cannot have wildcard",
[emqx_topic_metrics:max_limit()])),
{400, #{code => ?BAD_REQUEST, message => Msg}};
reason2httpresp(already_existed) ->
Msg = <<"Topic already registered">>,
{400, #{code => ?BAD_TOPIC, message => Msg}};
reason2httpresp(topic_not_found) ->
Msg = <<"Topic not found">>,
{404, #{code => ?ERROR_TOPIC, message => Msg}};
reason2httpresp(not_found) ->
Msg = <<"Topic not found">>,
{404, #{code => ?ERROR_TOPIC, message => Msg}}.

View File

@ -0,0 +1,51 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_conf_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:init_load(emqx_modules_schema, <<"gateway {}">>),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
Conf.
end_per_suite(_Conf) ->
emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
init_per_testcase(_CaseName, Conf) ->
Conf.
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_topic_metrics_list(_) ->
ok.
t_topic_metrics_add_remove(_) ->
ok.