feat(sys): add sys_topics http api

This commit is contained in:
JianBo He 2022-04-18 15:32:34 +08:00
parent 0587318c0e
commit 59c41c7c6e
3 changed files with 198 additions and 7 deletions

View File

@ -52,6 +52,8 @@
on_client_unsubscribed/3 on_client_unsubscribed/3
]). ]).
-export([post_config_update/5]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -82,6 +84,8 @@
] ]
). ).
-define(CONF_KEY_PATH, [sys_topics]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -124,7 +128,7 @@ sys_heatbeat_interval() ->
emqx:get_config([sys_topics, sys_heartbeat_interval]). emqx:get_config([sys_topics, sys_heartbeat_interval]).
sys_event_messages() -> sys_event_messages() ->
emqx:get_config([sys_topics, sys_event_messages]). maps:to_list(emqx:get_config([sys_topics, sys_event_messages])).
%% @doc Get sys info %% @doc Get sys info
-spec info() -> list(tuple()). -spec info() -> list(tuple()).
@ -136,13 +140,39 @@ info() ->
{datetime, datetime()} {datetime, datetime()}
]. ].
%% Update the confgs at runtime
post_config_update(_, _Req, NewSysConf, OldSysConf, _AppEnvs) ->
{Added, Removed} = diff_hooks(NewSysConf, OldSysConf),
unload_event_hooks(Removed),
load_event_hooks(Added).
diff_hooks(NewSysConf, OldSysConf) ->
NewEvents = maps:get(sys_event_messages, NewSysConf, #{}),
OldEvents = maps:get(sys_event_messages, OldSysConf, #{}),
maps:fold(
fun(K, V, {Acc1, Acc2}) ->
case V =:= maps:get(K, OldEvents, false) of
true ->
{Acc1, Acc2};
false ->
case V of
true -> {[{K, V} | Acc1], [Acc2]};
false -> {Acc1, [{K, V} | Acc2]}
end
end
end,
{[], []},
NewEvents
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
State = #state{sysdescr = iolist_to_binary(sysdescr())}, State = #state{sysdescr = iolist_to_binary(sysdescr())},
load_event_hooks(), load_event_hooks(sys_event_messages()),
{ok, heartbeat(tick(State))}. {ok, heartbeat(tick(State))}.
heartbeat(State) -> heartbeat(State) ->
@ -150,7 +180,9 @@ heartbeat(State) ->
tick(State) -> tick(State) ->
State#state{ticker = start_timer(sys_interval(), tick)}. State#state{ticker = start_timer(sys_interval(), tick)}.
load_event_hooks() -> load_event_hooks([]) ->
ok;
load_event_hooks(Events) ->
lists:foreach( lists:foreach(
fun fun
({_, false}) -> ({_, false}) ->
@ -159,7 +191,7 @@ load_event_hooks() ->
{HookPoint, Fun} = hook_and_fun(K), {HookPoint, Fun} = hook_and_fun(K),
emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) emqx_hooks:put(HookPoint, {?MODULE, Fun, []})
end, end,
maps:to_list(sys_event_messages()) Events
). ).
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -186,16 +218,19 @@ handle_info(Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
unload_event_hooks(), _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
unload_event_hooks(sys_event_messages()),
lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
unload_event_hooks() -> unload_event_hooks([]) ->
ok;
unload_event_hooks(Event) ->
lists:foreach( lists:foreach(
fun({K, _}) -> fun({K, _}) ->
{HookPoint, Fun} = hook_and_fun(K), {HookPoint, Fun} = hook_and_fun(K),
emqx_hooks:del(HookPoint, {?MODULE, Fun}) emqx_hooks:del(HookPoint, {?MODULE, Fun})
end, end,
maps:to_list(sys_event_messages()) Event
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -0,0 +1,90 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_sys).
-behaviour(minirest_api).
-include_lib("emqx/include/emqx.hrl").
-include_lib("typerefl/include/types.hrl").
%% API
-export([ api_spec/0
, paths/0
, schema/1
, namespace/0
]).
-export([ sys/2
]).
-define(TAGS, [<<"sys">>]).
namespace() -> "sys".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
["/mqtt/sys_topics"].
sys(get, _Params) ->
{200, emqx_conf:get_raw([sys_topics], #{})};
sys(put, #{body := Body}) ->
{ok, _} = emqx_conf:update([sys_topics], Body, #{override_to => cluster}),
{200, emqx_conf:get_raw([sys_topics], #{})}.
%%--------------------------------------------------------------------
%% Swagger defines
%%--------------------------------------------------------------------
schema("/mqtt/sys_topics") ->
#{
'operationId' => sys,
get =>
#{
tags => ?TAGS,
description => <<"Get System Topics config">>,
responses =>
#{
200 => schema_sys_topics()
}
},
put =>
#{
tags => ?TAGS,
description => <<"Update System Topics config">>,
'requestBody' => schema_sys_topics(),
responses =>
#{
200 => schema_sys_topics()
}
}
}.
schema_sys_topics() ->
emqx_dashboard_swagger:schema_with_examples(
hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()).
example_sys_topics() ->
#{<<"sys_event_messages">> =>
#{<<"client_connected">> => true,
<<"client_disconnected">> => true,
<<"client_subscribed">> => false,
<<"client_unsubscribed">> => false},
<<"sys_heartbeat_interval">> => <<"30s">>,
<<"sys_msg_interval">> => <<"1m">>
}.

View File

@ -0,0 +1,66 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_sys_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config.
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
t_get_put(_) ->
{ok, Default} = get_sys_topics_config(),
?assertEqual(
#{<<"sys_event_messages">> =>
#{<<"client_connected">> => true,
<<"client_disconnected">> => true,
<<"client_subscribed">> => false,
<<"client_unsubscribed">> => false
},
<<"sys_heartbeat_interval">> => <<"30s">>,
<<"sys_msg_interval">> => <<"1m">>}, Default),
NConfig = Default#{
<<"sys_msg_interval">> => <<"4m">>,
<<"sys_event_messages">> => #{<<"client_subscribed">> => false}
},
{ok, ConfigResp} = put_sys_topics_config(NConfig),
?assertEqual(NConfig, ConfigResp),
{ok, Default} = put_sys_topics_config(Default).
get_sys_topics_config() ->
Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]),
case emqx_mgmt_api_test_util:request_api(get, Path) of
{ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])};
Error -> Error
end.
put_sys_topics_config(Config) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]),
case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Config) of
{ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])};
Error -> Error
end.