fix(delayed): cancel the publish timer when terminate
This commit is contained in:
parent
e6fcef16ba
commit
43a935286a
|
@ -19,6 +19,7 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
|
@ -57,6 +58,14 @@
|
||||||
-record(delayed_message, {key, delayed, msg}).
|
-record(delayed_message, {key, delayed, msg}).
|
||||||
-type delayed_message() :: #delayed_message{}.
|
-type delayed_message() :: #delayed_message{}.
|
||||||
|
|
||||||
|
|
||||||
|
-type state() :: #{ publish_timer := maybe(timer:tref())
|
||||||
|
, publish_at := non_neg_integer()
|
||||||
|
, stats_timer := maybe(reference())
|
||||||
|
, stats_fun := maybe(fun((pos_integer()) -> ok))
|
||||||
|
, max_delayed_messages := non_neg_integer()
|
||||||
|
}.
|
||||||
|
|
||||||
%% sync ms with record change
|
%% sync ms with record change
|
||||||
-define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
|
-define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
|
||||||
-define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]).
|
-define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]).
|
||||||
|
@ -226,8 +235,10 @@ init([Opts]) ->
|
||||||
emqx_conf:add_handler([delayed], ?MODULE),
|
emqx_conf:add_handler([delayed], ?MODULE),
|
||||||
MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0),
|
MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0),
|
||||||
{ok, ensure_stats_event(
|
{ok, ensure_stats_event(
|
||||||
ensure_publish_timer(#{timer => undefined,
|
ensure_publish_timer(#{publish_timer => undefined,
|
||||||
publish_at => 0,
|
publish_at => 0,
|
||||||
|
stats_timer => undefined,
|
||||||
|
stats_fun => undefined,
|
||||||
max_delayed_messages => MaxDelayedMessages}))}.
|
max_delayed_messages => MaxDelayedMessages}))}.
|
||||||
|
|
||||||
handle_call({set_max_delayed_messages, Max}, _From, State) ->
|
handle_call({set_max_delayed_messages, Max}, _From, State) ->
|
||||||
|
@ -268,10 +279,10 @@ handle_cast(Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
%% Do Publish...
|
%% Do Publish...
|
||||||
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
|
handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) ->
|
||||||
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
|
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
|
||||||
lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
|
lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
|
||||||
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
|
{noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})};
|
||||||
|
|
||||||
handle_info(stats, State = #{stats_fun := StatsFun}) ->
|
handle_info(stats, State = #{stats_fun := StatsFun}) ->
|
||||||
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
|
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
|
||||||
|
@ -282,9 +293,10 @@ handle_info(Info, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #{stats_timer := TRef}) ->
|
terminate(_Reason, #{publish_timer := PublishTimer, stats_timer := StatsTimer}) ->
|
||||||
emqx_conf:remove_handler([delayed]),
|
emqx_conf:remove_handler([delayed]),
|
||||||
emqx_misc:cancel_timer(TRef).
|
emqx_misc:cancel_timer(PublishTimer),
|
||||||
|
emqx_misc:cancel_timer(StatsTimer).
|
||||||
|
|
||||||
code_change(_Vsn, State, _Extra) ->
|
code_change(_Vsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -294,20 +306,22 @@ code_change(_Vsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% Ensure the stats
|
%% Ensure the stats
|
||||||
|
-spec ensure_stats_event(state()) -> state().
|
||||||
ensure_stats_event(State) ->
|
ensure_stats_event(State) ->
|
||||||
StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
|
StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
|
||||||
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
|
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
|
||||||
State#{stats_fun => StatsFun, stats_timer => StatsTimer}.
|
State#{stats_fun := StatsFun, stats_timer := StatsTimer}.
|
||||||
|
|
||||||
%% Ensure publish timer
|
%% Ensure publish timer
|
||||||
|
-spec ensure_publish_timer(state()) -> state().
|
||||||
ensure_publish_timer(State) ->
|
ensure_publish_timer(State) ->
|
||||||
ensure_publish_timer(mnesia:dirty_first(?TAB), State).
|
ensure_publish_timer(mnesia:dirty_first(?TAB), State).
|
||||||
|
|
||||||
ensure_publish_timer('$end_of_table', State) ->
|
ensure_publish_timer('$end_of_table', State) ->
|
||||||
State#{timer := undefined, publish_at := 0};
|
State#{publish_timer := undefined, publish_at := 0};
|
||||||
ensure_publish_timer({Ts, _Id}, State = #{timer := undefined}) ->
|
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) ->
|
||||||
ensure_publish_timer(Ts, os:system_time(seconds), State);
|
ensure_publish_timer(Ts, os:system_time(seconds), State);
|
||||||
ensure_publish_timer({Ts, _Id}, State = #{timer := TRef, publish_at := PubAt})
|
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt})
|
||||||
when Ts < PubAt ->
|
when Ts < PubAt ->
|
||||||
ok = emqx_misc:cancel_timer(TRef),
|
ok = emqx_misc:cancel_timer(TRef),
|
||||||
ensure_publish_timer(Ts, os:system_time(seconds), State);
|
ensure_publish_timer(Ts, os:system_time(seconds), State);
|
||||||
|
@ -317,7 +331,7 @@ ensure_publish_timer(_Key, State) ->
|
||||||
ensure_publish_timer(Ts, Now, State) ->
|
ensure_publish_timer(Ts, Now, State) ->
|
||||||
Interval = max(1, Ts - Now),
|
Interval = max(1, Ts - Now),
|
||||||
TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish),
|
TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish),
|
||||||
State#{timer := TRef, publish_at := Now + Interval}.
|
State#{publish_timer := TRef, publish_at := Now + Interval}.
|
||||||
|
|
||||||
do_publish(Key, Now) ->
|
do_publish(Key, Now) ->
|
||||||
do_publish(Key, Now, []).
|
do_publish(Key, Now, []).
|
||||||
|
|
Loading…
Reference in New Issue