diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index db6226c1f..317942122 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). %% Mnesia bootstrap @@ -57,6 +58,14 @@ -record(delayed_message, {key, delayed, msg}). -type delayed_message() :: #delayed_message{}. + +-type state() :: #{ publish_timer := maybe(timer:tref()) + , publish_at := non_neg_integer() + , stats_timer := maybe(reference()) + , stats_fun := maybe(fun((pos_integer()) -> ok)) + , max_delayed_messages := non_neg_integer() + }. + %% sync ms with record change -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]). -define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]). @@ -226,8 +235,10 @@ init([Opts]) -> emqx_conf:add_handler([delayed], ?MODULE), MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0), {ok, ensure_stats_event( - ensure_publish_timer(#{timer => undefined, + ensure_publish_timer(#{publish_timer => undefined, publish_at => 0, + stats_timer => undefined, + stats_fun => undefined, max_delayed_messages => MaxDelayedMessages}))}. handle_call({set_max_delayed_messages, Max}, _From, State) -> @@ -268,10 +279,10 @@ handle_cast(Msg, State) -> {noreply, State}. %% Do Publish... -handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> +handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) -> DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), - {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; + {noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})}; handle_info(stats, State = #{stats_fun := StatsFun}) -> StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), @@ -282,9 +293,10 @@ handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. -terminate(_Reason, #{stats_timer := TRef}) -> +terminate(_Reason, #{publish_timer := PublishTimer, stats_timer := StatsTimer}) -> emqx_conf:remove_handler([delayed]), - emqx_misc:cancel_timer(TRef). + emqx_misc:cancel_timer(PublishTimer), + emqx_misc:cancel_timer(StatsTimer). code_change(_Vsn, State, _Extra) -> {ok, State}. @@ -294,20 +306,22 @@ code_change(_Vsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Ensure the stats +-spec ensure_stats_event(state()) -> state(). ensure_stats_event(State) -> StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), - State#{stats_fun => StatsFun, stats_timer => StatsTimer}. + State#{stats_fun := StatsFun, stats_timer := StatsTimer}. %% Ensure publish timer +-spec ensure_publish_timer(state()) -> state(). ensure_publish_timer(State) -> ensure_publish_timer(mnesia:dirty_first(?TAB), State). ensure_publish_timer('$end_of_table', State) -> - State#{timer := undefined, publish_at := 0}; -ensure_publish_timer({Ts, _Id}, State = #{timer := undefined}) -> + State#{publish_timer := undefined, publish_at := 0}; +ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) -> ensure_publish_timer(Ts, os:system_time(seconds), State); -ensure_publish_timer({Ts, _Id}, State = #{timer := TRef, publish_at := PubAt}) +ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when Ts < PubAt -> ok = emqx_misc:cancel_timer(TRef), ensure_publish_timer(Ts, os:system_time(seconds), State); @@ -317,7 +331,7 @@ ensure_publish_timer(_Key, State) -> ensure_publish_timer(Ts, Now, State) -> Interval = max(1, Ts - Now), TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish), - State#{timer := TRef, publish_at := Now + Interval}. + State#{publish_timer := TRef, publish_at := Now + Interval}. do_publish(Key, Now) -> do_publish(Key, Now, []).