diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 50de524af..445465e1f 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -39,6 +39,8 @@ -type state() :: #{ enable := boolean() , last_tick_at := pos_integer() + , expire_timer := undefined | reference() + , notice_timer := undefined | reference() }. -type log() :: #{ rank := pos_integer() @@ -141,8 +143,14 @@ init_topk_tab() -> %%-------------------------------------------------------------------- init([]) -> + InitState = #{enable => false, + last_tick_at => 0, + expire_timer => undefined, + notice_timer => undefined + }, + Enable = emqx:get_config([emqx_slow_subs, enable]), - {ok, check_enable(Enable, #{enable => false})}. + {ok, check_enable(Enable, InitState)}. handle_call({update_settings, Enable}, _From, State) -> State2 = check_enable(Enable, State), @@ -161,23 +169,23 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info(expire_tick, State) -> - expire_tick(), Logs = ets:tab2list(?TOPK_TAB), do_clear(Logs), - {noreply, State}; + State1 = start_timer(expire_timer, fun expire_tick/0, State), + {noreply, State1}; handle_info(notice_tick, State) -> - notice_tick(), Logs = ets:tab2list(?TOPK_TAB), do_notification(Logs, State), - {noreply, State#{last_tick_at := ?NOW}}; + State1 = start_timer(notice_timer, fun notice_tick/0, State), + {noreply, State1#{last_tick_at := ?NOW}}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _) -> - unload(), +terminate(_Reason, State) -> + _ = unload(State), ok. code_change(_OldVsn, State, _Extra) -> @@ -191,10 +199,9 @@ expire_tick() -> notice_tick() -> case emqx:get_config([emqx_slow_subs, notice_interval]) of - 0 -> ok; + 0 -> undefined; Interval -> - erlang:send_after(Interval, self(), ?FUNCTION_NAME), - ok + erlang:send_after(Interval, self(), ?FUNCTION_NAME) end. -spec do_notification(list(), state()) -> ok. @@ -250,16 +257,23 @@ publish(TickTime, Notices) -> _ = emqx_broker:safe_publish(Msg), ok. -load() -> +load(State) -> MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]), MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE), _ = emqx:hook('message.slow_subs_stats', {?MODULE, on_stats_update, [#{max_size => MaxSize}]} ), - ok. -unload() -> - emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}). + State1 = start_timer(notice_timer, fun notice_tick/0, State), + State2 = start_timer(expire_timer, fun expire_tick/0, State1), + State2#{enable := true, last_tick_at => ?NOW}. + + +unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) -> + emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}), + State#{notice_timer := cancel_timer(NoticeTimer), + expire_timer := cancel_timer(ExpireTimer) + }. do_clear(Logs) -> Now = ?NOW, @@ -304,16 +318,22 @@ check_enable(Enable, #{enable := IsEnable} = State) -> IsEnable -> State; true -> - notice_tick(), - expire_tick(), - load(), - State#{enable := true, last_tick_at => ?NOW}; + load(State); _ -> - unload(), - State#{enable := false} + unload(State) end. update_threshold() -> Threshold = emqx:get_config([emqx_slow_subs, threshold]), emqx_message_latency_stats:update_threshold(Threshold), ok. + +start_timer(Name, Fun, State) -> + _ = cancel_timer(maps:get(Name, State)), + State#{Name := Fun()}. + +cancel_timer(TimerRef) when is_reference(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + undefined; +cancel_timer(_) -> + undefined.