diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 18c8b7882..a7bb2c93a 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -104,7 +104,7 @@ parameters() -> })}]. fields(message_summary) -> - [ {id, mk(binary(), #{desc => <<"Message ID">>})} + [ {msgid, mk(binary(), #{desc => <<"Message ID">>})} , {topic, mk(binary(), #{desc => "The topic"})} , {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})} , {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})} diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index bf0f3947c..a63739cd9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -23,10 +23,8 @@ ]). start(_Type, _Args) -> - {ok, Sup} = emqx_retainer_sup:start_link(), - emqx_retainer_cli:load(), - {ok, Sup}. + emqx_retainer_sup:start_link(). + stop(_State) -> - emqx_retainer_cli:unload(). - + ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_cli.erl b/apps/emqx_retainer/src/emqx_retainer_cli.erl deleted file mode 100644 index 028031c06..000000000 --- a/apps/emqx_retainer/src/emqx_retainer_cli.erl +++ /dev/null @@ -1,37 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_retainer_cli). - --include("emqx_retainer.hrl"). - -%% APIs --export([ load/0 - , cmd/1 - , unload/0 - ]). - -load() -> - emqx_ctl:register_command(retainer, {?MODULE, cmd}, []). - -cmd(_) -> - emqx_ctl:usage([{"retainer info", "Show the count of retained messages"}, - {"retainer topics", "Show all topics of retained messages"}, - {"retainer clean", "Clean all retained messages"}, - {"retainer clean ", "Clean retained messages by the specified topic filter"}]). - -unload() -> - emqx_ctl:unregister_command(retainer). diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 2f4c4b594..d57c49799 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -25,6 +25,7 @@ -export([ start_link/2 , dispatch/2 , refresh_limiter/0 + , worker/0 ]). %% gen_server callbacks @@ -50,6 +51,9 @@ refresh_limiter() -> end, Workers). +worker() -> + gproc_pool:pick_worker(?POOL, self()). + %%-------------------------------------------------------------------- %% @doc %% Starts the server @@ -79,6 +83,7 @@ start_link(Pool, Id) -> {stop, Reason :: term()} | ignore. init([Pool, Id]) -> + erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), Limiter = emqx_limiter_server:connect(shared, Bucket), @@ -188,10 +193,6 @@ format_status(_Opt, Status) -> cast(Msg) -> gen_server:cast(worker(), Msg). -%% @private -worker() -> - gproc_pool:pick_worker(?POOL, self()). - -spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. dispatch(Context, Pid, Topic, Cursor, Limiter) -> Mod = emqx_retainer:get_backend_module(), diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index bd144fd84..5cf5ab3cf 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -99,6 +99,10 @@ t_store_and_clean(_) -> <<"this is a retained message">>, [{qos, 0}, {retain, true}]), timer:sleep(100), + + {ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10), + ?assertEqual(1, length(List)), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -109,6 +113,10 @@ t_store_and_clean(_) -> {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(0, length(receive_messages(1))), + ok = emqx_retainer:clean(), + {ok, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10), + ?assertEqual(0, length(List2)), + ok = emqtt:disconnect(C1). t_retain_handling(_) -> @@ -337,6 +345,96 @@ t_flow_control(_) -> timer:sleep(500), ok. +t_clear_expired(_) -> + ConfMod = fun(Conf) -> + Conf#{<<"msg_clear_interval">> := <<"1s">>, <<"msg_expiry_interval">> := <<"3s">>} + end, + + Case = fun() -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + lists:foreach(fun(I) -> + emqtt:publish(C1, + <<"retained/", (I + 60):8/unsigned-integer>>, + #{'Message-Expiry-Interval' => 3}, + <<"retained">>, + [{qos, 0}, {retain, true}]) + end, + lists:seq(1, 5)), + timer:sleep(1000), + + {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + ?assertEqual(5, erlang:length(List)), + + timer:sleep(4500), + + {ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + ?assertEqual(0, erlang:length(List2)), + + ok = emqtt:disconnect(C1) + end, + with_conf(ConfMod, Case). + +t_max_payload_size(_) -> + ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end, + Case = fun() -> + emqx_retainer:clean(), + timer:sleep(500), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + emqtt:publish(C1, + <<"retained/1">>, #{}, <<"1234">>, [{qos, 0}, {retain, true}]), + + emqtt:publish(C1, + <<"retained/2">>, #{}, <<"1234567">>, [{qos, 0}, {retain, true}]), + + timer:sleep(500), + {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + ?assertEqual(1, erlang:length(List)), + + ok = emqtt:disconnect(C1) + end, + with_conf(ConfMod, Case). + +t_page_read(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + ok = emqx_retainer:clean(), + timer:sleep(500), + + Fun = fun(I) -> + emqtt:publish(C1, + <<"retained/", (I + 60)>>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:foreach(Fun, lists:seq(1, 9)), + timer:sleep(200), + + {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5), + ?assertEqual(5, length(List)), + + {ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5), + ?assertEqual(4, length(List2)), + + ok = emqtt:disconnect(C1). + +t_only_for_coverage(_) -> + ?assertEqual("retainer", emqx_retainer_schema:namespace()), + ignored = gen_server:call(emqx_retainer, unexpected), + ok = gen_server:cast(emqx_retainer, unexpected), + unexpected = erlang:send(erlang:whereis(emqx_retainer), unexpected), + + Dispatcher = emqx_retainer_dispatcher:worker(), + ignored = gen_server:call(Dispatcher, unexpected), + ok = gen_server:cast(Dispatcher, unexpected), + unexpected = erlang:send(Dispatcher, unexpected), + true = erlang:exit(Dispatcher, normal), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -356,3 +454,15 @@ receive_messages(Count, Msgs) -> after 2000 -> Msgs end. + +with_conf(ConfMod, Case) -> + Conf = emqx:get_raw_config([retainer]), + NewConf = ConfMod(Conf), + emqx_retainer:update_config(NewConf), + try + Case(), + emqx_retainer:update_config(Conf) + catch Type:Error:Strace -> + emqx_retainer:update_config(Conf), + erlang:raise(Type, Error, Strace) + end. diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index 9c541b5d2..91f3104d8 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -22,128 +22,117 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --import(emqx_common_test_http, [ request_api/3 - , request_api/5 - , get_http_data/1 - , create_default_app/0 - , delete_default_app/0 - , default_auth_header/0 - ]). - --define(HOST, "http://127.0.0.1:8081/"). --define(API_VERSION, "v4"). --define(BASE_PATH, "api"). --define(CFG_URI, "/configs/retainer"). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). +-import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]). all() -> - %% TODO: V5 API - %% emqx_common_test_helpers:all(?MODULE). - []. - -groups() -> - []. + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + application:stop(emqx_retainer), - emqx_common_test_helpers:start_apps([emqx_retainer, emqx_management], fun set_special_configs/1), - create_default_app(), + emqx_retainer_SUITE:load_base_conf(), + emqx_mgmt_api_test_util:init_suite([emqx_retainer]), Config. -end_per_suite(_Config) -> - delete_default_app(), - emqx_common_test_helpers:stop_apps([emqx_management, emqx_retainer]). +end_per_suite(Config) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]), + Config. init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), + application:ensure_all_started(emqx_retainer), + timer:sleep(500), Config. -set_special_configs(emqx_retainer) -> - emqx_retainer_SUITE:init_emqx_retainer_conf(); -set_special_configs(emqx_management) -> - emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], - applications =>[#{id => "admin", secret => "public"}]}), - ok; -set_special_configs(_) -> - ok. - %%------------------------------------------------------------------------------ %% Test Cases %%------------------------------------------------------------------------------ t_config(_Config) -> - {ok, Return} = request_http_rest_lookup([?CFG_URI]), - NowCfg = get_http_data(Return), - NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)}, - RetainerConf = #{<<"emqx_retainer">> => NewCfg}, + Path = api_path(["mqtt", "retainer"]), + {ok, ConfJson} = request_api(get, Path), + ReturnConf = decode_json(ConfJson), + ?assertMatch(#{backend := _, enable := _, flow_control := _, + max_payload_size := _, msg_clear_interval := _, + msg_expiry_interval := _}, + ReturnConf), - {ok, _} = request_http_rest_update([?CFG_URI], RetainerConf), - {ok, UpdateReturn} = request_http_rest_lookup(["retainer"]), - ?assertEqual(NewCfg, get_http_data(UpdateReturn)), - ok. + UpdateConf = fun(Enable) -> + RawConf = emqx_json:decode(ConfJson, [return_maps]), + UpdateJson = RawConf#{<<"enable">> := Enable}, + {ok, UpdateResJson} = request_api(put, + Path, [], auth_header_(), UpdateJson), + UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]), + ?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf)) + end, -t_enable_disable(_Config) -> - Conf = switch_emqx_retainer(undefined, true), + UpdateConf(false), + UpdateConf(true). + +t_messages(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqx_retainer:clean(), + timer:sleep(500), + + Each = fun(I) -> + emqtt:publish(C1, <<"retained/", (I + 60)>>, + <<"retained">>, + [{qos, 0}, {retain, true}]) + end, + + lists:foreach(Each, lists:seq(1, 5)), + + {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])), + Msgs = decode_json(MsgsJson), + ?assert(erlang:length(Msgs) >= 5), %% maybe has $SYS messages + + [First | _] = Msgs, + ?assertMatch(#{msgid := _, topic := _, qos := _, + publish_at := _, from_clientid := _, from_username := _ + }, + First), + + ok = emqtt:disconnect(C1). + +t_lookup_and_delete(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), + emqx_retainer:clean(), + timer:sleep(500), - emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), - timer:sleep(100), + emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), - ?assertEqual(1, length(receive_messages(1))), + API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]), + {ok, LookupJson} = request_api(get, API), + LookupResult = decode_json(LookupJson), - _ = switch_emqx_retainer(Conf, false), + ?assertMatch(#{msgid := _, topic := _, qos := _, payload := _, + publish_at := _, from_clientid := _, from_username := _ + }, + LookupResult), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>), - emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), - timer:sleep(100), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), - ?assertEqual(0, length(receive_messages(1))), + {ok, []} = request_api(delete, API), + + {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API), ok = emqtt:disconnect(C1). %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- -request_http_rest_lookup(Path) -> - request_api(get, uri([Path]), default_auth_header()). - -request_http_rest_update(Path, Params) -> - request_api(put, uri([Path]), [], default_auth_header(), Params). - -uri(Parts) when is_list(Parts) -> - NParts = [b2l(E) || E <- Parts], - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). - -%% @private -b2l(B) when is_binary(B) -> - binary_to_list(B); -b2l(L) when is_list(L) -> - L. - -receive_messages(Count) -> - receive_messages(Count, []). -receive_messages(0, Msgs) -> - Msgs; -receive_messages(Count, Msgs) -> - receive - {publish, Msg} -> - ct:log("Msg: ~p ~n", [Msg]), - receive_messages(Count-1, [Msg|Msgs]); - Other -> - ct:log("Other Msg: ~p~n",[Other]), - receive_messages(Count, Msgs) - after 2000 -> - Msgs - end. - -switch_emqx_retainer(undefined, IsEnable) -> - {ok, Return} = request_http_rest_lookup([?COMMON_SHARD]), - NowCfg = get_http_data(Return), - switch_emqx_retainer(NowCfg, IsEnable); - -switch_emqx_retainer(NowCfg, IsEnable) -> - NewCfg = NowCfg#{<<"enable">> => IsEnable}, - RetainerConf = #{<<"emqx_retainer">> => NewCfg}, - {ok, _} = request_http_rest_update([?CFG_URI], RetainerConf), - NewCfg. +decode_json(Data) -> + BinJson = emqx_json:decode(Data, [return_maps]), + emqx_map_lib:unsafe_atom_key_map(BinJson).