diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index f91b3aa4f..3a96909ff 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -6,40 +6,76 @@ ## ## Notice that all nodes in the same cluster have to be configured to emqx_retainer: { - ## enable/disable emqx_retainer - enable: true - ## use the same storage_type. - ## - ## Value: ram | disc | disc_only - ## - ram: memory only - ## - disc: both memory and disc - ## - disc_only: disc only - ## - ## Default: ram - storage_type: ram + ## enable/disable emqx_retainer + enable: true - ## Maximum number of retained messages. 0 means no limit. - ## - ## Value: Number >= 0 - max_retained_messages: 0 + ## Periodic interval for cleaning up expired messages. Never clear if the value is 0. + ## + ## Value: Duration + ## - h: hour + ## - m: minute + ## - s: second + ## + ## Examples: + ## - 2h: 2 hours + ## - 30m: 30 minutes + ## - 20s: 20 seconds + ## + ## Default: 0s + msg_clear_interval: 0s - ## Maximum retained message size. - ## - ## Value: Bytes - max_payload_size: 1MB + ## Message retention time. 0 means message will never be expired. + ## + ## Default: 0s + msg_expiry_interval: 0s - ## Expiry interval of the retained messages. Never expire if the value is 0. - ## - ## Value: Duration - ## - h: hour - ## - m: minute - ## - s: second - ## - ## Examples: - ## - 2h: 2 hours - ## - 30m: 30 minutes - ## - 20s: 20 seconds - ## - ## Default: 0s - expiry_interval: 0s + ## The message read and deliver flow rate control + ## When a client subscribe to a wildcard topic, may many retained messages will be loaded. + ## If you don't want these data loaded to the memory all at once, you can use this to control. + ## The processing flow: + ## load max_read_number retained message from storage -> + ## deliver -> + ## repeat this, until all retianed messages are delivered + ## + flow_control: { + ## The max messages number per read from storage. 0 means no limit + ## + ## Default: 0 + max_read_number: 0 + + ## The max number of retained message can be delivered in emqx per quota_release_interval.0 means no limit + ## + ## Default: 0 + msg_deliver_quota: 0 + + ## deliver quota reset interval + ## + ## Default: 0s + quota_release_interval: 0s + } + + ## Maximum retained message size. + ## + ## Value: Bytes + max_payload_size: 1MB + + ## Storage connect parameters + ## + ## Value: mnesia + ## + connector: + [ + { + type: mnesia + config: { + ## storage_type: ram | disc | disc_only + storage_type: ram + + ## Maximum number of retained messages. 0 means no limit. + ## + ## Value: Number >= 0 + max_retained_messages: 0 + } + } + ] } diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index a9978e206..cd07f0692 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -14,7 +14,26 @@ %% limitations under the License. %%-------------------------------------------------------------------- +-include_lib("emqx/include/emqx.hrl"). + -define(APP, emqx_retainer). -define(TAB, ?APP). --record(retained, {topic, msg, expiry_time}). -define(RETAINER_SHARD, emqx_retainer_shard). + +-type topic() :: binary(). +-type payload() :: binary(). +-type message() :: #message{}. + +-type context() :: #{context_id := pos_integer(), + atom() => term()}. + +-define(DELIVER_SEMAPHORE, deliver_remained_quota). +-type semaphore() :: ?DELIVER_SEMAPHORE. +-type cursor() :: undefined | term(). +-type result() :: term(). + +-define(SHARED_CONTEXT_TAB, emqx_retainer_ctx). +-record(shared_context, {key :: atom(), value :: term()}). +-type shared_context_key() :: ?DELIVER_SEMAPHORE. + +-type backend() :: emqx_retainer_storage_mnesia. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 961578870..dcb34ccb2 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -21,24 +21,24 @@ -include("emqx_retainer.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). -logger_header("[Retainer]"). -export([start_link/0]). --export([unload/0 +-export([ on_session_subscribed/4 + , on_message_publish/2 ]). --export([ on_session_subscribed/3 - , on_message_publish/1 - ]). +-export([ dispatch/4 + , delete_message/2 + , store_retained/2 + , deliver/5]). --export([ clean/1 - , update_config/1]). - -%% for emqx_pool task func --export([dispatch/2]). +-export([ get_expiry_time/1 + , update_config/1 + , clean/0 + , delete/1]). %% gen_server callbacks -export([ init/1 @@ -49,62 +49,52 @@ , code_change/3 ]). --record(state, {stats_fun, stats_timer, expiry_timer}). - --define(STATS_INTERVAL, timer:seconds(1)). --define(DEF_STORAGE_TYPE, ram). --define(DEF_MAX_RETAINED_MESSAGES, 0). --define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). --define(DEF_EXPIRY_INTERVAL, 0). --define(DEF_ENABLE_VAL, false). - -%% convenient to generate stats_timer/expiry_timer --define(MAKE_TIMER(State, Timer, Interval, Msg), - State#state{Timer = erlang:send_after(Interval, self(), Msg)}). +-type state() :: #{ enable := boolean() + , context_id := non_neg_integer() + , context := undefined | context() + , clear_timer := undefined | reference() + , release_quota_timer := undefined | reference() + , wait_quotas := list() + }. -rlog_shard({?RETAINER_SHARD, ?TAB}). +-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). +-define(DEF_EXPIRY_INTERVAL, 0). + +-define(CAST(Msg), gen_server:cast(?MODULE, Msg)). + +-callback delete_message(context(), topic()) -> ok. +-callback store_retained(context(), message()) -> ok. +-callback read_message(context(), topic()) -> {ok, list()}. +-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. +-callback clear_expired(context()) -> ok. +-callback clean(context()) -> ok. + %%-------------------------------------------------------------------- -%% Load/Unload +%% Hook API %%-------------------------------------------------------------------- - -load() -> - _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, []}), - _ = emqx:hook('message.publish', {?MODULE, on_message_publish, []}), - ok. - -unload() -> - emqx:unhook('message.publish', {?MODULE, on_message_publish}), - emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). - -on_session_subscribed(_, _, #{share := ShareName}) when ShareName =/= undefined -> +on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined -> ok; -on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) -> +on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}, Context) -> case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> emqx_pool:async_submit(fun ?MODULE:dispatch/2, [self(), Topic]); + true -> dispatch(Context, Topic); _ -> ok end. -%% @private -dispatch(Pid, Topic) -> - Msgs = case emqx_topic:wildcard(Topic) of - false -> read_messages(Topic); - true -> match_messages(Topic) - end, - [Pid ! {deliver, Topic, Msg} || Msg <- sort_retained(Msgs)]. - %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #message{flags = #{retain := true}, topic = Topic, - payload = <<>>}) -> - ekka_mnesia:dirty_delete(?TAB, topic2tokens(Topic)), + payload = <<>>}, + Context) -> + delete_message(Context, Topic), {ok, Msg}; -on_message_publish(Msg = #message{flags = #{retain := true}}) -> +on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> Msg1 = emqx_message:set_header(retained, true, Msg), - store_retained(Msg1), + store_retained(Context, Msg1), {ok, Msg}; -on_message_publish(Msg) -> +on_message_publish(Msg, _) -> {ok, Msg}. %%-------------------------------------------------------------------- @@ -116,71 +106,98 @@ on_message_publish(Msg) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec(clean(emqx_types:topic()) -> non_neg_integer()). -clean(Topic) when is_binary(Topic) -> - case emqx_topic:wildcard(Topic) of - true -> match_delete_messages(Topic); +-spec dispatch(context(), pid(), topic(), cursor()) -> ok. +dispatch(Context, Pid, Topic, Cursor) -> + Mod = get_backend_module(), + case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> - Tokens = topic2tokens(Topic), - Fun = fun() -> - case mnesia:read({?TAB, Tokens}) of - [] -> 0; - [_M] -> mnesia:delete({?TAB, Tokens}), 1 - end - end, - {atomic, N} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), N + {ok, Result} = Mod:read_message(Context, Topic), + deliver(Result, Context, Pid, Topic, undefiend); + true -> + {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), + deliver(Result, Context, Pid, Topic, NewCursor) + end. + +deliver([], Context, Pid, Topic, Cursor) -> + case Cursor of + undefined -> + ok; + _ -> + dispatch(Context, Pid, Topic, Cursor) + end; +deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> + case erlang:is_process_alive(Pid) of + false -> + ok; + _ -> + #{msg_deliver_quota := MaxDeliverNum} = emqx_config:get([?APP, flow_control]), + case MaxDeliverNum of + 0 -> + _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], + ok; + _ -> + case do_deliver(Result, Id, Pid, Topic) of + ok -> + deliver([], Context, Pid, Topic, Cursor); + abort -> + ok + end + end + end. + +get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> + 0; +get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, + timestamp = Ts}) -> + Ts + Interval * 1000; +get_expiry_time(#message{timestamp = Ts}) -> + Interval = emqx_config:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), + case Interval of + 0 -> 0; + _ -> Ts + Interval end. -%%-------------------------------------------------------------------- -%% Update Config -%%-------------------------------------------------------------------- -spec update_config(hocon:config()) -> ok. update_config(Conf) -> - OldCfg = emqx_config:get([?APP]), - emqx_config:put([?APP], Conf), - check_enable_when_update(OldCfg). + gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}). + +clean() -> + gen_server:call(?MODULE, ?FUNCTION_NAME). + +delete(Topic) -> + gen_server:call(?MODULE, {?FUNCTION_NAME, Topic}). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - StorageType = emqx_config:get([?MODULE, storage_type], ?DEF_STORAGE_TYPE), - ExpiryInterval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL), - Copies = case StorageType of - ram -> ram_copies; - disc -> disc_copies; - disc_only -> disc_only_copies - end, - StoreProps = [{ets, [compressed, - {read_concurrency, true}, - {write_concurrency, true}]}, - {dets, [{auto_save, 1000}]}], - ok = ekka_mnesia:create_table(?TAB, [ - {type, set}, - {Copies, [node()]}, - {record_name, retained}, - {attributes, record_info(fields, retained)}, - {storage_properties, StoreProps}]), - ok = ekka_mnesia:copy_table(?TAB, Copies), - ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), - case mnesia:table_info(?TAB, storage_type) of - Copies -> ok; - _Other -> - {atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies), - ok - end, - StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'), - State = ?MAKE_TIMER(#state{stats_fun = StatsFun}, stats_timer, ?STATS_INTERVAL, stats), - check_enable_when_init(), - {ok, start_expire_timer(ExpiryInterval, State)}. + init_shared_context(), + State = new_state(), + #{enable := Enable} = Cfg = emqx_config:get([?APP]), + {ok, + case Enable of + true -> + enable_retainer(State, Cfg); + _ -> + State + end}. -start_expire_timer(0, State) -> - State; -start_expire_timer(undefined, State) -> - State; -start_expire_timer(Ms, State) -> - ?MAKE_TIMER(State, expiry_timer, Ms, expire). +handle_call({update_config, Conf}, _, State) -> + State2 = update_config(State, Conf), + emqx_config:put([?APP], Conf), + {reply, ok, State2}; + +handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) -> + {noreply, State#{wait_quotas := [{Id, From} | Waits]}}; + +handle_call(clean, _, #{context := Context} = State) -> + clean(Context), + {reply, ok, State}; + +handle_call({delete, Topic}, _, #{context := Context} = State) -> + delete_message(Context, Topic), + {reply, ok, State}; handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), @@ -190,22 +207,36 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(stats, State = #state{stats_fun = StatsFun}) -> - StatsFun(retained_count()), - {noreply, ?MAKE_TIMER(State, stats_timer, ?STATS_INTERVAL, stats), hibernate}; +handle_info(clear_expired, #{context := Context} = State) -> + Mod = get_backend_module(), + Mod:clear_expired(Context), + Interval = emqx_config:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), + {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; -handle_info(expire, State) -> - ok = expire_messages(), - Interval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL), - {noreply, start_expire_timer(Interval, State), hibernate}; +handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) -> + insert_shared_context(?DELIVER_SEMAPHORE, get_msg_deliver_quota()), + case Waits of + [] -> + ok; + _ -> + #{context_id := NowId} = Context, + Waits2 = lists:reverse(Waits), + lists:foreach(fun({Id, From}) -> + gen_server:reply(From, Id =:= NowId) + end, + Waits2) + end, + Interval = emqx_config:get([?APP, flow_control, quota_release_interval]), + {noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota), + wait_quotas := []}}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{stats_timer = TRef1, expiry_timer = TRef2}) -> - _ = erlang:cancel_timer(TRef1), - _ = erlang:cancel_timer(TRef2), +terminate(_Reason, #{clear_timer := TRef1, release_quota_timer := TRef2}) -> + _ = stop_timer(TRef1), + _ = stop_timer(TRef2), ok. code_change(_OldVsn, State, _Extra) -> @@ -214,141 +245,211 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -sort_retained([]) -> []; -sort_retained([Msg]) -> [Msg]; -sort_retained(Msgs) -> - lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) -> - Ts1 =< Ts2 end, - Msgs). +-spec new_state() -> state(). +new_state() -> + #{enable => false, + context_id => 0, + context => undefined, + clear_timer => undefined, + release_quota_timer => undefined, + wait_quotas => []}. -store_retained(Msg = #message{topic = Topic, payload = Payload}) -> - case {is_table_full(), is_too_big(size(Payload))} of - {false, false} -> - ok = emqx_metrics:inc('messages.retained'), - ekka_mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = get_expiry_time(Msg)}); - {true, false} -> - {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, - fun() -> - case mnesia:read(?TAB, Topic) of - [_] -> - mnesia:write(?TAB, - #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = get_expiry_time(Msg)}, - write); - [] -> - ?LOG(error, - "Cannot retain message(topic=~s) for table is full!", [Topic]) - end - end), - ok; - {true, _} -> - ?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic]); - {_, true} -> - ?LOG(error, "Cannot retain message(topic=~s, payload_size=~p) " - "for payload is too big!", [Topic, iolist_size(Payload)]) - end. - -is_table_full() -> - Limit = emqx_config:get([?MODULE, max_retained_messages], ?DEF_MAX_RETAINED_MESSAGES), - Limit > 0 andalso (retained_count() > Limit). +-spec new_context(pos_integer()) -> context(). +new_context(Id) -> + #{context_id => Id}. is_too_big(Size) -> - Limit = emqx_config:get([?MODULE, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), + Limit = emqx_config:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), Limit > 0 andalso (Size > Limit). -get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> - 0; -get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, - timestamp = Ts}) -> - Ts + Interval * 1000; -get_expiry_time(#message{timestamp = Ts}) -> - Interval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL), - case Interval of - 0 -> 0; - _ -> Ts + Interval +%% @private +dispatch(Context, Topic) -> + emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4, + [Context, self(), Topic, undefined]). + +-spec delete_message(context(), topic()) -> ok. +delete_message(Context, Topic) -> + Mod = get_backend_module(), + Mod:delete_message(Context, Topic). + +-spec store_retained(context(), message()) -> ok. +store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> + case is_too_big(erlang:byte_size(Payload)) of + false -> + Mod = get_backend_module(), + Mod:store_retained(Context, Msg); + _ -> + ?ERROR("Cannot retain message(topic=~s, payload_size=~p) for payload is too big!", + [Topic, iolist_size(Payload)]) end. -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- +-spec clean(context()) -> ok. +clean(Context) -> + Mod = get_backend_module(), + Mod:clean(Context). --spec(retained_count() -> non_neg_integer()). -retained_count() -> mnesia:table_info(?TAB, size). - -topic2tokens(Topic) -> - emqx_topic:words(Topic). - -expire_messages() -> - NowMs = erlang:system_time(millisecond), - MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'}, - Ms = [{MsHd, [{'=/=','$3',0}, {'<','$3',NowMs}], ['$1']}], - {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, - fun() -> - Keys = mnesia:select(?TAB, Ms, write), - lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) - end), +-spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort. +do_deliver([Msg | T], Id, Pid, Topic) -> + case require_semaphore(?DELIVER_SEMAPHORE, Id) of + true -> + Pid ! {deliver, Topic, Msg}, + do_deliver(T, Id, Pid, Topic); + _ -> + abort + end; +do_deliver([], _, _, _) -> ok. --spec(read_messages(emqx_types:topic()) - -> [emqx_types:message()]). -read_messages(Topic) -> - Tokens = topic2tokens(Topic), - case mnesia:dirty_read(?TAB, Tokens) of - [] -> []; - [#retained{msg = Msg, expiry_time = Et}] -> - case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of - true -> [Msg]; - false -> [] - end - end. +-spec require_semaphore(semaphore(), pos_integer()) -> boolean(). +require_semaphore(Semaphore, Id) -> + Remained = ets:update_counter(?SHARED_CONTEXT_TAB, + Semaphore, + {#shared_context.value, -1, 0, 0}), + wait_semaphore(Remained, Id). --spec(match_messages(emqx_types:topic()) - -> [emqx_types:message()]). -match_messages(Filter) -> - NowMs = erlang:system_time(millisecond), - Cond = condition(emqx_topic:words(Filter)), - MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'}, - Ms = [{MsHd, [{'=:=','$3',0}], ['$2']}, - {MsHd, [{'>','$3',NowMs}], ['$2']}], - mnesia:dirty_select(?TAB, Ms). +-spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean(). +wait_semaphore(0, Id) -> + gen_server:call(?MODULE, {?FUNCTION_NAME, Id}, infinity); +wait_semaphore(_, _) -> + true. --spec(match_delete_messages(emqx_types:topic()) - -> DeletedCnt :: non_neg_integer()). -match_delete_messages(Filter) -> - Cond = condition(emqx_topic:words(Filter)), - MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, - Ms = [{MsHd, [], ['$_']}], - Rs = mnesia:dirty_select(?TAB, Ms), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs), - length(Rs). +-spec init_shared_context() -> ok. +init_shared_context() -> + ?SHARED_CONTEXT_TAB = ets:new(?SHARED_CONTEXT_TAB, + [ set, named_table, public + , {keypos, #shared_context.key} + , {write_concurrency, true} + , {read_concurrency, true}]), + lists:foreach(fun({K, V}) -> + insert_shared_context(K, V) + end, + [{?DELIVER_SEMAPHORE, get_msg_deliver_quota()}]). -%% @private -condition(Ws) -> - Ws1 = [case W =:= '+' of true -> '_'; _ -> W end || W <- Ws], - case lists:last(Ws1) =:= '#' of - false -> Ws1; - _ -> (Ws1 -- ['#']) ++ '_' - end. --spec check_enable_when_init() -> ok. -check_enable_when_init() -> - case emqx_config:get([?APP, enable], ?DEF_ENABLE_VAL) of - true -> load(); - _ -> ok - end. +-spec insert_shared_context(shared_context_key(), term()) -> ok. +insert_shared_context(Key, Term) -> + ets:insert(?SHARED_CONTEXT_TAB, #shared_context{key = Key, value = Term}), + ok. --spec check_enable_when_update(hocon:config()) -> ok. -check_enable_when_update(OldCfg) -> - OldVal = maps:get(enable, OldCfg, undefined), - case emqx_config:get([?APP, enable], ?DEF_ENABLE_VAL) of - OldVal -> - ok; +-spec get_msg_deliver_quota() -> non_neg_integer(). +get_msg_deliver_quota() -> + emqx_config:get([?APP, flow_control, msg_deliver_quota]). + +-spec update_config(state(), hocons:config()) -> state(). +update_config(#{clear_timer := ClearTimer, + release_quota_timer := QuotaTimer} = State, Conf) -> + #{enable := Enable, + connector := [Connector | _], + flow_control := #{quota_release_interval := QuotaInterval}, + msg_clear_interval := ClearInterval} = Conf, + + #{connector := [OldConnector | _]} = emqx_config:get([?APP]), + + case Enable of true -> - load(); + StorageType = maps:get(type, Connector), + OldStrorageType = maps:get(type, OldConnector), + case OldStrorageType of + StorageType -> + State#{clear_timer := check_timer(ClearTimer, + ClearInterval, + clear_expired), + release_quota_timer := check_timer(QuotaTimer, + QuotaInterval, + release_deliver_quota)}; + _ -> + State2 = disable_retainer(State), + enable_retainer(State2, Conf) + end; _ -> - unload() + disable_retainer(State) end. +-spec enable_retainer(state(), hocon:config()) -> state(). +enable_retainer(#{context_id := ContextId} = State, + #{msg_clear_interval := ClearInterval, + flow_control := #{quota_release_interval := ReleaseInterval}, + connector := [Connector | _]}) -> + NewContextId = ContextId + 1, + Context = create_resource(new_context(NewContextId), Connector), + load(Context), + State#{enable := true, + context_id := NewContextId, + context := Context, + clear_timer := add_timer(ClearInterval, clear_expired), + release_quota_timer := add_timer(ReleaseInterval, release_deliver_quota)}. + +-spec disable_retainer(state()) -> state(). +disable_retainer(#{clear_timer := TRef1, + release_quota_timer := TRef2, + context := Context, + wait_quotas := Waits} = State) -> + unload(), + ok = lists:foreach(fun(E) -> gen_server:reply(E, false) end, Waits), + ok = close_resource(Context), + State#{enable := false, + clear_timer := stop_timer(TRef1), + release_quota_timer := stop_timer(TRef2), + wait_quotas := []}. + +-spec stop_timer(undefined | reference()) -> undefined. +stop_timer(undefined) -> + undefined; +stop_timer(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + undefined. + +add_timer(0, _) -> + undefined; +add_timer(undefined, _) -> + undefined; +add_timer(Ms, Content) -> + erlang:send_after(Ms, self(), Content). + +check_timer(undefined, Ms, Context) -> + add_timer(Ms, Context); +check_timer(Timer, 0, _) -> + stop_timer(Timer); +check_timer(Timer, undefined, _) -> + stop_timer(Timer); +check_timer(Timer, _, _) -> + Timer. + +-spec get_backend_module() -> backend(). +get_backend_module() -> + [#{type := Backend} | _] = emqx_config:get([?APP, connector]), + erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, Backend])). + +create_resource(Context, #{type := mnesia, config := Cfg}) -> + emqx_retainer_mnesia:create_resource(Cfg), + Context; + +create_resource(Context, #{type := DB, config := Config}) -> + ResourceID = erlang:iolist_to_binary([io_lib:format("~s_~s", [?APP, DB])]), + case emqx_resource:create( + ResourceID, + list_to_existing_atom(io_lib:format("~s_~s", [emqx_connector, DB])), + Config) of + {ok, _} -> + Context#{resource_id => ResourceID}; + {error, already_created} -> + Context#{resource_id => ResourceID}; + {error, Reason} -> + error({load_config_error, Reason}) + end. + +-spec close_resource(context()) -> ok | {error, term()}. +close_resource(#{resource_id := ResourceId}) -> + emqx_resource:stop(ResourceId); +close_resource(_) -> + ok. + +-spec load(context()) -> ok. +load(Context) -> + _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}), + _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}), + ok. + +unload() -> + emqx:unhook('message.publish', {?MODULE, on_message_publish}), + emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index 3626bbd00..03d03a7ce 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -30,6 +30,5 @@ start(_Type, _Args) -> {ok, Sup}. stop(_State) -> - emqx_retainer_cli:unload(), - emqx_retainer:unload(). + emqx_retainer_cli:unload(). diff --git a/apps/emqx_retainer/src/emqx_retainer_cli.erl b/apps/emqx_retainer/src/emqx_retainer_cli.erl index 1e965946f..f24d69bed 100644 --- a/apps/emqx_retainer/src/emqx_retainer_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_cli.erl @@ -27,26 +27,6 @@ load() -> emqx_ctl:register_command(retainer, {?MODULE, cmd}, []). -cmd(["info"]) -> - emqx_ctl:print("retained/total: ~w~n", [mnesia:table_info(?TAB, size)]); - -cmd(["topics"]) -> - case mnesia:dirty_all_keys(?TAB) of - [] -> ignore; - Topics -> lists:foreach(fun(Topic) -> emqx_ctl:print("~s~n", [Topic]) end, Topics) - end; - -cmd(["clean"]) -> - Size = mnesia:table_info(?TAB, size), - case ekka_mnesia:clear_table(?TAB) of - {atomic, ok} -> emqx_ctl:print("Cleaned ~p retained messages~n", [Size]); - {aborted, R} -> emqx_ctl:print("Aborted ~p~n", [R]) - end; - -cmd(["clean", Topic]) -> - Lines = emqx_retainer:clean(list_to_binary(Topic)), - emqx_ctl:print("Cleaned ~p retained messages~n", [Lines]); - cmd(_) -> emqx_ctl:usage([{"retainer info", "Show the count of retained messages"}, {"retainer topics", "Show all topics of retained messages"}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl new file mode 100644 index 000000000..5b6028980 --- /dev/null +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -0,0 +1,241 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_mnesia). + +-behaviour(emqx_retainer). + +-include("emqx_retainer.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-logger_header("[Retainer]"). + +-export([delete_message/2 + , store_retained/2 + , read_message/2 + , match_messages/3 + , clear_expired/1 + , clean/1]). + +-export([create_resource/1]). + +-define(DEF_MAX_RETAINED_MESSAGES, 0). + +-rlog_shard({?RETAINER_SHARD, ?TAB}). + +-record(retained, {topic, msg, expiry_time}). + +-type batch_read_result() :: + {ok, list(emqx:message()), cursor()}. + +%%-------------------------------------------------------------------- +%% emqx_retainer_storage callbacks +%%-------------------------------------------------------------------- +create_resource(#{storage_type := StorageType}) -> + Copies = case StorageType of + ram -> ram_copies; + disc -> disc_copies; + disc_only -> disc_only_copies + end, + StoreProps = [{ets, [compressed, + {read_concurrency, true}, + {write_concurrency, true}]}, + {dets, [{auto_save, 1000}]}], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {Copies, [node()]}, + {record_name, retained}, + {attributes, record_info(fields, retained)}, + {storage_properties, StoreProps}]), + ok = ekka_mnesia:copy_table(?TAB, Copies), + ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), + case mnesia:table_info(?TAB, storage_type) of + Copies -> ok; + _Other -> + {atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies), + ok + end. + +store_retained(_, Msg =#message{topic = Topic}) -> + ExpiryTime = emqx_retainer:get_expiry_time(Msg), + case is_table_full() of + false -> + ok = emqx_metrics:inc('messages.retained'), + ekka_mnesia:dirty_write(?TAB, + #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = ExpiryTime}); + _ -> + Tokens = topic2tokens(Topic), + Fun = fun() -> + case mnesia:read(?TAB, Tokens) of + [_] -> + mnesia:write(?TAB, + #retained{topic = Tokens, + msg = Msg, + expiry_time = ExpiryTime}, + write); + [] -> + ?LOG(error, + "Cannot retain message(topic=~s) for table is full!", + [Topic]), + ok + end + end, + {atomic, ok} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + ok + end. + +clear_expired(_) -> + NowMs = erlang:system_time(millisecond), + MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'}, + Ms = [{MsHd, [{'=/=', '$3', 0}, {'<', '$3', NowMs}], ['$1']}], + Fun = fun() -> + Keys = mnesia:select(?TAB, Ms, write), + lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) + end, + {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + ok. + +delete_message(_, Topic) -> + case emqx_topic:wildcard(Topic) of + true -> match_delete_messages(Topic); + false -> + Tokens = topic2tokens(Topic), + Fun = fun() -> + mnesia:delete({?TAB, Tokens}) + end, + case ekka_mnesia:transaction(?RETAINER_SHARD, Fun) of + {atomic, Result} -> + Result; + ok -> + ok + end + end, + ok. + +read_message(_, Topic) -> + {ok, read_messages(Topic)}. + +match_messages(_, Topic, Cursor) -> + MaxReadNum = emqx_config:get([?APP, flow_control, max_read_number]), + case Cursor of + undefined -> + case MaxReadNum of + 0 -> + {ok, sort_retained(match_messages(Topic)), undefined}; + _ -> + start_batch_read(Topic, MaxReadNum) + end; + _ -> + batch_read_messages(Cursor, MaxReadNum) + end. + +clean(_) -> + ekka_mnesia:clear_table(?TAB), + ok. +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +sort_retained([]) -> []; +sort_retained([Msg]) -> [Msg]; +sort_retained(Msgs) -> + lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) -> + Ts1 =< Ts2 end, + Msgs). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- +topic2tokens(Topic) -> + emqx_topic:words(Topic). + +-spec start_batch_read(topic(), pos_integer()) -> batch_read_result(). +start_batch_read(Topic, MaxReadNum) -> + Ms = make_match_spec(Topic), + TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]), + QH = qlc:q([E || E <- TabQH]), + Cursor = qlc:cursor(QH), + batch_read_messages(Cursor, MaxReadNum). + +-spec batch_read_messages(emqx_retainer_storage:cursor(), pos_integer()) -> batch_read_result(). +batch_read_messages(Cursor, MaxReadNum) -> + Answers = qlc:next_answers(Cursor, MaxReadNum), + Orders = sort_retained(Answers), + case erlang:length(Orders) < MaxReadNum of + true -> + qlc:delete_cursor(Cursor), + {ok, Orders, undefined}; + _ -> + {ok, Orders, Cursor} + end. + +-spec(read_messages(emqx_types:topic()) + -> [emqx_types:message()]). +read_messages(Topic) -> + Tokens = topic2tokens(Topic), + case mnesia:dirty_read(?TAB, Tokens) of + [] -> []; + [#retained{msg = Msg, expiry_time = Et}] -> + case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of + true -> [Msg]; + false -> [] + end + end. + +-spec(match_messages(emqx_types:topic()) + -> [emqx_types:message()]). +match_messages(Filter) -> + Ms = make_match_spec(Filter), + mnesia:dirty_select(?TAB, Ms). + +-spec(match_delete_messages(emqx_types:topic()) -> ok). +match_delete_messages(Filter) -> + Cond = condition(emqx_topic:words(Filter)), + MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, + Ms = [{MsHd, [], ['$_']}], + Rs = mnesia:dirty_select(?TAB, Ms), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs). + +%% @private +condition(Ws) -> + Ws1 = [case W =:= '+' of true -> '_'; _ -> W end || W <- Ws], + case lists:last(Ws1) =:= '#' of + false -> Ws1; + _ -> (Ws1 -- ['#']) ++ '_' + end. + +-spec make_match_spec(topic()) -> ets:match_spec(). +make_match_spec(Filter) -> + NowMs = erlang:system_time(millisecond), + Cond = condition(emqx_topic:words(Filter)), + MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'}, + [{MsHd, [{'=:=', '$3', 0}], ['$2']}, + {MsHd, [{'>', '$3', NowMs}], ['$2']}]. + +-spec is_table_full() -> boolean(). +is_table_full() -> + [#{config := Cfg} | _] = emqx_config:get([?APP, connector]), + Limit = maps:get(max_retained_messages, + Cfg, + ?DEF_MAX_RETAINED_MESSAGES), + Limit > 0 andalso (table_size() >= Limit). + +-spec table_size() -> non_neg_integer(). +table_size() -> + mnesia:table_info(?TAB, size). diff --git a/apps/emqx_retainer/src/emqx_retainer_pool.erl b/apps/emqx_retainer/src/emqx_retainer_pool.erl new file mode 100644 index 000000000..59ea1077a --- /dev/null +++ b/apps/emqx_retainer/src/emqx_retainer_pool.erl @@ -0,0 +1,182 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_pool). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). + +%% API +-export([start_link/2, + async_submit/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, format_status/2]). + +-define(POOL, ?MODULE). + +%%%=================================================================== +%%% API +%%%=================================================================== +async_submit(Fun, Args) -> + cast({async_submit, {Fun, Args}}). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% @end +%%-------------------------------------------------------------------- +-spec start_link(atom(), pos_integer()) -> {ok, Pid :: pid()} | + {error, Error :: {already_started, pid()}} | + {error, Error :: term()} | + ignore. +start_link(Pool, Id) -> + gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% @end +%%-------------------------------------------------------------------- +-spec init(Args :: term()) -> {ok, State :: term()} | + {ok, State :: term(), Timeout :: timeout()} | + {ok, State :: term(), hibernate} | + {stop, Reason :: term()} | + ignore. +init([Pool, Id]) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{pool => Pool, id => Id}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> + {reply, Reply :: term(), NewState :: term()} | + {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | + {reply, Reply :: term(), NewState :: term(), hibernate} | + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: term()} | + {stop, Reason :: term(), NewState :: term()}. +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_cast(Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: term(), NewState :: term()}. +handle_cast({async_submit, Task}, State) -> + try run(Task) + catch _:Error:Stacktrace -> + ?LOG(error, "Error: ~0p, ~0p", [Error, Stacktrace]) + end, + {noreply, State}; + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_info(Info :: timeout() | term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: normal | term(), NewState :: term()}. +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), + State :: term()) -> any(). +terminate(_Reason, #{pool := Pool, id := Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +-spec code_change(OldVsn :: term() | {down, term()}, + State :: term(), + Extra :: term()) -> {ok, NewState :: term()} | + {error, Reason :: term()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for changing the form and appearance +%% of gen_server status when it is returned from sys:get_status/1,2 +%% or when it appears in termination error logs. +%% @end +%%-------------------------------------------------------------------- +-spec format_status(Opt :: normal | terminate, + Status :: list()) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +%% @private +cast(Msg) -> + gen_server:cast(worker(), Msg). + +%% @private +worker() -> + gproc_pool:pick_worker(?POOL). + +run({M, F, A}) -> + erlang:apply(M, F, A); +run({F, A}) when is_function(F), is_list(A) -> + erlang:apply(F, A); +run(Fun) when is_function(Fun) -> + Fun(). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 14f643823..96cf80846 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -2,20 +2,35 @@ -include_lib("typerefl/include/types.hrl"). --type storage_type() :: ram | disc | disc_only. - --reflect_type([storage_type/0]). - -export([structs/0, fields/1]). +-define(TYPE(Type), hoconsc:t(Type)). + structs() -> ["emqx_retainer"]. fields("emqx_retainer") -> [ {enable, t(boolean(), false)} - , {storage_type, t(storage_type(), ram)} - , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} + , {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")} + , {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")} + , {connector, connector()} + , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {max_payload_size, t(emqx_schema:bytesize(), "1MB")} - , {expiry_interval, t(emqx_schema:duration_ms(), "0s")} + ]; + +fields(mnesia_connector) -> + [ {type, ?TYPE(hoconsc:union([mnesia]))} + , {config, ?TYPE(hoconsc:ref(?MODULE, mnesia_connector_cfg))} + ]; + +fields(mnesia_connector_cfg) -> + [ {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)} + , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} + ]; + +fields(flow_control) -> + [ {max_read_number, t(integer(), 0, fun is_pos_integer/1)} + , {msg_deliver_quota, t(integer(), 0, fun is_pos_integer/1)} + , {quota_release_interval, t(emqx_schema:duration_ms(), "0ms")} ]. %%-------------------------------------------------------------------- @@ -28,5 +43,11 @@ t(Type, Default, Validator) -> hoconsc:t(Type, #{default => Default, validator => Validator}). +union_array(Item) when is_list(Item) -> + hoconsc:array(hoconsc:union(Item)). + is_pos_integer(V) -> V >= 0. + +connector() -> + #{type => union_array([hoconsc:ref(?MODULE, mnesia_connector)])}. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index d01c20975..3811ed8f2 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -26,11 +26,13 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + PoolSpec = emqx_pool_sup:spec([emqx_retainer_pool, random, emqx_vm:schedulers(), + {emqx_retainer_pool, start_link, []}]), {ok, {{one_for_one, 10, 3600}, [#{id => retainer, start => {emqx_retainer, start_link, []}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_retainer]}]}}. - + modules => [emqx_retainer]}, + PoolSpec]}}. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index cf74b1334..3fa6b8abd 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --define(APP, emqx). +-define(APP, emqx_retainer). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -39,27 +39,34 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_retainer]). init_per_testcase(TestCase, Config) -> - emqx_retainer:clean(<<"#">>), + emqx_retainer:clean(), Interval = case TestCase of t_message_expiry_2 -> 2000; _ -> 0 end, - init_emqx_retainer_conf(Interval), + OldCfg = emqx_config:get([?APP]), + emqx_config:put([?APP], OldCfg#{msg_expiry_interval := Interval}), application:ensure_all_started(emqx_retainer), Config. set_special_configs(emqx_retainer) -> - init_emqx_retainer_conf(0); + init_emqx_retainer_conf(); set_special_configs(_) -> ok. -init_emqx_retainer_conf(Expiry) -> - emqx_config:put([emqx_retainer], +init_emqx_retainer_conf() -> + emqx_config:put([?APP], #{enable => true, - storage_type => ram, - max_retained_messages => 0, - max_payload_size => 1024 * 1024, - expiry_interval => Expiry}). + msg_expiry_interval => 0, + msg_clear_interval => 0, + connector => [#{type => mnesia, + config => + #{max_retained_messages => 0, + storage_type => ram}}], + flow_control => #{max_read_number => 0, + msg_deliver_quota => 0, + quota_release_interval => 0}, + max_payload_size => 1024 * 1024}). %%-------------------------------------------------------------------- %% Test Cases @@ -177,8 +184,8 @@ t_clean(_) -> {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), - 1 = emqx_retainer:clean(<<"retained/test/0">>), - 2 = emqx_retainer:clean(<<"retained/+">>), + ok = emqx_retainer:delete(<<"retained/test/0">>), + ok = emqx_retainer:delete(<<"retained/+">>), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(0, length(receive_messages(3))), @@ -203,4 +210,3 @@ receive_messages(Count, Msgs) -> after 2000 -> Msgs end. - diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index 1f5a32542..d9ea06c2f 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -56,21 +56,14 @@ init_per_testcase(_, Config) -> Config. set_special_configs(emqx_retainer) -> - init_emqx_retainer_conf(0); + emqx_retainer_SUITE:init_emqx_retainer_conf(); set_special_configs(emqx_management) -> emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], - applications =>[#{id => "admin", secret => "public"}]}), + applications =>[#{id => "admin", secret => "public"}]}), ok; set_special_configs(_) -> ok. -init_emqx_retainer_conf(Expiry) -> - emqx_config:put([emqx_retainer], - #{enable => true, - storage_type => ram, - max_retained_messages => 0, - max_payload_size => 1024 * 1024, - expiry_interval => Expiry}). %%------------------------------------------------------------------------------ %% Test Cases %%------------------------------------------------------------------------------ @@ -78,7 +71,7 @@ init_emqx_retainer_conf(Expiry) -> t_config(_Config) -> {ok, Return} = request_http_rest_lookup(["retainer"]), NowCfg = get_http_data(Return), - NewCfg = NowCfg#{<<"expiry_interval">> => timer:seconds(60)}, + NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)}, RetainerConf = #{<<"emqx_retainer">> => NewCfg}, {ok, _} = request_http_rest_update(["retainer?action=test"], RetainerConf), diff --git a/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl b/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl index 19d404010..70c8a0554 100644 --- a/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx_retainer/test/mqtt_protocol_v5_SUITE.erl @@ -38,13 +38,7 @@ end_per_suite(_Config) -> %% Helpers %%-------------------------------------------------------------------- set_special_configs(emqx_retainer) -> - emqx_config:put([emqx_retainer], - #{enable => true, - storage_type => ram, - max_retained_messages => 0, - max_payload_size => 1024 * 1024, - expiry_interval => 0}); - + emqx_retainer_SUITE:init_emqx_retainer_conf(); set_special_configs(_) -> ok.