From f1acfece6b79ed69b491da03783a7adaa7627b96 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 19 Apr 2022 10:41:54 +0800 Subject: [PATCH] chore(retainer): reformat retainer codes --- apps/emqx_retainer/include/emqx_retainer.hrl | 6 +- apps/emqx_retainer/rebar.config | 44 ++- apps/emqx_retainer/src/emqx_retainer.app.src | 30 +- apps/emqx_retainer/src/emqx_retainer.erl | 220 ++++++------ apps/emqx_retainer/src/emqx_retainer_api.erl | 195 ++++++----- apps/emqx_retainer/src/emqx_retainer_app.erl | 8 +- .../src/emqx_retainer_dispatcher.erl | 136 ++++---- .../src/emqx_retainer_mnesia.erl | 153 +++++---- .../src/emqx_retainer_schema.erl | 134 +++++--- apps/emqx_retainer/src/emqx_retainer_sup.erl | 28 +- .../test/emqx_retainer_SUITE.erl | 319 +++++++++++------- .../test/emqx_retainer_api_SUITE.erl | 84 +++-- .../test/emqx_retainer_cli_SUITE.erl | 1 - .../test/emqx_retainer_mqtt_v5_SUITE.erl | 142 +++++--- 14 files changed, 895 insertions(+), 605 deletions(-) diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index 7a5d4d83f..95d5eb9fb 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -24,8 +24,10 @@ -type payload() :: binary(). -type message() :: #message{}. --type context() :: #{context_id := pos_integer(), - atom() => term()}. +-type context() :: #{ + context_id := pos_integer(), + atom() => term() +}. -define(DELIVER_SEMAPHORE, deliver_remained_quota). -type semaphore() :: ?DELIVER_SEMAPHORE. diff --git a/apps/emqx_retainer/rebar.config b/apps/emqx_retainer/rebar.config index b3f60c616..7e791f90f 100644 --- a/apps/emqx_retainer/rebar.config +++ b/apps/emqx_retainer/rebar.config @@ -1,27 +1,35 @@ %% -*- mode: erlang -*- -{deps, [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. {edoc_opts, [{preprocess, true}]}. -{erl_opts, [warn_unused_vars, - warn_shadow_vars, - warn_unused_import, - warn_obsolete_guard, - debug_info, - {parse_transform}]}. +{erl_opts, [ + warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard, + debug_info, + {parse_transform} +]}. -{xref_checks, [undefined_function_calls, undefined_functions, - locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions]}. +{xref_checks, [ + undefined_function_calls, + undefined_functions, + locals_not_used, + deprecated_function_calls, + warnings_as_errors, + deprecated_functions +]}. {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. -{profiles, - [{test, - [{deps, - [ - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}}]} - ]} - ]}. +{profiles, [ + {test, [ + {deps, [ + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}} + ]} + ]} +]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index ab5f2e1cd..ea6b58317 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,15 +1,17 @@ %% -*- mode: erlang -*- -{application, emqx_retainer, - [{description, "EMQX Retainer"}, - {vsn, "5.0.0"}, % strict semver, bump manually! - {modules, []}, - {registered, [emqx_retainer_sup]}, - {applications, [kernel,stdlib,emqx]}, - {mod, {emqx_retainer_app,[]}}, - {env, []}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQX Team "]}, - {links, [{"Homepage", "https://emqx.io/"}, - {"Github", "https://github.com/emqx/emqx-retainer"} - ]} - ]}. +{application, emqx_retainer, [ + {description, "EMQX Retainer"}, + % strict semver, bump manually! + {vsn, "5.0.0"}, + {modules, []}, + {registered, [emqx_retainer_sup]}, + {applications, [kernel, stdlib, emqx]}, + {mod, {emqx_retainer_app, []}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx-retainer"} + ]} +]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 247f597b0..51ffe950b 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -23,38 +23,43 @@ -export([start_link/0]). --export([ on_session_subscribed/4 - , on_message_publish/2 - ]). +-export([ + on_session_subscribed/4, + on_message_publish/2 +]). --export([ delete_message/2 - , store_retained/2 - , get_backend_module/0 - ]). +-export([ + delete_message/2, + store_retained/2, + get_backend_module/0 +]). --export([ get_expiry_time/1 - , update_config/1 - , clean/0 - , delete/1 - , page_read/3 - , post_config_update/5 - , stats_fun/0 - ]). +-export([ + get_expiry_time/1, + update_config/1, + clean/0, + delete/1, + page_read/3, + post_config_update/5, + stats_fun/0 +]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). --type state() :: #{ enable := boolean() - , context_id := non_neg_integer() - , context := undefined | context() - , clear_timer := undefined | reference() - }. +-type state() :: #{ + enable := boolean(), + context_id := non_neg_integer(), + context := undefined | context(), + clear_timer := undefined | reference() +}. -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_EXPIRY_INTERVAL, 0). @@ -86,10 +91,14 @@ on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> end. %% RETAIN flag set to 1 and payload containing zero bytes -on_message_publish(Msg = #message{flags = #{retain := true}, - topic = Topic, - payload = <<>>}, - Context) -> +on_message_publish( + Msg = #message{ + flags = #{retain := true}, + topic = Topic, + payload = <<>> + }, + Context +) -> delete_message(Context, Topic), case get_stop_publish_clear_msg() of true -> @@ -97,7 +106,6 @@ on_message_publish(Msg = #message{flags = #{retain := true}, _ -> {ok, Msg} end; - on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> Msg1 = emqx_message:set_header(retained, true, Msg), store_retained(Context, Msg1), @@ -110,14 +118,16 @@ on_message_publish(Msg, _) -> %%-------------------------------------------------------------------- %% @doc Start the retainer --spec(start_link() -> emqx_types:startlink_ret()). +-spec start_link() -> emqx_types:startlink_ret(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> 0; -get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, - timestamp = Ts}) -> +get_expiry_time(#message{ + headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, + timestamp = Ts +}) -> Ts + Interval * 1000; get_expiry_time(#message{timestamp = Ts}) -> Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), @@ -161,30 +171,26 @@ init([]) -> State = new_state(), #{enable := Enable} = Cfg = emqx:get_config([retainer]), {ok, - case Enable of - true -> - enable_retainer(State, Cfg); - _ -> - State - end}. + case Enable of + true -> + enable_retainer(State, Cfg); + _ -> + State + end}. handle_call({update_config, NewConf, OldConf}, _, State) -> State2 = update_config(State, NewConf, OldConf), {reply, ok, State2}; - handle_call(clean, _, #{context := Context} = State) -> clean(Context), {reply, ok, State}; - handle_call({delete, Topic}, _, #{context := Context} = State) -> delete_message(Context, Topic), {reply, ok, State}; - handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -> Mod = get_backend_module(), Result = Mod:page_read(Context, Topic, Page, Limit), {reply, Result, State}; - handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -194,7 +200,6 @@ handle_cast(stats_fun, #{context := Context} = State) -> Size = Mod:size(Context), emqx_stats:setstat('retained.count', 'retained.max', Size), {noreply, State}; - handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. @@ -204,7 +209,6 @@ handle_info(clear_expired, #{context := Context} = State) -> Mod:clear_expired(Context), Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -222,11 +226,12 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -spec new_state() -> state(). new_state() -> - #{enable => false, - context_id => 0, - context => undefined, - clear_timer => undefined - }. + #{ + enable => false, + context_id => 0, + context => undefined, + clear_timer => undefined + }. -spec new_context(pos_integer()) -> context(). new_context(Id) -> @@ -249,11 +254,13 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> Size = iolist_size(Payload), case payload_size_limit() of Limit when Limit > 0 andalso Limit < Size -> - ?SLOG(error, #{msg => "retain_failed_for_payload_size_exceeded_limit", - topic => Topic, - config => emqx_hocon:format_path(?MAX_PAYLOAD_SIZE_CONFIG_PATH), - size => Size, - limit => Limit}); + ?SLOG(error, #{ + msg => "retain_failed_for_payload_size_exceeded_limit", + topic => Topic, + config => emqx_hocon:format_path(?MAX_PAYLOAD_SIZE_CONFIG_PATH), + size => Size, + limit => Limit + }); _ -> Mod = get_backend_module(), Mod:store_retained(Context, Msg) @@ -266,23 +273,30 @@ clean(Context) -> -spec update_config(state(), hocons:config(), hocons:config()) -> state(). update_config(State, Conf, OldConf) -> - update_config(maps:get(enable, Conf), - maps:get(enable, OldConf), - State, - Conf, - OldConf). + update_config( + maps:get(enable, Conf), + maps:get(enable, OldConf), + State, + Conf, + OldConf + ). -spec update_config(boolean(), boolean(), state(), hocons:config(), hocons:config()) -> state(). update_config(false, _, State, _, _) -> disable_retainer(State); - update_config(true, false, State, NewConf, _) -> enable_retainer(State, NewConf); - -update_config(true, true, - #{clear_timer := ClearTimer} = State, NewConf, OldConf) -> - #{backend := BackendCfg, - msg_clear_interval := ClearInterval} = NewConf, +update_config( + true, + true, + #{clear_timer := ClearTimer} = State, + NewConf, + OldConf +) -> + #{ + backend := BackendCfg, + msg_clear_interval := ClearInterval + } = NewConf, #{backend := OldBackendCfg} = OldConf, @@ -290,34 +304,49 @@ update_config(true, true, OldStrorageType = maps:get(type, OldBackendCfg), case OldStrorageType of StorageType -> - State#{clear_timer := check_timer(ClearTimer, - ClearInterval, - clear_expired)}; + State#{ + clear_timer := check_timer( + ClearTimer, + ClearInterval, + clear_expired + ) + }; _ -> State2 = disable_retainer(State), enable_retainer(State2, NewConf) end. -spec enable_retainer(state(), hocon:config()) -> state(). -enable_retainer(#{context_id := ContextId} = State, - #{msg_clear_interval := ClearInterval, - backend := BackendCfg}) -> +enable_retainer( + #{context_id := ContextId} = State, + #{ + msg_clear_interval := ClearInterval, + backend := BackendCfg + } +) -> NewContextId = ContextId + 1, Context = create_resource(new_context(NewContextId), BackendCfg), load(Context), - State#{enable := true, - context_id := NewContextId, - context := Context, - clear_timer := add_timer(ClearInterval, clear_expired)}. + State#{ + enable := true, + context_id := NewContextId, + context := Context, + clear_timer := add_timer(ClearInterval, clear_expired) + }. -spec disable_retainer(state()) -> state(). -disable_retainer(#{clear_timer := ClearTimer, - context := Context} = State) -> +disable_retainer( + #{ + clear_timer := ClearTimer, + context := Context + } = State +) -> unload(), ok = close_resource(Context), - State#{enable := false, - clear_timer := stop_timer(ClearTimer) - }. + State#{ + enable := false, + clear_timer := stop_timer(ClearTimer) + }. -spec stop_timer(undefined | reference()) -> undefined. stop_timer(undefined) -> @@ -344,24 +373,27 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - ModName = case emqx:get_config([retainer, backend]) of - #{type := built_in_database} -> mnesia; - #{type := Backend} -> Backend - end, + ModName = + case emqx:get_config([retainer, backend]) of + #{type := built_in_database} -> mnesia; + #{type := Backend} -> Backend + end, erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])). create_resource(Context, #{type := built_in_database} = Cfg) -> emqx_retainer_mnesia:create_resource(Cfg), Context; - create_resource(Context, #{type := DB} = Config) -> ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]), - case emqx_resource:create( - ResourceID, - <<"emqx_retainer">>, - list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), - Config, - #{}) of + case + emqx_resource:create( + ResourceID, + <<"emqx_retainer">>, + list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), + Config, + #{} + ) + of {ok, already_created} -> Context#{resource_id => ResourceID}; {ok, _} -> diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index db7c25cbb..d85db4ae6 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -24,14 +24,17 @@ %% API -export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]). --export([ lookup_retained_warp/2 - , with_topic_warp/2 - , config/2]). +-export([ + lookup_retained_warp/2, + with_topic_warp/2, + config/2 +]). -import(hoconsc, [mk/2, ref/1, ref/2, array/1]). -import(emqx_dashboard_swagger, [error_codes/2]). --define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 +%% 1MB = 1024 x 1024 +-define(MAX_PAYLOAD_SIZE, 1048576). -define(PREFIX, "/mqtt/retainer"). -define(TAGS, [<<"retainer">>]). @@ -44,52 +47,65 @@ paths() -> [?PREFIX, ?PREFIX ++ "/messages", ?PREFIX ++ "/message/:topic"]. schema(?PREFIX) -> - #{'operationId' => config, - get => #{tags => ?TAGS, - description => <<"Get retainer config">>, - responses => #{200 => mk(conf_schema(), #{desc => "The config content"}), - 404 => error_codes(['NOT_FOUND'], <<"Config not found">>) - } - }, - put => #{tags => ?TAGS, - description => <<"Update retainer config">>, - 'requestBody' => mk(conf_schema(), #{desc => "The config content"}), - responses => #{200 => mk(conf_schema(), #{desc => "Update configs successfully"}), - 400 => error_codes(['UPDATE_FAILED'], <<"Update config failed">>) - } - } - }; - + #{ + 'operationId' => config, + get => #{ + tags => ?TAGS, + description => <<"Get retainer config">>, + responses => #{ + 200 => mk(conf_schema(), #{desc => "The config content"}), + 404 => error_codes(['NOT_FOUND'], <<"Config not found">>) + } + }, + put => #{ + tags => ?TAGS, + description => <<"Update retainer config">>, + 'requestBody' => mk(conf_schema(), #{desc => "The config content"}), + responses => #{ + 200 => mk(conf_schema(), #{desc => "Update configs successfully"}), + 400 => error_codes(['UPDATE_FAILED'], <<"Update config failed">>) + } + } + }; schema(?PREFIX ++ "/messages") -> - #{'operationId' => lookup_retained_warp, - get => #{tags => ?TAGS, - description => <<"List retained messages">>, - parameters => page_params(), - responses => #{200 => mk(array(ref(message_summary)), #{desc => "The result list"}), - 400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>) - } - } - }; - + #{ + 'operationId' => lookup_retained_warp, + get => #{ + tags => ?TAGS, + description => <<"List retained messages">>, + parameters => page_params(), + responses => #{ + 200 => mk(array(ref(message_summary)), #{desc => "The result list"}), + 400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>) + } + } + }; schema(?PREFIX ++ "/message/:topic") -> - #{'operationId' => with_topic_warp, - get => #{tags => ?TAGS, - description => <<"Lookup a message by a topic without wildcards">>, - parameters => parameters(), - responses => #{200 => mk(ref(message), #{desc => "Details of the message"}), - 404 => error_codes(['NOT_FOUND'], <<"Viewed message doesn't exist">>), - 400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>) - } - }, - delete => #{tags => ?TAGS, - description => <<"Delete matching messages">>, - parameters => parameters(), - responses => #{204 => <<>>, - 400 => error_codes(['BAD_REQUEST'], - <<"Unsupported backend">>) - } - } - }. + #{ + 'operationId' => with_topic_warp, + get => #{ + tags => ?TAGS, + description => <<"Lookup a message by a topic without wildcards">>, + parameters => parameters(), + responses => #{ + 200 => mk(ref(message), #{desc => "Details of the message"}), + 404 => error_codes(['NOT_FOUND'], <<"Viewed message doesn't exist">>), + 400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>) + } + }, + delete => #{ + tags => ?TAGS, + description => <<"Delete matching messages">>, + parameters => parameters(), + responses => #{ + 204 => <<>>, + 400 => error_codes( + ['BAD_REQUEST'], + <<"Unsupported backend">> + ) + } + } + }. page_params() -> emqx_dashboard_swagger:fields(page) ++ emqx_dashboard_swagger:fields(limit). @@ -98,23 +114,29 @@ conf_schema() -> ref(emqx_retainer_schema, "retainer"). parameters() -> - [{topic, mk(binary(), #{in => path, - required => true, - desc => "The target topic" - })}]. + [ + {topic, + mk(binary(), #{ + in => path, + required => true, + desc => "The target topic" + })} + ]. fields(message_summary) -> - [ {msgid, mk(binary(), #{desc => <<"Message ID">>})} - , {topic, mk(binary(), #{desc => "The topic"})} - , {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})} - , {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})} - , {from_clientid, mk(binary(), #{desc => "Publisher ClientId"})} - , {from_username, mk(binary(), #{desc => "Publisher Username"})} + [ + {msgid, mk(binary(), #{desc => <<"Message ID">>})}, + {topic, mk(binary(), #{desc => "The topic"})}, + {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})}, + {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})}, + {from_clientid, mk(binary(), #{desc => "Publisher ClientId"})}, + {from_username, mk(binary(), #{desc => "Publisher Username"})} ]; - fields(message) -> - [{payload, mk(binary(), #{desc => "The payload content"})} | - fields(message_summary)]. + [ + {payload, mk(binary(), #{desc => "The payload content"})} + | fields(message_summary) + ]. lookup_retained_warp(Type, Params) -> check_backend(Type, Params, fun lookup_retained/2). @@ -124,15 +146,16 @@ with_topic_warp(Type, Params) -> config(get, _) -> {200, emqx:get_raw_config([retainer])}; - config(put, #{body := Body}) -> try {ok, _} = emqx_retainer:update_config(Body), {200, emqx:get_raw_config([retainer])} - catch _:Reason:_ -> - {400, - #{code => <<"UPDATE_FAILED">>, - message => iolist_to_binary(io_lib:format("~p~n", [Reason]))}} + catch + _:Reason:_ -> + {400, #{ + code => <<"UPDATE_FAILED">>, + message => iolist_to_binary(io_lib:format("~p~n", [Reason])) + }} end. %%------------------------------------------------------------------------------ @@ -151,26 +174,36 @@ with_topic(get, #{bindings := Bindings}) -> [H | _] -> {200, format_detail_message(H)}; _ -> - {404, #{code => <<"NOT_FOUND">>, - message => <<"Viewed message doesn't exist">> - }} + {404, #{ + code => <<"NOT_FOUND">>, + message => <<"Viewed message doesn't exist">> + }} end; - with_topic(delete, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), emqx_retainer_mnesia:delete_message(undefined, Topic), {204}. -format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From - , timestamp = Timestamp, headers = Headers}) -> - #{msgid => emqx_guid:to_hexstr(ID), - qos => Qos, - topic => Topic, - publish_at => list_to_binary(calendar:system_time_to_rfc3339( - Timestamp, [{unit, millisecond}])), - from_clientid => to_bin_string(From), - from_username => maps:get(username, Headers, <<>>) - }. +format_message(#message{ + id = ID, + qos = Qos, + topic = Topic, + from = From, + timestamp = Timestamp, + headers = Headers +}) -> + #{ + msgid => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + publish_at => list_to_binary( + calendar:system_time_to_rfc3339( + Timestamp, [{unit, millisecond}] + ) + ), + from_clientid => to_bin_string(From), + from_username => maps:get(username, Headers, <<>>) + }. format_detail_message(#message{payload = Payload} = Msg) -> Base = format_message(Msg), @@ -183,7 +216,7 @@ format_detail_message(#message{payload = Payload} = Msg) -> to_bin_string(Data) when is_binary(Data) -> Data; -to_bin_string(Data) -> +to_bin_string(Data) -> list_to_binary(io_lib:format("~p", [Data])). check_backend(Type, Params, Cont) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index a63739cd9..2a7620664 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -18,13 +18,13 @@ -behaviour(application). --export([ start/2 - , stop/1 - ]). +-export([ + start/2, + stop/1 +]). start(_Type, _Args) -> emqx_retainer_sup:start_link(). - stop(_State) -> ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 6eb5457b7..1bdf11432 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -22,15 +22,23 @@ -include_lib("emqx/include/logger.hrl"). %% API --export([ start_link/2 - , dispatch/2 - , refresh_limiter/0 - , worker/0 - ]). +-export([ + start_link/2, + dispatch/2, + refresh_limiter/0, + worker/0 +]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, format_status/2]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). -type limiter() :: emqx_htb_limiter:limiter(). @@ -46,10 +54,12 @@ dispatch(Context, Topic) -> %% an limiter update handler maybe added later, now this is a workaround refresh_limiter() -> Workers = gproc_pool:active_workers(?POOL), - lists:foreach(fun({_, Pid}) -> - gen_server:cast(Pid, ?FUNCTION_NAME) - end, - Workers). + lists:foreach( + fun({_, Pid}) -> + gen_server:cast(Pid, ?FUNCTION_NAME) + end, + Workers + ). worker() -> gproc_pool:pick_worker(?POOL, self()). @@ -59,13 +69,18 @@ worker() -> %% Starts the server %% @end %%-------------------------------------------------------------------- --spec start_link(atom(), pos_integer()) -> {ok, Pid :: pid()} | - {error, Error :: {already_started, pid()}} | - {error, Error :: term()} | - ignore. +-spec start_link(atom(), pos_integer()) -> + {ok, Pid :: pid()} + | {error, Error :: {already_started, pid()}} + | {error, Error :: term()} + | ignore. start_link(Pool, Id) -> - gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, - ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + gen_server:start_link( + {local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, + [Pool, Id], + [{hibernate_after, 1000}] + ). %%%=================================================================== %%% gen_server callbacks @@ -77,11 +92,12 @@ start_link(Pool, Id) -> %% Initializes the server %% @end %%-------------------------------------------------------------------- --spec init(Args :: term()) -> {ok, State :: term()} | - {ok, State :: term(), Timeout :: timeout()} | - {ok, State :: term(), hibernate} | - {stop, Reason :: term()} | - ignore. +-spec init(Args :: term()) -> + {ok, State :: term()} + | {ok, State :: term(), Timeout :: timeout()} + | {ok, State :: term(), hibernate} + | {stop, Reason :: term()} + | ignore. init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), @@ -96,14 +112,14 @@ init([Pool, Id]) -> %% @end %%-------------------------------------------------------------------- -spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> - {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | - {reply, Reply :: term(), NewState :: term(), hibernate} | - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: term()} | - {stop, Reason :: term(), NewState :: term()}. + {reply, Reply :: term(), NewState :: term()} + | {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} + | {reply, Reply :: term(), NewState :: term(), hibernate} + | {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: term(), Reply :: term(), NewState :: term()} + | {stop, Reason :: term(), NewState :: term()}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -115,19 +131,17 @@ handle_call(Req, _From, State) -> %% @end %%-------------------------------------------------------------------- -spec handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), NewState :: term()}. + {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: term(), NewState :: term()}. handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {noreply, State#{limiter := Limiter2}}; - handle_cast(refresh_limiter, State) -> BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), Limiter = emqx_limiter_server:connect(batch, BucketName), {noreply, State#{limiter := Limiter}}; - handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. @@ -139,10 +153,10 @@ handle_cast(Msg, State) -> %% @end %%-------------------------------------------------------------------- -spec handle_info(Info :: timeout() | term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: normal | term(), NewState :: term()}. + {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: normal | term(), NewState :: term()}. handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -156,8 +170,10 @@ handle_info(Info, State) -> %% with Reason. The return value is ignored. %% @end %%-------------------------------------------------------------------- --spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), - State :: term()) -> any(). +-spec terminate( + Reason :: normal | shutdown | {shutdown, term()} | term(), + State :: term() +) -> any(). terminate(_Reason, #{pool := Pool, id := Id}) -> gproc_pool:disconnect_worker(Pool, {Pool, Id}). %%-------------------------------------------------------------------- @@ -166,10 +182,13 @@ terminate(_Reason, #{pool := Pool, id := Id}) -> %% Convert process state when code is changed %% @end %%-------------------------------------------------------------------- --spec code_change(OldVsn :: term() | {down, term()}, - State :: term(), - Extra :: term()) -> {ok, NewState :: term()} | - {error, Reason :: term()}. +-spec code_change( + OldVsn :: term() | {down, term()}, + State :: term(), + Extra :: term() +) -> + {ok, NewState :: term()} + | {error, Reason :: term()}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -181,8 +200,10 @@ code_change(_OldVsn, State, _Extra) -> %% or when it appears in termination error logs. %% @end %%-------------------------------------------------------------------- --spec format_status(Opt :: normal | terminate, - Status :: list()) -> Status :: term(). +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). format_status(_Opt, Status) -> Status. @@ -200,19 +221,17 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) -> false -> {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), deliver(Result, Context, Pid, Topic, undefined, Limiter); - true -> + true -> {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), deliver(Result, Context, Pid, Topic, NewCursor, Limiter) end. -spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> - {ok, limiter()}. + {ok, limiter()}. deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> {ok, Limiter}; - deliver([], Context, Pid, Topic, Cursor, Limiter) -> dispatch(Context, Pid, Topic, Cursor, Limiter); - deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> case erlang:is_process_alive(Pid) of false -> @@ -235,7 +254,6 @@ deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> {ok, Limiter}; - do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), case emqx_htb_limiter:consume(Num, Limiter) of @@ -243,17 +261,17 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> do_deliver(ToDelivers, Pid, Topic), do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); {drop, _} = Drop -> - ?SLOG(error, #{msg => "retained_message_dropped", - reason => "reached_ratelimit", - dropped_count => length(ToDelivers) - }), + ?SLOG(error, #{ + msg => "retained_message_dropped", + reason => "reached_ratelimit", + dropped_count => length(ToDelivers) + }), Drop end. do_deliver([Msg | T], Pid, Topic) -> Pid ! {deliver, Topic, Msg}, do_deliver(T, Pid, Topic); - do_deliver([], _, _) -> ok. @@ -262,9 +280,7 @@ safe_split(N, List) -> safe_split(0, List, Count, Acc) -> {Count, lists:reverse(Acc), List}; - safe_split(_N, [], Count, Acc) -> {Count, lists:reverse(Acc), []}; - safe_split(N, [H | T], Count, Acc) -> safe_split(N - 1, T, Count + 1, [H | Acc]). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 93f5251a6..fd76442b9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -23,83 +23,99 @@ -include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/qlc.hrl"). - --export([ delete_message/2 - , store_retained/2 - , read_message/2 - , page_read/4 - , match_messages/3 - , clear_expired/1 - , clean/1 - , size/1 - ]). +-export([ + delete_message/2, + store_retained/2, + read_message/2, + page_read/4, + match_messages/3, + clear_expired/1, + clean/1, + size/1 +]). -export([create_resource/1]). -record(retained, {topic, msg, expiry_time}). -type batch_read_result() :: - {ok, list(emqx:message()), cursor()}. + {ok, list(emqx:message()), cursor()}. %%-------------------------------------------------------------------- %% emqx_retainer_storage callbacks %%-------------------------------------------------------------------- create_resource(#{storage_type := StorageType}) -> - Copies = case StorageType of - ram -> ram_copies; - disc -> disc_copies - end, + Copies = + case StorageType of + ram -> ram_copies; + disc -> disc_copies + end, - StoreProps = [{ets, [compressed, - {read_concurrency, true}, - {write_concurrency, true}]}, - {dets, [{auto_save, 1000}]}], + StoreProps = [ + {ets, [ + compressed, + {read_concurrency, true}, + {write_concurrency, true} + ]}, + {dets, [{auto_save, 1000}]} + ], ok = mria:create_table(?TAB, [ - {type, ordered_set}, - {rlog_shard, ?RETAINER_SHARD}, - {storage, Copies}, - {record_name, retained}, - {attributes, record_info(fields, retained)}, - {storage_properties, StoreProps} - ]), + {type, ordered_set}, + {rlog_shard, ?RETAINER_SHARD}, + {storage, Copies}, + {record_name, retained}, + {attributes, record_info(fields, retained)}, + {storage_properties, StoreProps} + ]), ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity), case mnesia:table_info(?TAB, storage_type) of - Copies -> ok; + Copies -> + ok; _Other -> {atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies), ok end. -store_retained(_, Msg =#message{topic = Topic}) -> +store_retained(_, Msg = #message{topic = Topic}) -> ExpiryTime = emqx_retainer:get_expiry_time(Msg), case is_table_full() of false -> - mria:dirty_write(?TAB, - #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = ExpiryTime}); + mria:dirty_write( + ?TAB, + #retained{ + topic = topic2tokens(Topic), + msg = Msg, + expiry_time = ExpiryTime + } + ); _ -> Tokens = topic2tokens(Topic), Fun = fun() -> - case mnesia:read(?TAB, Tokens) of - [_] -> - mnesia:write(?TAB, - #retained{topic = Tokens, - msg = Msg, - expiry_time = ExpiryTime}, - write); - [] -> - mnesia:abort(table_is_full) - end + case mnesia:read(?TAB, Tokens) of + [_] -> + mnesia:write( + ?TAB, + #retained{ + topic = Tokens, + msg = Msg, + expiry_time = ExpiryTime + }, + write + ); + [] -> + mnesia:abort(table_is_full) + end end, case mria:transaction(?RETAINER_SHARD, Fun) of - {atomic, ok} -> ok; + {atomic, ok} -> + ok; {aborted, Reason} -> - ?SLOG(error, #{ msg => "failed_to_retain_message" - , topic => Topic - , reason => Reason - }) + ?SLOG(error, #{ + msg => "failed_to_retain_message", + topic => Topic, + reason => Reason + }) end end. @@ -108,20 +124,21 @@ clear_expired(_) -> MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'}, Ms = [{MsHd, [{'=/=', '$3', 0}, {'<', '$3', NowMs}], ['$1']}], Fun = fun() -> - Keys = mnesia:select(?TAB, Ms, write), - lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) - end, + Keys = mnesia:select(?TAB, Ms, write), + lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) + end, {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun), ok. delete_message(_, Topic) -> case emqx_topic:wildcard(Topic) of - true -> match_delete_messages(Topic); + true -> + match_delete_messages(Topic); false -> Tokens = topic2tokens(Topic), Fun = fun() -> - mnesia:delete({?TAB, Tokens}) - end, + mnesia:delete({?TAB, Tokens}) + end, _ = mria:transaction(?RETAINER_SHARD, Fun), ok end, @@ -169,8 +186,7 @@ size(_) -> %%-------------------------------------------------------------------- sort_retained([]) -> []; sort_retained([Msg]) -> [Msg]; -sort_retained(Msgs) -> - lists:sort(fun compare_message/2, Msgs). +sort_retained(Msgs) -> lists:sort(fun compare_message/2, Msgs). compare_message(M1, M2) -> M1#message.timestamp =< M2#message.timestamp. @@ -194,12 +210,13 @@ batch_read_messages(Cursor, MaxReadNum) -> {ok, Answers, Cursor} end. --spec(read_messages(emqx_types:topic()) - -> [emqx_types:message()]). +-spec read_messages(emqx_types:topic()) -> + [emqx_types:message()]. read_messages(Topic) -> Tokens = topic2tokens(Topic), case mnesia:dirty_read(?TAB, Tokens) of - [] -> []; + [] -> + []; [#retained{msg = Msg, expiry_time = Et}] -> case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of true -> [Msg]; @@ -207,13 +224,13 @@ read_messages(Topic) -> end end. --spec(match_messages(emqx_types:topic()) - -> [emqx_types:message()]). +-spec match_messages(emqx_types:topic()) -> + [emqx_types:message()]. match_messages(Filter) -> Ms = make_match_spec(Filter), mnesia:dirty_select(?TAB, Ms). --spec(match_delete_messages(emqx_types:topic()) -> ok). +-spec match_delete_messages(emqx_types:topic()) -> ok. match_delete_messages(Filter) -> Cond = condition(emqx_topic:words(Filter)), MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, @@ -223,7 +240,13 @@ match_delete_messages(Filter) -> %% @private condition(Ws) -> - Ws1 = [case W =:= '+' of true -> '_'; _ -> W end || W <- Ws], + Ws1 = [ + case W =:= '+' of + true -> '_'; + _ -> W + end + || W <- Ws + ], case lists:last(Ws1) =:= '#' of false -> Ws1; _ -> (Ws1 -- ['#']) ++ '_' @@ -240,8 +263,10 @@ make_match_spec(Topic) -> condition(emqx_topic:words(Topic)) end, MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'}, - [{MsHd, [{'=:=', '$3', 0}], ['$2']}, - {MsHd, [{'>', '$3', NowMs}], ['$2']}]. + [ + {MsHd, [{'=:=', '$3', 0}], ['$2']}, + {MsHd, [{'>', '$3', NowMs}], ['$2']} + ]. -spec make_cursor(undefined | topic()) -> qlc:query_cursor(). make_cursor(Topic) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 74732bf9b..4940a5c41 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -11,56 +11,84 @@ namespace() -> "retainer". roots() -> ["retainer"]. fields("retainer") -> - [ {enable, sc(boolean(), "Enable retainer feature.", false)} - , {msg_expiry_interval, sc(emqx_schema:duration_ms(), - "Message retention time. 0 means message will never be expired.", - "0s")} - , {msg_clear_interval, sc(emqx_schema:duration_ms(), - "Periodic interval for cleaning up expired messages. " - "Never clear if the value is 0.", - "0s")} - , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} - , {max_payload_size, sc(emqx_schema:bytesize(), - "Maximum retained message size.", - "1MB")} - , {stop_publish_clear_msg, sc(boolean(), - "When the retained flag of the `PUBLISH` message is set and Payload is empty, " - "whether to continue to publish the message.
" - "See: " - "http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038", - false)} - , {backend, backend_config()} + [ + {enable, sc(boolean(), "Enable retainer feature.", false)}, + {msg_expiry_interval, + sc( + emqx_schema:duration_ms(), + "Message retention time. 0 means message will never be expired.", + "0s" + )}, + {msg_clear_interval, + sc( + emqx_schema:duration_ms(), + "Periodic interval for cleaning up expired messages. " + "Never clear if the value is 0.", + "0s" + )}, + {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}, + {max_payload_size, + sc( + emqx_schema:bytesize(), + "Maximum retained message size.", + "1MB" + )}, + {stop_publish_clear_msg, + sc( + boolean(), + "When the retained flag of the `PUBLISH` message is set and Payload is empty, " + "whether to continue to publish the message.
" + "See: " + "http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038", + false + )}, + {backend, backend_config()} ]; - fields(mnesia_config) -> - [ {type, hoconsc:mk(hoconsc:union([built_in_database]), #{desc => "Backend type."})} - , {storage_type, sc(hoconsc:union([ram, disc]), - "Specifies whether the messages are stored in RAM or persisted on disc.", - ram)} - , {max_retained_messages, sc(integer(), - "Maximum number of retained messages. 0 means no limit.", - 0, - fun is_pos_integer/1)} + [ + {type, hoconsc:mk(hoconsc:union([built_in_database]), #{desc => "Backend type."})}, + {storage_type, + sc( + hoconsc:union([ram, disc]), + "Specifies whether the messages are stored in RAM or persisted on disc.", + ram + )}, + {max_retained_messages, + sc( + integer(), + "Maximum number of retained messages. 0 means no limit.", + 0, + fun is_pos_integer/1 + )} ]; - fields(flow_control) -> - [ {batch_read_number, sc(integer(), - "Size of the batch when reading messages from storage. 0 means no limit.", - 0, - fun is_pos_integer/1)} - , {batch_deliver_number, sc(range(0, 1000), - "The number of retained messages can be delivered per batch.", - 0)} - , {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), - "The rate limiter name for retained messages' delivery.
" - "Limiter helps to avoid delivering too many messages to the client at once, " - "which may cause the client " - "to block or crash, or drop messages due to exceeding the size of the message" - " queue.
" - "The names of the available rate limiters are taken from the existing rate " - "limiters under `limiter.batch`.
" - "If this field is empty, limiter is not used.", - undefined)} + [ + {batch_read_number, + sc( + integer(), + "Size of the batch when reading messages from storage. 0 means no limit.", + 0, + fun is_pos_integer/1 + )}, + {batch_deliver_number, + sc( + range(0, 1000), + "The number of retained messages can be delivered per batch.", + 0 + )}, + {batch_deliver_limiter, + sc( + emqx_limiter_schema:bucket_name(), + "The rate limiter name for retained messages' delivery.
" + "Limiter helps to avoid delivering too many messages to the client at once, " + "which may cause the client " + "to block or crash, or drop messages due to exceeding the size of the message" + " queue.
" + "The names of the available rate limiters are taken from the existing rate " + "limiters under `limiter.batch`.
" + "If this field is empty, limiter is not used.", + undefined + )} ]. desc("retainer") -> @@ -79,15 +107,17 @@ sc(Type, Desc, Default) -> hoconsc:mk(Type, #{default => Default, desc => Desc}). sc(Type, Desc, Default, Validator) -> - hoconsc:mk(Type, #{default => Default, - desc => Desc, - validator => Validator}). + hoconsc:mk(Type, #{ + default => Default, + desc => Desc, + validator => Validator + }). is_pos_integer(V) -> V >= 0. backend_config() -> #{ - type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)]), - desc => "Settings for the database storing the retained messages." - }. + type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)]), + desc => "Settings for the database storing the retained messages." + }. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 5073bb987..9938f9881 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -26,13 +26,21 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - PoolSpec = emqx_pool_sup:spec([emqx_retainer_dispatcher, hash, emqx_vm:schedulers(), - {emqx_retainer_dispatcher, start_link, []}]), - {ok, {{one_for_one, 10, 3600}, - [#{id => retainer, - start => {emqx_retainer, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_retainer]}, - PoolSpec]}}. + PoolSpec = emqx_pool_sup:spec([ + emqx_retainer_dispatcher, + hash, + emqx_vm:schedulers(), + {emqx_retainer_dispatcher, start_link, []} + ]), + {ok, + {{one_for_one, 10, 3600}, [ + #{ + id => retainer, + start => {emqx_retainer, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_retainer] + }, + PoolSpec + ]}}. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 9e957f214..30201bdef 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -27,23 +27,27 @@ all() -> emqx_common_test_helpers:all(?MODULE). --define(BASE_CONF, <<""" -retainer { - enable = true - msg_clear_interval = 0s - msg_expiry_interval = 0s - max_payload_size = 1MB - flow_control { - batch_read_number = 0 - batch_deliver_number = 0 - batch_deliver_limiter = retainer - } - backend { - type = built_in_database - storage_type = ram - max_retained_messages = 0 - } -}""">>). +-define(BASE_CONF, << + "" + "\n" + "retainer {\n" + " enable = true\n" + " msg_clear_interval = 0s\n" + " msg_expiry_interval = 0s\n" + " max_payload_size = 1MB\n" + " flow_control {\n" + " batch_read_number = 0\n" + " batch_deliver_number = 0\n" + " batch_deliver_limiter = retainer\n" + " }\n" + " backend {\n" + " type = built_in_database\n" + " storage_type = ram\n" + " max_retained_messages = 0\n" + " }\n" + "}" + "" +>>). %%-------------------------------------------------------------------- %% Setups @@ -82,9 +86,11 @@ t_store_and_clean(_) -> {ok, _} = emqtt:connect(C1), emqtt:publish( - C1, <<"retained">>, - <<"this is a retained message">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained">>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ), timer:sleep(100), {ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10), @@ -121,9 +127,11 @@ t_retain_handling(_) -> {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), emqtt:publish( - C1, <<"retained">>, - <<"this is a retained message">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained">>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -154,20 +162,26 @@ t_wildcard_subscription(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( - C1, <<"retained/0">>, - <<"this is a retained message 0">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/1">>, - <<"this is a retained message 1">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/a/b/c">>, - <<"this is a retained message 2">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/a/b/c">>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}] + ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), ?assertEqual(3, length(receive_messages(3))), emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), @@ -180,25 +194,38 @@ t_message_expiry(_) -> {ok, _} = emqtt:connect(C1), emqtt:publish( - C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, - <<"don't expire">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/0">>, + #{'Message-Expiry-Interval' => 0}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, - <<"expire">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/1">>, + #{'Message-Expiry-Interval' => 2}, + <<"expire">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, - <<"don't expire">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/2">>, + #{'Message-Expiry-Interval' => 5}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/3">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/3">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"$SYS/retained/4">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}]), + C1, + <<"$SYS/retained/4">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), @@ -240,17 +267,23 @@ t_clean(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( - C1, <<"retained/0">>, - <<"this is a retained message 0">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/1">>, - <<"this is a retained message 1">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/test/0">>, - <<"this is a retained message 2">>, - [{qos, 0}, {retain, true}]), + C1, + <<"retained/test/0">>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}] + ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), @@ -266,10 +299,11 @@ t_stop_publish_clear_msg(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( - C1, <<"retained/0">>, - <<"this is a retained message 0">>, - [{qos, 0}, {retain, true}] - ), + C1, + <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -282,9 +316,13 @@ t_stop_publish_clear_msg(_) -> t_flow_control(_) -> #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]), - RetainerCfg2 = RetainerCfg#{per_client := - PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), - capacity := 1}}, + RetainerCfg2 = RetainerCfg#{ + per_client := + PerClient#{ + rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), + capacity := 1 + } + }, emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2), emqx_limiter_manager:restart_server(shared), timer:sleep(500), @@ -292,35 +330,44 @@ t_flow_control(_) -> emqx_retainer_dispatcher:refresh_limiter(), timer:sleep(500), - emqx_retainer:update_config(#{<<"flow_control">> => - #{<<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1, - <<"batch_deliver_limiter">> => retainer}}), + emqx_retainer:update_config(#{ + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1, + <<"batch_deliver_limiter">> => retainer + } + }), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( - C1, <<"retained/0">>, - <<"this is a retained message 0">>, - [{qos, 0}, {retain, true}] - ), + C1, + <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/1">>, - <<"this is a retained message 1">>, - [{qos, 0}, {retain, true}] - ), + C1, + <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}] + ), emqtt:publish( - C1, <<"retained/3">>, - <<"this is a retained message 3">>, - [{qos, 0}, {retain, true}] - ), + C1, + <<"retained/3">>, + <<"this is a retained message 3">>, + [{qos, 0}, {retain, true}] + ), Begin = erlang:system_time(millisecond), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), End = erlang:system_time(millisecond), Diff = End - Begin, - ?assert(Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), - lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))), + ?assert( + Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), + lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) + ), ok = emqtt:disconnect(C1), @@ -335,56 +382,72 @@ t_flow_control(_) -> t_clear_expired(_) -> ConfMod = fun(Conf) -> - Conf#{<<"msg_clear_interval">> := <<"1s">>, - <<"msg_expiry_interval">> := <<"3s">>} - end, + Conf#{ + <<"msg_clear_interval">> := <<"1s">>, + <<"msg_expiry_interval">> := <<"3s">> + } + end, Case = fun() -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), - lists:foreach(fun(I) -> - emqtt:publish(C1, - <<"retained/", (I + 60):8/unsigned-integer>>, - #{'Message-Expiry-Interval' => 3}, - <<"retained">>, - [{qos, 0}, {retain, true}]) - end, - lists:seq(1, 5)), - timer:sleep(1000), + lists:foreach( + fun(I) -> + emqtt:publish( + C1, + <<"retained/", (I + 60):8/unsigned-integer>>, + #{'Message-Expiry-Interval' => 3}, + <<"retained">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:seq(1, 5) + ), + timer:sleep(1000), - {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), - ?assertEqual(5, erlang:length(List)), + {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + ?assertEqual(5, erlang:length(List)), - timer:sleep(4500), + timer:sleep(4500), - {ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), - ?assertEqual(0, erlang:length(List2)), + {ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + ?assertEqual(0, erlang:length(List2)), - ok = emqtt:disconnect(C1) - end, + ok = emqtt:disconnect(C1) + end, with_conf(ConfMod, Case). t_max_payload_size(_) -> ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end, Case = fun() -> - emqx_retainer:clean(), - timer:sleep(500), - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), + emqx_retainer:clean(), + timer:sleep(500), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, - <<"retained/1">>, #{}, <<"1234">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, + <<"retained/1">>, + #{}, + <<"1234">>, + [{qos, 0}, {retain, true}] + ), - emqtt:publish(C1, - <<"retained/2">>, #{}, <<"1234567">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, + <<"retained/2">>, + #{}, + <<"1234567">>, + [{qos, 0}, {retain, true}] + ), - timer:sleep(500), - {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), - ?assertEqual(1, erlang:length(List)), + timer:sleep(500), + {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + ?assertEqual(1, erlang:length(List)), - ok = emqtt:disconnect(C1) - end, + ok = emqtt:disconnect(C1) + end, with_conf(ConfMod, Case). t_page_read(_) -> @@ -394,12 +457,13 @@ t_page_read(_) -> timer:sleep(500), Fun = fun(I) -> - emqtt:publish(C1, - <<"retained/", (I + 60)>>, - <<"this is a retained message">>, - [{qos, 0}, {retain, true}] - ) - end, + emqtt:publish( + C1, + <<"retained/", (I + 60)>>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, lists:foreach(Fun, lists:seq(1, 9)), timer:sleep(200), @@ -436,12 +500,12 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> ct:log("Msg: ~p ~n", [Msg]), - receive_messages(Count-1, [Msg|Msgs]); + receive_messages(Count - 1, [Msg | Msgs]); Other -> - ct:log("Other Msg: ~p~n",[Other]), + ct:log("Other Msg: ~p~n", [Other]), receive_messages(Count, Msgs) after 2000 -> - Msgs + Msgs end. with_conf(ConfMod, Case) -> @@ -451,7 +515,8 @@ with_conf(ConfMod, Case) -> try Case(), emqx_retainer:update_config(Conf) - catch Type:Error:Strace -> + catch + Type:Error:Strace -> emqx_retainer:update_config(Conf), erlang:raise(Type, Error, Strace) end. diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index 0eec1b1f1..ba062a883 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -56,19 +56,31 @@ t_config(_Config) -> Path = api_path(["mqtt", "retainer"]), {ok, ConfJson} = request_api(get, Path), ReturnConf = decode_json(ConfJson), - ?assertMatch(#{backend := _, enable := _, flow_control := _, - max_payload_size := _, msg_clear_interval := _, - msg_expiry_interval := _}, - ReturnConf), + ?assertMatch( + #{ + backend := _, + enable := _, + flow_control := _, + max_payload_size := _, + msg_clear_interval := _, + msg_expiry_interval := _ + }, + ReturnConf + ), UpdateConf = fun(Enable) -> - RawConf = emqx_json:decode(ConfJson, [return_maps]), - UpdateJson = RawConf#{<<"enable">> := Enable}, - {ok, UpdateResJson} = request_api(put, - Path, [], auth_header_(), UpdateJson), - UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]), - ?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf)) - end, + RawConf = emqx_json:decode(ConfJson, [return_maps]), + UpdateJson = RawConf#{<<"enable">> := Enable}, + {ok, UpdateResJson} = request_api( + put, + Path, + [], + auth_header_(), + UpdateJson + ), + UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]), + ?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf)) + end, UpdateConf(false), UpdateConf(true). @@ -80,10 +92,13 @@ t_messages(_) -> timer:sleep(500), Each = fun(I) -> - emqtt:publish(C1, <<"retained/", (I + 60)>>, - <<"retained">>, - [{qos, 0}, {retain, true}]) - end, + emqtt:publish( + C1, + <<"retained/", (I + 60)>>, + <<"retained">>, + [{qos, 0}, {retain, true}] + ) + end, lists:foreach(Each, lists:seq(1, 5)), timer:sleep(500), @@ -91,19 +106,28 @@ t_messages(_) -> {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])), Msgs = decode_json(MsgsJson), MsgLen = erlang:length(Msgs), - ?assert(MsgLen >= 5, - io_lib:format("message length is:~p~n", [MsgLen])), %% maybe has $SYS messages + ?assert( + MsgLen >= 5, + %% maybe has $SYS messages + io_lib:format("message length is:~p~n", [MsgLen]) + ), [First | _] = Msgs, - ?assertMatch(#{msgid := _, topic := _, qos := _, - publish_at := _, from_clientid := _, from_username := _ - }, - First), + ?assertMatch( + #{ + msgid := _, + topic := _, + qos := _, + publish_at := _, + from_clientid := _, + from_username := _ + }, + First + ), ok = emqtt:disconnect(C1). t_lookup_and_delete(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqx_retainer:clean(), @@ -116,10 +140,18 @@ t_lookup_and_delete(_) -> {ok, LookupJson} = request_api(get, API), LookupResult = decode_json(LookupJson), - ?assertMatch(#{msgid := _, topic := _, qos := _, payload := _, - publish_at := _, from_clientid := _, from_username := _ - }, - LookupResult), + ?assertMatch( + #{ + msgid := _, + topic := _, + qos := _, + payload := _, + publish_at := _, + from_clientid := _, + from_username := _ + }, + LookupResult + ), {ok, []} = request_api(delete, API), diff --git a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl index 8a3158aac..69c4c6801 100644 --- a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl @@ -37,4 +37,3 @@ end_per_testcase(_TestCase, Config) -> % t_load(_) -> % error('TODO'). - diff --git a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl index a29347440..459948902 100644 --- a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl @@ -43,7 +43,7 @@ receive_messages(0, Msgs) -> receive_messages(Count, Msgs) -> receive {publish, Msg} -> - receive_messages(Count-1, [Msg|Msgs]); + receive_messages(Count - 1, [Msg | Msgs]); _Other -> receive_messages(Count, Msgs) after 300 -> @@ -74,17 +74,26 @@ t_publish_retain_message(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:publish( - Client1, Topic, #{}, - <<"retained message">>, - [{qos, 2}, {retain, true}]), + Client1, + Topic, + #{}, + <<"retained message">>, + [{qos, 2}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, Topic, #{}, - <<"new retained message">>, - [{qos, 2}, {retain, true}]), + Client1, + Topic, + #{}, + <<"new retained message">>, + [{qos, 2}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, Topic, #{}, - <<"not retained message">>, - [{qos, 2}, {retain, false}]), + Client1, + Topic, + #{}, + <<"not retained message">>, + [{qos, 2}, {retain, false}] + ), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), [Msg] = receive_messages(3), @@ -95,7 +104,8 @@ t_publish_retain_message(_) -> {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), - ?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.3.1-6] [MQTT-3.3.1-7] + %% [MQTT-3.3.1-6] [MQTT-3.3.1-7] + ?assertEqual(0, length(receive_messages(1))), ok = emqtt:disconnect(Client1). @@ -103,38 +113,55 @@ t_publish_message_expiry_interval(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:publish( - Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, - <<"retained message">>, - [{qos, 1}, {retain, true}]), + Client1, + <<"topic/A">>, + #{'Message-Expiry-Interval' => 1}, + <<"retained message">>, + [{qos, 1}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, - <<"retained message">>, - [{qos, 2}, {retain, true}]), + Client1, + <<"topic/B">>, + #{'Message-Expiry-Interval' => 1}, + <<"retained message">>, + [{qos, 2}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, - <<"retained message">>, - [{qos, 1}, {retain, true}]), + Client1, + <<"topic/C">>, + #{'Message-Expiry-Interval' => 10}, + <<"retained message">>, + [{qos, 1}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, - <<"retained message">>, - [{qos, 2}, {retain, true}]), + Client1, + <<"topic/D">>, + #{'Message-Expiry-Interval' => 10}, + <<"retained message">>, + [{qos, 2}, {retain, true}] + ), timer:sleep(1500), {ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2), Msgs = receive_messages(6), - ?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5] + %% [MQTT-3.3.2-5] + ?assertEqual(2, length(Msgs)), L = lists:map( - fun(Msg) -> - MessageExpiryInterval = maps:get('Message-Expiry-Interval', - maps:get(properties, Msg)), + fun(Msg) -> + MessageExpiryInterval = maps:get( + 'Message-Expiry-Interval', + maps:get(properties, Msg) + ), MessageExpiryInterval < 10 - end, Msgs), - ?assertEqual(2, length(L)), %% [MQTT-3.3.2-6] + end, + Msgs + ), + %% [MQTT-3.3.2-6] + ?assertEqual(2, length(L)), ok = emqtt:disconnect(Client1), - clean_retained( <<"topic/C">>), - clean_retained( <<"topic/D">>). - + clean_retained(<<"topic/C">>), + clean_retained(<<"topic/D">>). %%-------------------------------------------------------------------- %% Subsctibe @@ -144,39 +171,50 @@ t_subscribe_retain_handing(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), ok = emqtt:publish( - Client1, <<"topic/A">>, #{}, - <<"retained message">>, - [{qos, 0}, {retain, true}] - ), + Client1, + <<"topic/A">>, + #{}, + <<"retained message">>, + [{qos, 0}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, <<"topic/B">>, #{}, - <<"retained message">>, - [{qos, 1}, {retain, true}] - ), + Client1, + <<"topic/B">>, + #{}, + <<"retained message">>, + [{qos, 1}, {retain, true}] + ), {ok, _} = emqtt:publish( - Client1, <<"topic/C">>, #{}, - <<"retained message">>, - [{qos, 2}, {retain, true}] - ), + Client1, + <<"topic/C">>, + #{}, + <<"retained message">>, + [{qos, 2}, {retain, true}] + ), timer:sleep(200), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), - ?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10] + %% [MQTT-3.3.1-10] + ?assertEqual(3, length(receive_messages(3))), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 2}, {qos, 2}]}]), - ?assertEqual(0, length(receive_messages(3))), %% [MQTT-3.3.1-11] + %% [MQTT-3.3.1-11] + ?assertEqual(0, length(receive_messages(3))), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]), - ?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-9] + %% [MQTT-3.3.1-9] + ?assertEqual(3, length(receive_messages(3))), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), - ?assertEqual(0, length(receive_messages(3))), %% [MQTT-3.3.1-10] + %% [MQTT-3.3.1-10] + ?assertEqual(0, length(receive_messages(3))), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]), - ?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.8.4-4] + %% [MQTT-3.8.4-4] + ?assertEqual(3, length(receive_messages(3))), ok = emqtt:disconnect(Client1), - clean_retained( <<"topic/A">>), - clean_retained( <<"topic/B">>), - clean_retained( <<"topic/C">>). + clean_retained(<<"topic/A">>), + clean_retained(<<"topic/B">>), + clean_retained(<<"topic/C">>).