Merge pull request #6576 from lafirest/fix/slow_subs_timer

fix(emqx_slow_subs): fix timer manager error
This commit is contained in:
lafirest 2021-12-30 15:42:29 +08:00 committed by GitHub
commit c602bec883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 40 additions and 20 deletions

View File

@ -39,6 +39,8 @@
-type state() :: #{ enable := boolean() -type state() :: #{ enable := boolean()
, last_tick_at := pos_integer() , last_tick_at := pos_integer()
, expire_timer := undefined | reference()
, notice_timer := undefined | reference()
}. }.
-type log() :: #{ rank := pos_integer() -type log() :: #{ rank := pos_integer()
@ -141,8 +143,14 @@ init_topk_tab() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
InitState = #{enable => false,
last_tick_at => 0,
expire_timer => undefined,
notice_timer => undefined
},
Enable = emqx:get_config([emqx_slow_subs, enable]), Enable = emqx:get_config([emqx_slow_subs, enable]),
{ok, check_enable(Enable, #{enable => false})}. {ok, check_enable(Enable, InitState)}.
handle_call({update_settings, Enable}, _From, State) -> handle_call({update_settings, Enable}, _From, State) ->
State2 = check_enable(Enable, State), State2 = check_enable(Enable, State),
@ -161,23 +169,23 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info(expire_tick, State) -> handle_info(expire_tick, State) ->
expire_tick(),
Logs = ets:tab2list(?TOPK_TAB), Logs = ets:tab2list(?TOPK_TAB),
do_clear(Logs), do_clear(Logs),
{noreply, State}; State1 = start_timer(expire_timer, fun expire_tick/0, State),
{noreply, State1};
handle_info(notice_tick, State) -> handle_info(notice_tick, State) ->
notice_tick(),
Logs = ets:tab2list(?TOPK_TAB), Logs = ets:tab2list(?TOPK_TAB),
do_notification(Logs, State), do_notification(Logs, State),
{noreply, State#{last_tick_at := ?NOW}}; State1 = start_timer(notice_timer, fun notice_tick/0, State),
{noreply, State1#{last_tick_at := ?NOW}};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _) -> terminate(_Reason, State) ->
unload(), _ = unload(State),
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -191,10 +199,9 @@ expire_tick() ->
notice_tick() -> notice_tick() ->
case emqx:get_config([emqx_slow_subs, notice_interval]) of case emqx:get_config([emqx_slow_subs, notice_interval]) of
0 -> ok; 0 -> undefined;
Interval -> Interval ->
erlang:send_after(Interval, self(), ?FUNCTION_NAME), erlang:send_after(Interval, self(), ?FUNCTION_NAME)
ok
end. end.
-spec do_notification(list(), state()) -> ok. -spec do_notification(list(), state()) -> ok.
@ -250,16 +257,23 @@ publish(TickTime, Notices) ->
_ = emqx_broker:safe_publish(Msg), _ = emqx_broker:safe_publish(Msg),
ok. ok.
load() -> load(State) ->
MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]), MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]),
MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE), MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
_ = emqx:hook('message.slow_subs_stats', _ = emqx:hook('message.slow_subs_stats',
{?MODULE, on_stats_update, [#{max_size => MaxSize}]} {?MODULE, on_stats_update, [#{max_size => MaxSize}]}
), ),
ok.
unload() -> State1 = start_timer(notice_timer, fun notice_tick/0, State),
emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}). State2 = start_timer(expire_timer, fun expire_tick/0, State1),
State2#{enable := true, last_tick_at => ?NOW}.
unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) ->
emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}),
State#{notice_timer := cancel_timer(NoticeTimer),
expire_timer := cancel_timer(ExpireTimer)
}.
do_clear(Logs) -> do_clear(Logs) ->
Now = ?NOW, Now = ?NOW,
@ -304,16 +318,22 @@ check_enable(Enable, #{enable := IsEnable} = State) ->
IsEnable -> IsEnable ->
State; State;
true -> true ->
notice_tick(), load(State);
expire_tick(),
load(),
State#{enable := true, last_tick_at => ?NOW};
_ -> _ ->
unload(), unload(State)
State#{enable := false}
end. end.
update_threshold() -> update_threshold() ->
Threshold = emqx:get_config([emqx_slow_subs, threshold]), Threshold = emqx:get_config([emqx_slow_subs, threshold]),
emqx_message_latency_stats:update_threshold(Threshold), emqx_message_latency_stats:update_threshold(Threshold),
ok. ok.
start_timer(Name, Fun, State) ->
_ = cancel_timer(maps:get(Name, State)),
State#{Name := Fun()}.
cancel_timer(TimerRef) when is_reference(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
undefined;
cancel_timer(_) ->
undefined.