diff --git a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl index ac5be58b2..53280becc 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl @@ -53,11 +53,18 @@ { key , msg }). +-type delayed_message() :: #delayed_message{}. -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -define(MAX_INTERVAL, 4294967). +-type state() :: #{ publish_at := non_neg_integer() + , timer := timer:tref() | undefined + , stats_timer => timer:tref() | undefined + , stats_fun => function() + }. + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -124,7 +131,7 @@ on_message_publish(Msg) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(store(#delayed_message{}) -> ok). +-spec(store(delayed_message()) -> ok). store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). @@ -134,7 +141,9 @@ store(DelayedMsg) -> init([]) -> {ok, ensure_stats_event( - ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. + ensure_publish_timer(#{timer => undefined, + publish_at => 0, + stats_timer => undefined}))}. handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> ok = mnesia:dirty_write(?TAB, DelayedMsg), @@ -163,8 +172,9 @@ handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef). +terminate(_Reason, #{timer := PublishTimer} = State) -> + emqx_misc:cancel_timer(PublishTimer), + emqx_misc:cancel_timer(maps:get(stats_timer, State, undefined)). code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" -> NState = maps:with([timer, publish_at], State), @@ -179,12 +189,14 @@ code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" -> %%-------------------------------------------------------------------- %% Ensure the stats +-spec ensure_stats_event(state()) -> state(). ensure_stats_event(State) -> StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), State#{stats_fun => StatsFun, stats_timer => StatsTimer}. %% Ensure publish timer +-spec ensure_publish_timer(state()) -> state(). ensure_publish_timer(State) -> ensure_publish_timer(mnesia:dirty_first(?TAB), State). @@ -222,4 +234,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> -spec(delayed_count() -> non_neg_integer()). delayed_count() -> mnesia:table_info(?TAB, size). - diff --git a/lib-ce/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src index 47a3d8888..ccc3a4c28 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.app.src +++ b/lib-ce/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.4"}, + {vsn, "4.3.5"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index 1b9eeec84..a82421aec 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,33 +1,29 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [ - {<<"4\\.3\\.[2-3]">>, [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}}, - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ], - [ - {<<"4\\.3\\.[2-3]">>, [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}}, - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ] -}. + [{"4.3.4",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, + [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]}, + {"4.3.1", + [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {"4.3.0", + [{update,emqx_mod_delayed,{advanced,[]}}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}], + [{"4.3.4",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, + [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]}, + {"4.3.1", + [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {"4.3.0", + [{update,emqx_mod_delayed,{advanced,[]}}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}]}.