%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% Slow subscriptions statistics.
%%
%% A gen_server that installs a 'delivery.completed' hook and records, in two
%% public ordered_set ETS tables, the (clientid, topic) pairs with the largest
%% delivery timespans (a bounded top-k). Records whose last update is older
%% than the configured `expire_interval' are purged periodically.
-module(emqx_slow_subs).

-behaviour(gen_server).

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").

-export([
    start_link/0,
    on_delivery_completed/3,
    update_settings/1,
    clear_history/0,
    init_tab/0,
    post_config_update/5
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

%% Several types below are documentation only and not referenced by specs.
-compile(nowarn_unused_type).

%% gen_server state.
-type state() :: #{
    %% whether the 'delivery.completed' hook is currently installed
    enable := boolean(),
    %% NOTE(review): init/1 initialises this to 0, which is not a
    %% pos_integer(); non_neg_integer() looks like the intended type — confirm.
    last_tick_at := pos_integer(),
    %% pending expire_tick timer, if any
    expire_timer := undefined | reference()
}.

-type message() :: #message{}.

%% Which timespan is measured for a delivered message.
%% whole = internal + response
-type stats_type() ::
    whole
    %% timespan from message in (publish timestamp) to deliver
    | internal
    %% timespan from delivery to client response
    | response.

%% Presumably the second (hook env) argument of on_delivery_completed/3.
%% NOTE(review): on_delivery_completed/4 also matches a `clientid' key, which
%% this type does not list — confirm and extend if appropriate.
-type stats_update_args() :: #{session_birth_time := pos_integer()}.

%% Settings bound into the hook by load/2 (third argument of
%% on_delivery_completed/3).
-type stats_update_env() :: #{
    threshold := non_neg_integer(),
    stats_type := stats_type(),
    max_size := pos_integer()
}.

%% Period of the expire_tick timer (shortened for tests).
-ifdef(TEST).
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
-else.
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(10)).
-endif.

%% Current wall-clock time in milliseconds.
-define(NOW, erlang:system_time(millisecond)).
%% Timeout for synchronous calls into this server.
-define(DEF_CALL_TIMEOUT, timer:seconds(10)).
%% Erlang term order:
%%   number < atom < reference < fun < port < pid < tuple < map < nil
%%   < list < bit string
%% An ets ordered_set keeps its keys in ascending term order, so
%% ets:first/1 returns the smallest key of the table.

%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------

%% @doc Start the slow subscriptions statistics server, registered locally
%% under the module name.
-spec start_link() -> emqx_types:startlink_ret().
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc 'delivery.completed' hook callback (installed by load/2).
%%
%% Messages whose timestamp is not newer than the session birth time are
%% skipped. Otherwise the current wall-clock time is captured and the
%% delivery is evaluated by on_delivery_completed/4.
-spec on_delivery_completed(emqx_types:message(), map(), stats_update_env()) ->
    ok | boolean().
on_delivery_completed(
    #message{timestamp = Ts},
    #{session_birth_time := BirthTime},
    _Cfg
) when Ts =< BirthTime ->
    ok;
on_delivery_completed(Msg, Env, Cfg) ->
    on_delivery_completed(Msg, Env, ?NOW, Cfg).

%% Measure the timespan selected by `stats_type'. Record it in the top-k
%% table only when it exceeds both the threshold and the value already
%% recorded for the same (clientid, topic) id.
on_delivery_completed(
    #message{topic = Topic} = Msg,
    #{clientid := ClientId},
    Now,
    #{
        threshold := Threshold,
        stats_type := StatsType,
        max_size := MaxSize
    }
) ->
    TimeSpan = calc_timespan(StatsType, Msg, Now),
    case TimeSpan =< Threshold of
        true ->
            ok;
        _ ->
            Id = ?ID(ClientId, Topic),
            LastUpdateValue = find_last_update_value(Id),
            %% an id's record is only ever replaced by a larger timespan
            case TimeSpan =< LastUpdateValue of
                true ->
                    ok;
                _ ->
                    try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
            end
    end.

%% @doc Remove every recorded slow subscription.
-spec clear_history() -> ok.
clear_history() ->
    gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).

%% @doc Update the `slow_subs' config cluster-wide.
update_settings(Conf) ->
    emqx_conf:update([slow_subs], Conf, #{override_to => cluster}).

%% @doc emqx_conf handler callback (registered in init/1): hand the new
%% config to the server so it can load/unload/reload the hook.
-spec post_config_update(term(), term(), map(), term(), term()) -> ok.
post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
    gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).

%% @doc Create the top-k table and the index table (public, named,
%% ordered_set) unless they already exist.
init_tab() ->
    safe_create_tab(?TOPK_TAB, [
        ordered_set,
        public,
        named_table,
        {keypos, #top_k.index},
        {write_concurrency, true},
        {read_concurrency, true}
    ]),
    safe_create_tab(?INDEX_TAB, [
        ordered_set,
        public,
        named_table,
        {keypos, #index_tab.index},
        {write_concurrency, true},
        {read_concurrency, true}
    ]).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

init([]) ->
    %% trap exits so terminate/2 runs on shutdown and can remove the config
    %% handler and the hook
    erlang:process_flag(trap_exit, true),
    ok = emqx_conf:add_handler([slow_subs], ?MODULE),
    InitState = #{
        enable => false,
        last_tick_at => 0,
        expire_timer => undefined
    },
    Cfg = emqx:get_config([slow_subs]),
    {ok, check_enable(Cfg, InitState)}.

handle_call({update_settings, Cfg}, _From, State) ->
    State2 = check_enable(Cfg, State),
    {reply, ok, State2};
handle_call(clear_history, _, State) ->
    do_clear_history(),
    {reply, ok, State};
handle_call(Req, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Req}),
    {reply, ignored, State}.

handle_cast(Msg, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
    {noreply, State}.

%% Periodic expiry: purge stale records and re-arm the timer, but only while
%% enabled. An expire_tick may already be queued when unload/1 cancels the
%% timer. Re-arming on such a stale tick would keep the timer running
%% forever while the feature is disabled, so it is dropped instead.
handle_info(expire_tick, #{enable := true} = State) ->
    Logs = ets:tab2list(?TOPK_TAB),
    do_clear(Logs),
    State1 = start_timer(expire_timer, fun expire_tick/0, State),
    {noreply, State1};
handle_info(expire_tick, State) ->
    {noreply, State};
handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
    {noreply, State}.

terminate(_Reason, State) ->
    ok = emqx_conf:remove_handler([slow_subs]),
    _ = unload(State),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

%% Arm a one-shot timer that sends `expire_tick' to this process.
expire_tick() ->
    erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).

%% Install the 'delivery.completed' hook with the given settings (top_k_num
%% is capped at ?MAX_SIZE), start the expiry timer and mark the state as
%% enabled.
load(
    #{
        top_k_num := MaxSizeT,
        stats_type := StatsType,
        threshold := Threshold
    },
    State
) ->
    MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
    ok = emqx_hooks:put(
        'delivery.completed',
        {?MODULE, on_delivery_completed, [
            #{
                max_size => MaxSize,
                stats_type => StatsType,
                threshold => Threshold
            }
        ]},
        ?HP_SLOW_SUB
    ),
    State1 = start_timer(expire_timer, fun expire_tick/0, State),
    State1#{enable := true, last_tick_at => ?NOW}.
%% Remove the hook, cancel the expiry timer and mark the state as disabled.
unload(#{expire_timer := ExpireTimer} = State) ->
    emqx_hooks:del('delivery.completed', {?MODULE, on_delivery_completed}),
    State#{
        enable := false,
        expire_timer := cancel_timer(ExpireTimer)
    }.

%% Delete every top-k record (and its index entry) whose last update is at
%% least `expire_interval' old.
do_clear(Logs) ->
    Now = ?NOW,
    Interval = emqx:get_config([slow_subs, expire_interval]),
    Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) ->
        case Now - Ts >= Interval of
            true -> delete_with_index(TimeSpan, Id);
            _ -> true
        end
    end,
    lists:foreach(Each, Logs).

%% Timespan measured for the given stats type:
%%   whole    - from the message timestamp to Now
%%   internal - from the message timestamp to the deliver_begin_at header
%%   response - from the deliver_begin_at header to Now
%% A missing deliver_begin_at header defaults to Now.
%% Nothing here clamps the subtraction, so the result may be negative.
%% For example, a message timestamp may be ahead of the local clock
%% (NOTE(review): e.g. clock skew between nodes). Hence integer() rather
%% than non_neg_integer().
-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> integer().
calc_timespan(whole, #message{timestamp = Ts}, Now) ->
    Now - Ts;
calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
    End = emqx_message:get_header(deliver_begin_at, Msg, Now),
    End - Ts;
calc_timespan(response, Msg, Now) ->
    Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
    Now - Begin.

%% update_topk/4 is safe because each process has a unique clientid:
%% its inserts and deletes are bound to that clientid, so there is no race
%% condition between processes.
%%
%% But the delete_with_index/2 call that evicts the current minimum in
%% try_insert_to_topk/5 (as well as the one in do_clear/1) may race, because
%% it deletes data belonging to another clientid
%% (i.e. data written by another process).
%% so it may appear that:
%% while one process deletes a record, another process is updating that
%% same record.
%% To solve this race condition, the index table also uses the ordered_set
%% type, so even if the above situation occurs it only causes the old data
%% to be deleted twice, and the correctness of the data is not affected.

%% Insert (or replace) Id's record with TimeSpan, aiming to keep the top-k
%% table at no more than MaxSize entries:
%%   * below capacity: just insert;
%%   * Id already has a record (LastUpdateValue > 0): update_topk/4 deletes
%%     the old record, so the size is unchanged and nothing is evicted;
%%   * otherwise, evict the current minimum if TimeSpan beats it.
%% Returns false when TimeSpan is too small to enter a full table.
try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
    case ets:info(?TOPK_TAB, size) of
        Size when Size < MaxSize ->
            update_topk(Now, LastUpdateValue, TimeSpan, Id);
        _Size when LastUpdateValue > 0 ->
            %% Evicting the minimum here would drop another id from the
            %% top-k even though replacing Id's record frees no slot.
            update_topk(Now, LastUpdateValue, TimeSpan, Id);
        _Size ->
            case ets:first(?TOPK_TAB) of
                '$end_of_table' ->
                    update_topk(Now, LastUpdateValue, TimeSpan, Id);
                ?TOPK_INDEX(_, Id) ->
                    update_topk(Now, LastUpdateValue, TimeSpan, Id);
                ?TOPK_INDEX(Min, MinId) ->
                    case TimeSpan =< Min of
                        true ->
                            false;
                        _ ->
                            update_topk(Now, LastUpdateValue, TimeSpan, Id),
                            delete_with_index(Min, MinId)
                    end
            end
    end.

%% Look up the key that follows ?INDEX(0, Id) in the ordered index table.
%% Return its timespan if that key belongs to Id, otherwise 0 ("no record").
-spec find_last_update_value(id()) -> non_neg_integer().
find_last_update_value(Id) ->
    case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of
        ?INDEX(LastUpdateValue, Id) ->
            LastUpdateValue;
        _ ->
            0
    end.

%% Write Id's new top-k record and index entry, then delete the previous
%% ones (a no-op when LastUpdateValue is 0).
-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
    %% update record
    ets:insert(?TOPK_TAB, #top_k{
        index = ?TOPK_INDEX(TimeSpan, Id),
        last_update_time = Now,
        extra = []
    }),
    %% update index
    ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}),
    %% delete the old record & index
    delete_with_index(LastUpdateValue, Id).

%% Delete both the index entry and the top-k record for (TimeSpan, Id).
%% A TimeSpan of 0 means "no record" and is ignored.
-spec delete_with_index(non_neg_integer(), id()) -> true.
delete_with_index(0, _) ->
    true;
delete_with_index(TimeSpan, Id) ->
    ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
    ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).

%% Create the named ETS table unless it already exists; returns Name.
safe_create_tab(Name, Opts) ->
    case ets:whereis(Name) of
        undefined ->
            Name = ets:new(Name, Opts);
        _ ->
            Name
    end.

%% Empty both the index table and the top-k table.
do_clear_history() ->
    ets:delete_all_objects(?INDEX_TAB),
    ets:delete_all_objects(?TOPK_TAB).
%% Reconcile the running state with the `enable' flag of a (new) config:
%%   off -> on : install the hook and start the timer (load/2)
%%   on  -> off: remove the hook and cancel the timer (unload/1)
%%   on  -> on : unload then load again so the hook gets the new settings
%%   any other combination leaves the state untouched
check_enable(#{enable := true} = Cfg, #{enable := false} = State) ->
    load(Cfg, State);
check_enable(#{enable := false}, #{enable := true} = State) ->
    unload(State);
check_enable(#{enable := true} = Cfg, #{enable := true} = State) ->
    Unloaded = unload(State),
    load(Cfg, Unloaded);
check_enable(#{enable := _}, #{enable := _} = State) ->
    State.

%% Replace the timer stored under Name in State: cancel the old one (if any),
%% then store the reference returned by Fun().
start_timer(Name, Fun, State) ->
    OldRef = maps:get(Name, State),
    _ = cancel_timer(OldRef),
    NewRef = Fun(),
    maps:update(Name, NewRef, State).

%% Cancel a timer reference; any non-reference (e.g. undefined) is ignored.
%% Always returns undefined so the result can be stored back in the state.
cancel_timer(MaybeRef) ->
    case is_reference(MaybeRef) of
        true ->
            _ = erlang:cancel_timer(MaybeRef),
            undefined;
        false ->
            undefined
    end.