From deb64bbbdca74a0778eba9aabbfc04d7a1d59704 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Fri, 15 Apr 2022 19:17:47 +0800 Subject: [PATCH] chore: make sure add_handler is ok. --- .../src/emqx_auto_subscribe.erl | 39 ++-- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 186 ++++++++++-------- 2 files changed, 132 insertions(+), 93 deletions(-) diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 067c750c1..27dcb38d9 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -20,19 +20,21 @@ -define(MAX_AUTO_SUBSCRIBE, 20). --export([load/0, unload/0]). % +% +-export([load/0, unload/0]). --export([ max_limit/0 - , list/0 - , update/1 - , post_config_update/5 - ]). +-export([ + max_limit/0, + list/0, + update/1, + post_config_update/5 +]). %% hook callback -export([on_client_connected/3]). load() -> - emqx_conf:add_handler([auto_subscribe, topics], ?MODULE), + ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE), update_hook(). unload() -> @@ -56,7 +58,8 @@ post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) -> on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) -> case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of - [] -> ok; + [] -> + ok; TopicTables -> _ = self() ! {subscribe, TopicTables}, ok @@ -71,17 +74,21 @@ format(Rules) when is_list(Rules) -> [format(Rule) || Rule <- Rules]; format(Rule = #{topic := Topic}) when is_map(Rule) -> #{ - topic => Topic, - qos => maps:get(qos, Rule, 0), - rh => maps:get(rh, Rule, 0), - rap => maps:get(rap, Rule, 0), - nl => maps:get(nl, Rule, 0) + topic => Topic, + qos => maps:get(qos, Rule, 0), + rh => maps:get(rh, Rule, 0), + rap => maps:get(rap, Rule, 0), + nl => maps:get(nl, Rule, 0) }. update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> - case emqx_conf:update([auto_subscribe, topics], - Topics, - #{rawconf_with_defaults => true, override_to => cluster}) of + case + emqx_conf:update( + [auto_subscribe, topics], + Topics, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of {ok, #{raw_config := NewTopics}} -> {ok, NewTopics}; {error, Reason} -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index a29dea123..f0605baca 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -22,37 +22,50 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). --export([ start_link/0, on_delivery_completed/3, update_settings/1 - , clear_history/0, init_tab/0, post_config_update/5 - ]). +-export([ + start_link/0, + on_delivery_completed/3, + update_settings/1, + clear_history/0, + init_tab/0, + post_config_update/5 +]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). -compile(nowarn_unused_type). --type state() :: #{ enable := boolean() - , last_tick_at := pos_integer() - , expire_timer := undefined | reference() - }. +-type state() :: #{ + enable := boolean(), + last_tick_at := pos_integer(), + expire_timer := undefined | reference() +}. -type message() :: #message{}. --type stats_type() :: whole %% whole = internal + response - | internal %% timespan from message in to deliver - | response. %% timespan from delivery to client response +%% whole = internal + response +-type stats_type() :: + whole + %% timespan from message in to deliver + | internal + %% timespan from delivery to client response + | response. -type stats_update_args() :: #{session_birth_time := pos_integer()}. --type stats_update_env() :: #{ threshold := non_neg_integer() - , stats_type := stats_type() - , max_size := pos_integer()}. +-type stats_update_env() :: #{ + threshold := non_neg_integer(), + stats_type := stats_type(), + max_size := pos_integer() +}. -ifdef(TEST). -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). @@ -73,33 +86,39 @@ %% APIs %%-------------------------------------------------------------------- %% @doc Start the st_statistics --spec(start_link() -> emqx_types:startlink_ret()). +-spec start_link() -> emqx_types:startlink_ret(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -on_delivery_completed(#message{timestamp = Ts}, - #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> +on_delivery_completed( + #message{timestamp = Ts}, + #{session_birth_time := BirthTime}, + _Cfg +) when Ts =< BirthTime -> ok; - on_delivery_completed(Msg, Env, Cfg) -> on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). -on_delivery_completed(#message{topic = Topic} = Msg, - #{clientid := ClientId}, - Now, - #{threshold := Threshold, - stats_type := StatsType, - max_size := MaxSize}) -> +on_delivery_completed( + #message{topic = Topic} = Msg, + #{clientid := ClientId}, + Now, + #{ + threshold := Threshold, + stats_type := StatsType, + max_size := MaxSize + } +) -> TimeSpan = calc_timespan(StatsType, Msg, Now), case TimeSpan =< Threshold of - true -> ok; + true -> + ok; _ -> Id = ?ID(ClientId, Topic), LastUpdateValue = find_last_update_value(Id), case TimeSpan =< LastUpdateValue of true -> ok; - _ -> - try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) + _ -> try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) end end. @@ -113,15 +132,23 @@ post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) -> gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT). init_tab() -> - safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table - , {keypos, #top_k.index}, {write_concurrency, true} - , {read_concurrency, true} - ]), + safe_create_tab(?TOPK_TAB, [ + ordered_set, + public, + named_table, + {keypos, #top_k.index}, + {write_concurrency, true}, + {read_concurrency, true} + ]), - safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table - , {keypos, #index_tab.index}, {write_concurrency, true} - , {read_concurrency, true} - ]). + safe_create_tab(?INDEX_TAB, [ + ordered_set, + public, + named_table, + {keypos, #index_tab.index}, + {write_concurrency, true}, + {read_concurrency, true} + ]). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -130,12 +157,13 @@ init_tab() -> init([]) -> erlang:process_flag(trap_exit, true), - emqx_conf:add_handler([slow_subs], ?MODULE), + ok = emqx_conf:add_handler([slow_subs], ?MODULE), - InitState = #{enable => false, - last_tick_at => 0, - expire_timer => undefined - }, + InitState = #{ + enable => false, + last_tick_at => 0, + expire_timer => undefined + }, Enable = emqx:get_config([slow_subs, enable]), {ok, check_enable(Enable, InitState)}. @@ -143,11 +171,9 @@ init([]) -> handle_call({update_settings, #{enable := Enable}}, _From, State) -> State2 = check_enable(Enable, State), {reply, ok, State2}; - handle_call(clear_history, _, State) -> do_clear_history(), {reply, ok, State}; - handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -161,12 +187,12 @@ handle_info(expire_tick, State) -> do_clear(Logs), State1 = start_timer(expire_timer, fun expire_tick/0, State), {noreply, State1}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, State) -> + ok = emqx_conf:remove_handler([slow_subs]), _ = unload(State), ok. @@ -180,46 +206,52 @@ expire_tick() -> erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). load(State) -> - #{top_k_num := MaxSizeT, - stats_type := StatsType, - threshold := Threshold} = emqx:get_config([slow_subs]), + #{ + top_k_num := MaxSizeT, + stats_type := StatsType, + threshold := Threshold + } = emqx:get_config([slow_subs]), MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE), - _ = emqx:hook('delivery.completed', - {?MODULE, on_delivery_completed, - [#{max_size => MaxSize, - stats_type => StatsType, - threshold => Threshold - }]}), + _ = emqx:hook( + 'delivery.completed', + {?MODULE, on_delivery_completed, [ + #{ + max_size => MaxSize, + stats_type => StatsType, + threshold => Threshold + } + ]} + ), State1 = start_timer(expire_timer, fun expire_tick/0, State), State1#{enable := true, last_tick_at => ?NOW}. unload(#{expire_timer := ExpireTimer} = State) -> emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}), - State#{enable := false, - expire_timer := cancel_timer(ExpireTimer)}. + State#{ + enable := false, + expire_timer := cancel_timer(ExpireTimer) + }. do_clear(Logs) -> Now = ?NOW, Interval = emqx:get_config([slow_subs, expire_interval]), Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) -> - case Now - Ts >= Interval of - true -> - delete_with_index(TimeSpan, Id); - _ -> - true - end - end, + case Now - Ts >= Interval of + true -> + delete_with_index(TimeSpan, Id); + _ -> + true + end + end, lists:foreach(Each, Logs). -spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer(). calc_timespan(whole, #message{timestamp = Ts}, Now) -> Now - Ts; - calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) -> End = emqx_message:get_header(deliver_begin_at, Msg, Now), End - Ts; - calc_timespan(response, Msg, Now) -> Begin = emqx_message:get_header(deliver_begin_at, Msg, Now), Now - Begin. @@ -248,7 +280,8 @@ try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) -> update_topk(Now, LastUpdateValue, TimeSpan, Id); ?TOPK_INDEX(Min, MinId) -> case TimeSpan =< Min of - true -> false; + true -> + false; _ -> update_topk(Now, LastUpdateValue, TimeSpan, Id), delete_with_index(Min, MinId) @@ -256,10 +289,9 @@ try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) -> end end. - -spec find_last_update_value(id()) -> non_neg_integer(). find_last_update_value(Id) -> - case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of + case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of ?INDEX(LastUpdateValue, Id) -> LastUpdateValue; _ -> @@ -269,10 +301,11 @@ find_last_update_value(Id) -> -spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true. update_topk(Now, LastUpdateValue, TimeSpan, Id) -> %% update record - ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id), - last_update_time = Now, - extra = [] - }), + ets:insert(?TOPK_TAB, #top_k{ + index = ?TOPK_INDEX(TimeSpan, Id), + last_update_time = Now, + extra = [] + }), %% update index ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}), @@ -283,7 +316,6 @@ update_topk(Now, LastUpdateValue, TimeSpan, Id) -> -spec delete_with_index(non_neg_integer(), id()) -> true. delete_with_index(0, _) -> true; - delete_with_index(TimeSpan, Id) -> ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)), ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).