feat(emqx_retainer): add simple restapi for emqx_retainer

This commit is contained in:
lafirest 2021-07-05 14:31:21 +08:00 committed by turtleDeng
parent 53df218e6a
commit 72f9e60d63
10 changed files with 271 additions and 23 deletions

View File

@ -6,6 +6,8 @@
##
## Notice that all nodes in the same cluster have to be configured to
emqx_retainer: {
## enable/disable emqx_retainer
enable: true
## use the same storage_type.
##
## Value: ram | disc | disc_only

View File

@ -1,6 +1,6 @@
{application, emqx_retainer,
[{description, "EMQ X Retainer"},
{vsn, "4.3.2"}, % strict semver, bump manually!
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,15 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<"4.3.[0-1]">>, [
{restart_application, emqx_retainer}
]},
{<<".*">>, []}
],
[
{<<"4.3.[0-1]">>, [
{restart_application, emqx_retainer}
]},
{<<".*">>, []}
]
}.

View File

@ -27,15 +27,15 @@
-export([start_link/0]).
-export([ load/0
, unload/0
-export([unload/0
]).
-export([ on_session_subscribed/3
, on_message_publish/1
]).
-export([clean/1]).
-export([ clean/1
, update_config/1]).
%% for emqx_pool task func
-export([dispatch/2]).
@ -56,6 +56,7 @@
-define(DEF_MAX_RETAINED_MESSAGES, 0).
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
-define(DEF_EXPIRY_INTERVAL, 0).
-define(DEF_ENABLE_VAL, false).
%% convenient to generate stats_timer/expiry_timer
-define(MAKE_TIMER(State, Timer, Interval, Msg),
@ -130,6 +131,15 @@ clean(Topic) when is_binary(Topic) ->
{atomic, N} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), N
end.
%%--------------------------------------------------------------------
%% Update Config
%%--------------------------------------------------------------------
-spec update_config(hocon:config()) -> ok.
update_config(Conf) ->
OldCfg = emqx_config:get([?APP]),
emqx_config:put([?APP], Conf),
check_enable_when_update(OldCfg).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -162,6 +172,7 @@ init([]) ->
end,
StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'),
State = ?MAKE_TIMER(#state{stats_fun = StatsFun}, stats_timer, ?STATS_INTERVAL, stats),
check_enable_when_init(),
{ok, start_expire_timer(ExpiryInterval, State)}.
start_expire_timer(0, State) ->
@ -321,3 +332,23 @@ condition(Ws) ->
false -> Ws1;
_ -> (Ws1 -- ['#']) ++ '_'
end.
-spec check_enable_when_init() -> ok.
check_enable_when_init() ->
case emqx_config:get([?APP, enable], ?DEF_ENABLE_VAL) of
true -> load();
_ -> ok
end.
-spec check_enable_when_update(hocon:config()) -> ok.
check_enable_when_update(OldCfg) ->
OldVal = maps:get(enable, OldCfg, undefined),
case emqx_config:get([?APP, enable], ?DEF_ENABLE_VAL) of
OldVal ->
ok;
true ->
load();
_ ->
unload()
end.

View File

@ -0,0 +1,61 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_api).
-rest_api(#{name => lookup_config,
method => 'GET',
path => "/retainer",
func => lookup_config,
descr => "lookup retainer config"
}).
-rest_api(#{name => update_config,
method => 'PUT',
path => "/retainer",
func => update_config,
descr => "update retainer config"
}).
-export([ lookup_config/2
, update_config/2
]).
lookup_config(_Bindings, _Params) ->
Config = emqx_config:get([emqx_retainer]),
minirest:return({ok, Config}).
update_config(_Bindings, Params) ->
try
ConfigList = proplists:get_value(<<"emqx_retainer">>, Params),
{ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => ConfigList}),
#{format => richmap}),
RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}),
#{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf),
Action = proplists:get_value(<<"action">>, Params, undefined),
do_update_config(Action, Conf),
minirest:return()
catch _:_:Reason ->
minirest:return({error, Reason})
end.
%%------------------------------------------------------------------------------
%% Interval Funcs
%%------------------------------------------------------------------------------
do_update_config(undefined, Config) ->
emqx_retainer:update_config(Config);
do_update_config(<<"test">>, _) ->
ok.

View File

@ -26,7 +26,6 @@
start(_Type, _Args) ->
{ok, Sup} = emqx_retainer_sup:start_link(),
emqx_retainer:load(),
emqx_retainer_cli:load(),
{ok, Sup}.

View File

@ -11,7 +11,8 @@
structs() -> ["emqx_retainer"].
fields("emqx_retainer") ->
[ {storage_type, t(storage_type(), ram)}
[ {enable, t(boolean(), false)}
, {storage_type, t(storage_type(), ram)}
, {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)}
, {max_payload_size, t(emqx_schema:bytesize(), "1MB")}
, {expiry_interval, t(emqx_schema:duration_ms(), "0s")}

View File

@ -55,7 +55,8 @@ set_special_configs(_) ->
init_emqx_retainer_conf(Expiry) ->
emqx_config:put([emqx_retainer],
#{storage_type => ram,
#{enable => true,
storage_type => ram,
max_retained_messages => 0,
max_payload_size => 1024 * 1024,
expiry_interval => Expiry}).

View File

@ -0,0 +1,158 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_retainer.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_ct_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
]).
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
all() ->
emqx_ct:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
application:stop(emqx_retainer),
emqx_ct_helpers:start_apps([emqx_retainer, emqx_management], fun set_special_configs/1),
create_default_app(),
Config.
end_per_suite(_Config) ->
delete_default_app(),
emqx_ct_helpers:stop_apps([emqx_retainer]).
init_per_testcase(_, Config) ->
Config.
set_special_configs(emqx_retainer) ->
init_emqx_retainer_conf(0);
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}],
default_application_id => <<"admin">>,
default_application_secret => <<"public">>}),
ok;
set_special_configs(_) ->
ok.
init_emqx_retainer_conf(Expiry) ->
emqx_config:put([emqx_retainer],
#{enable => true,
storage_type => ram,
max_retained_messages => 0,
max_payload_size => 1024 * 1024,
expiry_interval => Expiry}).
%%------------------------------------------------------------------------------
%% Test Cases
%%------------------------------------------------------------------------------
t_config(_Config) ->
{ok, Return} = request_http_rest_lookup(["retainer"]),
NowCfg = get_http_data(Return),
NewCfg = NowCfg#{<<"expiry_interval">> => timer:seconds(60)},
RetainerConf = #{<<"emqx_retainer">> => NewCfg},
{ok, _} = request_http_rest_update(["retainer?action=test"], RetainerConf),
{ok, TestReturn} = request_http_rest_lookup(["retainer"]),
?assertEqual(NowCfg, get_http_data(TestReturn)),
{ok, _} = request_http_rest_update(["retainer"], RetainerConf),
{ok, UpdateReturn} = request_http_rest_lookup(["retainer"]),
?assertEqual(NewCfg, get_http_data(UpdateReturn)),
ok.
t_enable_disable(_Config) ->
Conf = switch_emqx_retainer(undefined, true),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
timer:sleep(100),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))),
_ = switch_emqx_retainer(Conf, false),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
timer:sleep(100),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(0, length(receive_messages(1))),
ok = emqtt:disconnect(C1).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request_http_rest_lookup(Path) ->
request_api(get, uri([Path]), default_auth_header()).
request_http_rest_update(Path, Params) ->
request_api(put, uri([Path]), [], default_auth_header(), Params).
uri(Parts) when is_list(Parts) ->
NParts = [b2l(E) || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
%% @private
b2l(B) when is_binary(B) ->
binary_to_list(B);
b2l(L) when is_list(L) ->
L.
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
Msgs;
receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
ct:log("Msg: ~p ~n", [Msg]),
receive_messages(Count-1, [Msg|Msgs]);
Other ->
ct:log("Other Msg: ~p~n",[Other]),
receive_messages(Count, Msgs)
after 2000 ->
Msgs
end.
switch_emqx_retainer(undefined, IsEnable) ->
{ok, Return} = request_http_rest_lookup(["retainer"]),
NowCfg = get_http_data(Return),
switch_emqx_retainer(NowCfg, IsEnable);
switch_emqx_retainer(NowCfg, IsEnable) ->
NewCfg = NowCfg#{<<"enable">> => IsEnable},
RetainerConf = #{<<"emqx_retainer">> => NewCfg},
{ok, _} = request_http_rest_update(["retainer"], RetainerConf),
NewCfg.

View File

@ -27,7 +27,7 @@ init_per_suite(Config) ->
%% Meck emqtt
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
%% Start Apps
emqx_ct_helpers:start_apps([emqx_retainer]),
emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1),
Config.
end_per_suite(_Config) ->
@ -37,6 +37,16 @@ end_per_suite(_Config) ->
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
set_special_configs(emqx_retainer) ->
emqx_config:put([emqx_retainer],
#{enable => true,
storage_type => ram,
max_retained_messages => 0,
max_payload_size => 1024 * 1024,
expiry_interval => 0});
set_special_configs(_) ->
ok.
client_info(Key, Client) ->
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).