Merge pull request #7320 from lafirest/fix/delayed_terminate
Fix/delayed terminate
This commit is contained in:
commit
2133f8c4a3
|
@ -53,11 +53,18 @@
|
||||||
{ key
|
{ key
|
||||||
, msg
|
, msg
|
||||||
}).
|
}).
|
||||||
|
-type delayed_message() :: #delayed_message{}.
|
||||||
|
|
||||||
-define(TAB, ?MODULE).
|
-define(TAB, ?MODULE).
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
-define(MAX_INTERVAL, 4294967).
|
-define(MAX_INTERVAL, 4294967).
|
||||||
|
|
||||||
|
-type state() :: #{ publish_at := non_neg_integer()
|
||||||
|
, timer := timer:tref() | undefined
|
||||||
|
, stats_timer => timer:tref() | undefined
|
||||||
|
, stats_fun => function()
|
||||||
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -124,7 +131,7 @@ on_message_publish(Msg) ->
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
-spec(store(#delayed_message{}) -> ok).
|
-spec(store(delayed_message()) -> ok).
|
||||||
store(DelayedMsg) ->
|
store(DelayedMsg) ->
|
||||||
gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
|
gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
|
||||||
|
|
||||||
|
@ -134,7 +141,9 @@ store(DelayedMsg) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, ensure_stats_event(
|
{ok, ensure_stats_event(
|
||||||
ensure_publish_timer(#{timer => undefined, publish_at => 0}))}.
|
ensure_publish_timer(#{timer => undefined,
|
||||||
|
publish_at => 0,
|
||||||
|
stats_timer => undefined}))}.
|
||||||
|
|
||||||
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
|
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
|
||||||
ok = mnesia:dirty_write(?TAB, DelayedMsg),
|
ok = mnesia:dirty_write(?TAB, DelayedMsg),
|
||||||
|
@ -163,8 +172,9 @@ handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #{timer := TRef}) ->
|
terminate(_Reason, #{timer := PublishTimer} = State) ->
|
||||||
emqx_misc:cancel_timer(TRef).
|
emqx_misc:cancel_timer(PublishTimer),
|
||||||
|
emqx_misc:cancel_timer(maps:get(stats_timer, State, undefined)).
|
||||||
|
|
||||||
code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" ->
|
code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" ->
|
||||||
NState = maps:with([timer, publish_at], State),
|
NState = maps:with([timer, publish_at], State),
|
||||||
|
@ -179,12 +189,14 @@ code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% Ensure the stats
|
%% Ensure the stats
|
||||||
|
-spec ensure_stats_event(state()) -> state().
|
||||||
ensure_stats_event(State) ->
|
ensure_stats_event(State) ->
|
||||||
StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
|
StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
|
||||||
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
||||||
State#{stats_fun => StatsFun, stats_timer => StatsTimer}.
|
State#{stats_fun => StatsFun, stats_timer => StatsTimer}.
|
||||||
|
|
||||||
%% Ensure publish timer
|
%% Ensure publish timer
|
||||||
|
-spec ensure_publish_timer(state()) -> state().
|
||||||
ensure_publish_timer(State) ->
|
ensure_publish_timer(State) ->
|
||||||
ensure_publish_timer(mnesia:dirty_first(?TAB), State).
|
ensure_publish_timer(mnesia:dirty_first(?TAB), State).
|
||||||
|
|
||||||
|
@ -222,4 +234,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
|
||||||
|
|
||||||
-spec(delayed_count() -> non_neg_integer()).
|
-spec(delayed_count() -> non_neg_integer()).
|
||||||
delayed_count() -> mnesia:table_info(?TAB, size).
|
delayed_count() -> mnesia:table_info(?TAB, size).
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_modules,
|
{application, emqx_modules,
|
||||||
[{description, "EMQ X Module Management"},
|
[{description, "EMQ X Module Management"},
|
||||||
{vsn, "4.3.4"},
|
{vsn, "4.3.5"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
{mod, {emqx_modules_app, []}},
|
{mod, {emqx_modules_app, []}},
|
||||||
|
|
|
@ -1,33 +1,29 @@
|
||||||
%% -*-: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[{"4.3.4",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[2-3]">>, [
|
{<<"4\\.3\\.[2-3]">>,
|
||||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
|
[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1", [
|
{"4.3.1",
|
||||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0", [
|
{"4.3.0",
|
||||||
{update, emqx_mod_delayed, {advanced, []}},
|
[{update,emqx_mod_delayed,{advanced,[]}},
|
||||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{<<".*">>,[]}],
|
||||||
{<<".*">>, []}
|
[{"4.3.4",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
||||||
],
|
{<<"4\\.3\\.[2-3]">>,
|
||||||
[
|
[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{<<"4\\.3\\.[2-3]">>, [
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
|
{"4.3.1",
|
||||||
]},
|
[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{"4.3.1", [
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{"4.3.0",
|
||||||
]},
|
[{update,emqx_mod_delayed,{advanced,[]}},
|
||||||
{"4.3.0", [
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{update, emqx_mod_delayed, {advanced, []}},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
{<<".*">>,[]}]}.
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{<<".*">>, []}
|
|
||||||
]
|
|
||||||
}.
|
|
||||||
|
|
Loading…
Reference in New Issue