From 59c41c7c6e6c77a8af07a9d72833d8ead5384682 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 18 Apr 2022 15:32:34 +0800 Subject: [PATCH] feat(sys): add sys_topics http api --- apps/emqx/src/emqx_sys.erl | 49 ++++++++-- .../emqx_management/src/emqx_mgmt_api_sys.erl | 90 +++++++++++++++++++ .../test/emqx_mgmt_api_sys_SUITE.erl | 66 ++++++++++++++ 3 files changed, 198 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_management/src/emqx_mgmt_api_sys.erl create mode 100644 apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index d3bf72fc3..43a4569db 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -52,6 +52,8 @@ on_client_unsubscribed/3 ]). +-export([post_config_update/5]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -82,6 +84,8 @@ ] ). +-define(CONF_KEY_PATH, [sys_topics]). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -124,7 +128,7 @@ sys_heatbeat_interval() -> emqx:get_config([sys_topics, sys_heartbeat_interval]). sys_event_messages() -> - emqx:get_config([sys_topics, sys_event_messages]). + maps:to_list(emqx:get_config([sys_topics, sys_event_messages])). %% @doc Get sys info -spec info() -> list(tuple()). @@ -136,13 +140,39 @@ info() -> {datetime, datetime()} ]. +%% Update the confgs at runtime +post_config_update(_, _Req, NewSysConf, OldSysConf, _AppEnvs) -> + {Added, Removed} = diff_hooks(NewSysConf, OldSysConf), + unload_event_hooks(Removed), + load_event_hooks(Added). + +diff_hooks(NewSysConf, OldSysConf) -> + NewEvents = maps:get(sys_event_messages, NewSysConf, #{}), + OldEvents = maps:get(sys_event_messages, OldSysConf, #{}), + maps:fold( + fun(K, V, {Acc1, Acc2}) -> + case V =:= maps:get(K, OldEvents, false) of + true -> + {Acc1, Acc2}; + false -> + case V of + true -> {[{K, V} | Acc1], [Acc2]}; + false -> {Acc1, [{K, V} | Acc2]} + end + end + end, + {[], []}, + NewEvents + ). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> + ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), State = #state{sysdescr = iolist_to_binary(sysdescr())}, - load_event_hooks(), + load_event_hooks(sys_event_messages()), {ok, heartbeat(tick(State))}. heartbeat(State) -> @@ -150,7 +180,9 @@ heartbeat(State) -> tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. -load_event_hooks() -> +load_event_hooks([]) -> + ok; +load_event_hooks(Events) -> lists:foreach( fun ({_, false}) -> @@ -159,7 +191,7 @@ load_event_hooks() -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) end, - maps:to_list(sys_event_messages()) + Events ). handle_call(Req, _From, State) -> @@ -186,16 +218,19 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> - unload_event_hooks(), + _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), + unload_event_hooks(sys_event_messages()), lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). -unload_event_hooks() -> +unload_event_hooks([]) -> + ok; +unload_event_hooks(Event) -> lists:foreach( fun({K, _}) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:del(HookPoint, {?MODULE, Fun}) end, - maps:to_list(sys_event_messages()) + Event ). %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_sys.erl b/apps/emqx_management/src/emqx_mgmt_api_sys.erl new file mode 100644 index 000000000..f02a25cbb --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_sys.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_api_sys). + +-behaviour(minirest_api). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("typerefl/include/types.hrl"). + +%% API +-export([ api_spec/0 + , paths/0 + , schema/1 + , namespace/0 + ]). + +-export([ sys/2 + ]). + +-define(TAGS, [<<"sys">>]). + +namespace() -> "sys". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + ["/mqtt/sys_topics"]. + +sys(get, _Params) -> + {200, emqx_conf:get_raw([sys_topics], #{})}; +sys(put, #{body := Body}) -> + {ok, _} = emqx_conf:update([sys_topics], Body, #{override_to => cluster}), + {200, emqx_conf:get_raw([sys_topics], #{})}. + +%%-------------------------------------------------------------------- +%% Swagger defines +%%-------------------------------------------------------------------- + +schema("/mqtt/sys_topics") -> + #{ + 'operationId' => sys, + get => + #{ + tags => ?TAGS, + description => <<"Get System Topics config">>, + responses => + #{ + 200 => schema_sys_topics() + } + }, + put => + #{ + tags => ?TAGS, + description => <<"Update System Topics config">>, + 'requestBody' => schema_sys_topics(), + responses => + #{ + 200 => schema_sys_topics() + } + } + }. + +schema_sys_topics() -> + emqx_dashboard_swagger:schema_with_examples( + hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()). + +example_sys_topics() -> + #{<<"sys_event_messages">> => + #{<<"client_connected">> => true, + <<"client_disconnected">> => true, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false}, + <<"sys_heartbeat_interval">> => <<"30s">>, + <<"sys_msg_interval">> => <<"1m">> + }. diff --git a/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl new file mode 100644 index 000000000..725eacb34 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_sys_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_mgmt_api_test_util:init_suite([emqx_conf]), + Config. + +end_per_suite(_) -> + emqx_mgmt_api_test_util:end_suite([emqx_conf]). + +t_get_put(_) -> + {ok, Default} = get_sys_topics_config(), + ?assertEqual( + #{<<"sys_event_messages">> => + #{<<"client_connected">> => true, + <<"client_disconnected">> => true, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false + }, + <<"sys_heartbeat_interval">> => <<"30s">>, + <<"sys_msg_interval">> => <<"1m">>}, Default), + + NConfig = Default#{ + <<"sys_msg_interval">> => <<"4m">>, + <<"sys_event_messages">> => #{<<"client_subscribed">> => false} + }, + {ok, ConfigResp} = put_sys_topics_config(NConfig), + ?assertEqual(NConfig, ConfigResp), + {ok, Default} = put_sys_topics_config(Default). + +get_sys_topics_config() -> + Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]), + case emqx_mgmt_api_test_util:request_api(get, Path) of + {ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])}; + Error -> Error + end. + +put_sys_topics_config(Config) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]), + case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Config) of + {ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])}; + Error -> Error + end.