diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 08220a207..f91b3aa4f 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -6,6 +6,8 @@ ## ## Notice that all nodes in the same cluster have to be configured to emqx_retainer: { + ## enable/disable emqx_retainer + enable: true ## use the same storage_type. ## ## Value: ram | disc | disc_only diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index c5ca7599d..4bc3b962b 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQ X Retainer"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.appup.src b/apps/emqx_retainer/src/emqx_retainer.appup.src deleted file mode 100644 index 759ec56bd..000000000 --- a/apps/emqx_retainer/src/emqx_retainer.appup.src +++ /dev/null @@ -1,15 +0,0 @@ -%% -*-: erlang -*- -{VSN, - [ - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_retainer} - ]}, - {<<".*">>, []} - ], - [ - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_retainer} - ]}, - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index affbc5ca3..961578870 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -27,15 +27,15 @@ -export([start_link/0]). --export([ load/0 - , unload/0 +-export([unload/0 ]). -export([ on_session_subscribed/3 , on_message_publish/1 ]). --export([clean/1]). +-export([ clean/1 + , update_config/1]). %% for emqx_pool task func -export([dispatch/2]). @@ -56,6 +56,7 @@ -define(DEF_MAX_RETAINED_MESSAGES, 0). -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_EXPIRY_INTERVAL, 0). +-define(DEF_ENABLE_VAL, false). %% convenient to generate stats_timer/expiry_timer -define(MAKE_TIMER(State, Timer, Interval, Msg), @@ -130,6 +131,15 @@ clean(Topic) when is_binary(Topic) -> {atomic, N} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), N end. +%%-------------------------------------------------------------------- +%% Update Config +%%-------------------------------------------------------------------- +-spec update_config(hocon:config()) -> ok. +update_config(Conf) -> + OldCfg = emqx_config:get([?APP]), + emqx_config:put([?APP], Conf), + check_enable_when_update(OldCfg). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -162,6 +172,7 @@ init([]) -> end, StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'), State = ?MAKE_TIMER(#state{stats_fun = StatsFun}, stats_timer, ?STATS_INTERVAL, stats), + check_enable_when_init(), {ok, start_expire_timer(ExpiryInterval, State)}. start_expire_timer(0, State) -> @@ -321,3 +332,23 @@ condition(Ws) -> false -> Ws1; _ -> (Ws1 -- ['#']) ++ '_' end. + +-spec check_enable_when_init() -> ok. +check_enable_when_init() -> + case emqx_config:get([?APP, enable], ?DEF_ENABLE_VAL) of + true -> load(); + _ -> ok + end. + +-spec check_enable_when_update(hocon:config()) -> ok. +check_enable_when_update(OldCfg) -> + OldVal = maps:get(enable, OldCfg, undefined), + case emqx_config:get([?APP, enable], ?DEF_ENABLE_VAL) of + OldVal -> + ok; + true -> + load(); + _ -> + unload() + end. + diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl new file mode 100644 index 000000000..237a3b19c --- /dev/null +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -0,0 +1,61 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_api). + +-rest_api(#{name => lookup_config, + method => 'GET', + path => "/retainer", + func => lookup_config, + descr => "lookup retainer config" + }). + +-rest_api(#{name => update_config, + method => 'PUT', + path => "/retainer", + func => update_config, + descr => "update retainer config" + }). + +-export([ lookup_config/2 + , update_config/2 + ]). + +lookup_config(_Bindings, _Params) -> + Config = emqx_config:get([emqx_retainer]), + minirest:return({ok, Config}). + +update_config(_Bindings, Params) -> + try + ConfigList = proplists:get_value(<<"emqx_retainer">>, Params), + {ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => ConfigList}), + #{format => richmap}), + RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}), + #{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf), + Action = proplists:get_value(<<"action">>, Params, undefined), + do_update_config(Action, Conf), + minirest:return() + catch _:_:Reason -> + minirest:return({error, Reason}) + end. + +%%------------------------------------------------------------------------------ +%% Interval Funcs +%%------------------------------------------------------------------------------ +do_update_config(undefined, Config) -> + emqx_retainer:update_config(Config); +do_update_config(<<"test">>, _) -> + ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index 3f42ddbd6..3626bbd00 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -26,7 +26,6 @@ start(_Type, _Args) -> {ok, Sup} = emqx_retainer_sup:start_link(), - emqx_retainer:load(), emqx_retainer_cli:load(), {ok, Sup}. diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index ece873dae..14f643823 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -11,7 +11,8 @@ structs() -> ["emqx_retainer"]. fields("emqx_retainer") -> - [ {storage_type, t(storage_type(), ram)} + [ {enable, t(boolean(), false)} + , {storage_type, t(storage_type(), ram)} , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} , {max_payload_size, t(emqx_schema:bytesize(), "1MB")} , {expiry_interval, t(emqx_schema:duration_ms(), "0s")} diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 4f549a385..cf74b1334 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -55,7 +55,8 @@ set_special_configs(_) -> init_emqx_retainer_conf(Expiry) -> emqx_config:put([emqx_retainer], - #{storage_type => ram, + #{enable => true, + storage_type => ram, max_retained_messages => 0, max_payload_size => 1024 * 1024, expiry_interval => Expiry}). diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl new file mode 100644 index 000000000..57196c7bd --- /dev/null +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -0,0 +1,158 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_api_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("emqx_retainer.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-import(emqx_ct_http, [ request_api/3 + , request_api/5 + , get_http_data/1 + , create_default_app/0 + , delete_default_app/0 + , default_auth_header/0 + ]). + +-define(HOST, "http://127.0.0.1:8081/"). +-define(API_VERSION, "v4"). +-define(BASE_PATH, "api"). + +all() -> + emqx_ct:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + application:stop(emqx_retainer), + emqx_ct_helpers:start_apps([emqx_retainer, emqx_management], fun set_special_configs/1), + create_default_app(), + Config. + +end_per_suite(_Config) -> + delete_default_app(), + emqx_ct_helpers:stop_apps([emqx_retainer]). + +init_per_testcase(_, Config) -> + Config. + +set_special_configs(emqx_retainer) -> + init_emqx_retainer_conf(0); +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}], + default_application_id => <<"admin">>, + default_application_secret => <<"public">>}), + ok; +set_special_configs(_) -> + ok. + +init_emqx_retainer_conf(Expiry) -> + emqx_config:put([emqx_retainer], + #{enable => true, + storage_type => ram, + max_retained_messages => 0, + max_payload_size => 1024 * 1024, + expiry_interval => Expiry}). +%%------------------------------------------------------------------------------ +%% Test Cases +%%------------------------------------------------------------------------------ + +t_config(_Config) -> + {ok, Return} = request_http_rest_lookup(["retainer"]), + NowCfg = get_http_data(Return), + NewCfg = NowCfg#{<<"expiry_interval">> => timer:seconds(60)}, + RetainerConf = #{<<"emqx_retainer">> => NewCfg}, + + {ok, _} = request_http_rest_update(["retainer?action=test"], RetainerConf), + {ok, TestReturn} = request_http_rest_lookup(["retainer"]), + ?assertEqual(NowCfg, get_http_data(TestReturn)), + + {ok, _} = request_http_rest_update(["retainer"], RetainerConf), + {ok, UpdateReturn} = request_http_rest_lookup(["retainer"]), + ?assertEqual(NewCfg, get_http_data(UpdateReturn)), + ok. + +t_enable_disable(_Config) -> + Conf = switch_emqx_retainer(undefined, true), + + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), + timer:sleep(100), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(1, length(receive_messages(1))), + + _ = switch_emqx_retainer(Conf, false), + + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>), + emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), + timer:sleep(100), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(0, length(receive_messages(1))), + + ok = emqtt:disconnect(C1). + +%%-------------------------------------------------------------------- +%% HTTP Request +%%-------------------------------------------------------------------- +request_http_rest_lookup(Path) -> + request_api(get, uri([Path]), default_auth_header()). + +request_http_rest_update(Path, Params) -> + request_api(put, uri([Path]), [], default_auth_header(), Params). + +uri(Parts) when is_list(Parts) -> + NParts = [b2l(E) || E <- Parts], + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + +%% @private +b2l(B) when is_binary(B) -> + binary_to_list(B); +b2l(L) when is_list(L) -> + L. + +receive_messages(Count) -> + receive_messages(Count, []). +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + ct:log("Msg: ~p ~n", [Msg]), + receive_messages(Count-1, [Msg|Msgs]); + Other -> + ct:log("Other Msg: ~p~n",[Other]), + receive_messages(Count, Msgs) + after 2000 -> + Msgs + end. + +switch_emqx_retainer(undefined, IsEnable) -> + {ok, Return} = request_http_rest_lookup(["retainer"]), + NowCfg = get_http_data(Return), + switch_emqx_retainer(NowCfg, IsEnable); + +switch_emqx_retainer(NowCfg, IsEnable) -> + NewCfg = NowCfg#{<<"enable">> => IsEnable}, + RetainerConf = #{<<"emqx_retainer">> => NewCfg}, + {ok, _} = request_http_rest_update(["retainer"], RetainerConf), + NewCfg. diff --git a/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl b/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl index cec492c6a..19d404010 100644 --- a/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl @@ -27,7 +27,7 @@ init_per_suite(Config) -> %% Meck emqtt ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]), %% Start Apps - emqx_ct_helpers:start_apps([emqx_retainer]), + emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1), Config. end_per_suite(_Config) -> @@ -37,6 +37,16 @@ end_per_suite(_Config) -> %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- +set_special_configs(emqx_retainer) -> + emqx_config:put([emqx_retainer], + #{enable => true, + storage_type => ram, + max_retained_messages => 0, + max_payload_size => 1024 * 1024, + expiry_interval => 0}); + +set_special_configs(_) -> + ok. client_info(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).