diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index d3bf72fc3..ad9b23daa 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -52,6 +52,8 @@ on_client_unsubscribed/3 ]). +-export([post_config_update/5]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -82,6 +84,8 @@ ] ). +-define(CONF_KEY_PATH, [sys_topics]). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -124,7 +128,7 @@ sys_heatbeat_interval() -> emqx:get_config([sys_topics, sys_heartbeat_interval]). sys_event_messages() -> - emqx:get_config([sys_topics, sys_event_messages]). + maps:to_list(emqx:get_config([sys_topics, sys_event_messages])). %% @doc Get sys info -spec info() -> list(tuple()). @@ -136,13 +140,40 @@ info() -> {datetime, datetime()} ]. +%% Update the confgs at runtime +post_config_update(_, _Req, NewSysConf, OldSysConf, _AppEnvs) -> + {Added, Removed} = diff_hooks(NewSysConf, OldSysConf), + unload_event_hooks(Removed), + load_event_hooks(Added). + +diff_hooks(NewSysConf, OldSysConf) -> + NewEvents = maps:to_list(maps:get(sys_event_messages, NewSysConf, #{})), + OldEvents = maps:to_list(maps:get(sys_event_messages, OldSysConf, #{})), + diff_hooks(NewEvents, OldEvents, [], []). + +diff_hooks([], [], Added, Removed) -> + {lists:reverse(Added), lists:reverse(Removed)}; +diff_hooks([H | T1], [H | T2], Added, Removed) -> + diff_hooks(T1, T2, Added, Removed); +diff_hooks( + [New = {EventName, NewEnable} | T1], + [Old = {EventName, OldEnable} | T2], + Added, + Removed +) -> + case {NewEnable, OldEnable} of + {true, false} -> diff_hooks(T1, T2, [New | Added], Removed); + {false, true} -> diff_hooks(T1, T2, Added, [Old | Removed]) + end. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> + ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), State = #state{sysdescr = iolist_to_binary(sysdescr())}, - load_event_hooks(), + load_event_hooks(sys_event_messages()), {ok, heartbeat(tick(State))}. heartbeat(State) -> @@ -150,7 +181,9 @@ heartbeat(State) -> tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. -load_event_hooks() -> +load_event_hooks([]) -> + ok; +load_event_hooks(Events) -> lists:foreach( fun ({_, false}) -> @@ -159,7 +192,7 @@ load_event_hooks() -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) end, - maps:to_list(sys_event_messages()) + Events ). handle_call(Req, _From, State) -> @@ -186,16 +219,19 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> - unload_event_hooks(), + _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), + unload_event_hooks(sys_event_messages()), lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). -unload_event_hooks() -> +unload_event_hooks([]) -> + ok; +unload_event_hooks(Event) -> lists:foreach( fun({K, _}) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:del(HookPoint, {?MODULE, Fun}) end, - maps:to_list(sys_event_messages()) + Event ). %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index d67bb18b3..c3a091ce3 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -31,7 +31,8 @@ emqx_stats, emqx_broker, mria_mnesia, - emqx_hooks + emqx_hooks, + emqx_config_handler ]). -define(ALL(Vars, Types, Exprs), @@ -114,7 +115,9 @@ do_mock(emqx_metrics) -> meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end); do_mock(emqx_hooks) -> meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA) -> ok end), - meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end). + meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end); +do_mock(emqx_config_handler) -> + meck:expect(emqx_config_handler, add_handler, fun(_, _) -> ok end). %%-------------------------------------------------------------------- %% MODEL diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 4769d42f5..5143ff892 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -60,7 +60,8 @@ <<"delayed">>, <<"event_message">>, <<"prometheus">>, - <<"telemetry">> + <<"telemetry">>, + <<"sys_topics">> ] ++ global_zone_roots() ). diff --git a/apps/emqx_management/src/emqx_mgmt_api_sys.erl b/apps/emqx_management/src/emqx_mgmt_api_sys.erl new file mode 100644 index 000000000..220ce2563 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_sys.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_api_sys). + +-behaviour(minirest_api). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("typerefl/include/types.hrl"). + +%% API +-export([ api_spec/0 + , paths/0 + , schema/1 + , namespace/0 + ]). + +-export([ sys/2 + ]). + +-define(TAGS, [<<"sys">>]). + +namespace() -> "sys". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + ["/mqtt/sys_topics"]. + +sys(get, _Params) -> + {200, emqx_conf:get_raw([sys_topics], #{})}; +sys(put, #{body := Body}) -> + {ok, _} = emqx_conf:update([sys_topics], Body, #{override_to => cluster}), + {200, emqx_conf:get_raw([sys_topics], #{})}. + +%%-------------------------------------------------------------------- +%% Swagger defines +%%-------------------------------------------------------------------- + +schema("/mqtt/sys_topics") -> + #{ + 'operationId' => sys, + get => + #{ + tags => ?TAGS, + description => <<"Get System Topics config">>, + responses => + #{ + 200 => schema_sys_topics() + } + }, + put => + #{ + tags => ?TAGS, + description => <<"Update System Topics config">>, + 'requestBody' => schema_sys_topics(), + responses => + #{ + 200 => schema_sys_topics() + } + } + }. + +schema_sys_topics() -> + emqx_dashboard_swagger:schema_with_example( + hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()). + +example_sys_topics() -> + #{<<"sys_event_messages">> => + #{<<"client_connected">> => true, + <<"client_disconnected">> => true, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false}, + <<"sys_heartbeat_interval">> => <<"30s">>, + <<"sys_msg_interval">> => <<"1m">> + }. diff --git a/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl new file mode 100644 index 000000000..725eacb34 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_sys_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_mgmt_api_test_util:init_suite([emqx_conf]), + Config. + +end_per_suite(_) -> + emqx_mgmt_api_test_util:end_suite([emqx_conf]). + +t_get_put(_) -> + {ok, Default} = get_sys_topics_config(), + ?assertEqual( + #{<<"sys_event_messages">> => + #{<<"client_connected">> => true, + <<"client_disconnected">> => true, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false + }, + <<"sys_heartbeat_interval">> => <<"30s">>, + <<"sys_msg_interval">> => <<"1m">>}, Default), + + NConfig = Default#{ + <<"sys_msg_interval">> => <<"4m">>, + <<"sys_event_messages">> => #{<<"client_subscribed">> => false} + }, + {ok, ConfigResp} = put_sys_topics_config(NConfig), + ?assertEqual(NConfig, ConfigResp), + {ok, Default} = put_sys_topics_config(Default). + +get_sys_topics_config() -> + Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]), + case emqx_mgmt_api_test_util:request_api(get, Path) of + {ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])}; + Error -> Error + end. + +put_sys_topics_config(Config) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]), + case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Config) of + {ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])}; + Error -> Error + end.