chore: make sure add_handler is ok.
This commit is contained in:
parent
ce915f0bbd
commit
deb64bbbdc
|
@ -20,19 +20,21 @@
|
||||||
|
|
||||||
-define(MAX_AUTO_SUBSCRIBE, 20).
|
-define(MAX_AUTO_SUBSCRIBE, 20).
|
||||||
|
|
||||||
-export([load/0, unload/0]). %
|
%
|
||||||
|
-export([load/0, unload/0]).
|
||||||
|
|
||||||
-export([ max_limit/0
|
-export([
|
||||||
, list/0
|
max_limit/0,
|
||||||
, update/1
|
list/0,
|
||||||
, post_config_update/5
|
update/1,
|
||||||
|
post_config_update/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% hook callback
|
%% hook callback
|
||||||
-export([on_client_connected/3]).
|
-export([on_client_connected/3]).
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
|
ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
|
||||||
update_hook().
|
update_hook().
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
|
@ -56,7 +58,8 @@ post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) ->
|
||||||
|
|
||||||
on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
|
on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
|
||||||
case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of
|
case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of
|
||||||
[] -> ok;
|
[] ->
|
||||||
|
ok;
|
||||||
TopicTables ->
|
TopicTables ->
|
||||||
_ = self() ! {subscribe, TopicTables},
|
_ = self() ! {subscribe, TopicTables},
|
||||||
ok
|
ok
|
||||||
|
@ -79,9 +82,13 @@ format(Rule = #{topic := Topic}) when is_map(Rule) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
||||||
case emqx_conf:update([auto_subscribe, topics],
|
case
|
||||||
|
emqx_conf:update(
|
||||||
|
[auto_subscribe, topics],
|
||||||
Topics,
|
Topics,
|
||||||
#{rawconf_with_defaults => true, override_to => cluster}) of
|
#{rawconf_with_defaults => true, override_to => cluster}
|
||||||
|
)
|
||||||
|
of
|
||||||
{ok, #{raw_config := NewTopics}} ->
|
{ok, #{raw_config := NewTopics}} ->
|
||||||
{ok, NewTopics};
|
{ok, NewTopics};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
|
|
@ -22,37 +22,50 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
||||||
|
|
||||||
-export([ start_link/0, on_delivery_completed/3, update_settings/1
|
-export([
|
||||||
, clear_history/0, init_tab/0, post_config_update/5
|
start_link/0,
|
||||||
|
on_delivery_completed/3,
|
||||||
|
update_settings/1,
|
||||||
|
clear_history/0,
|
||||||
|
init_tab/0,
|
||||||
|
post_config_update/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([
|
||||||
, handle_call/3
|
init/1,
|
||||||
, handle_cast/2
|
handle_call/3,
|
||||||
, handle_info/2
|
handle_cast/2,
|
||||||
, terminate/2
|
handle_info/2,
|
||||||
, code_change/3
|
terminate/2,
|
||||||
|
code_change/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-compile(nowarn_unused_type).
|
-compile(nowarn_unused_type).
|
||||||
|
|
||||||
-type state() :: #{ enable := boolean()
|
-type state() :: #{
|
||||||
, last_tick_at := pos_integer()
|
enable := boolean(),
|
||||||
, expire_timer := undefined | reference()
|
last_tick_at := pos_integer(),
|
||||||
|
expire_timer := undefined | reference()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type message() :: #message{}.
|
-type message() :: #message{}.
|
||||||
|
|
||||||
-type stats_type() :: whole %% whole = internal + response
|
%% whole = internal + response
|
||||||
| internal %% timespan from message in to deliver
|
-type stats_type() ::
|
||||||
| response. %% timespan from delivery to client response
|
whole
|
||||||
|
%% timespan from message in to deliver
|
||||||
|
| internal
|
||||||
|
%% timespan from delivery to client response
|
||||||
|
| response.
|
||||||
|
|
||||||
-type stats_update_args() :: #{session_birth_time := pos_integer()}.
|
-type stats_update_args() :: #{session_birth_time := pos_integer()}.
|
||||||
|
|
||||||
-type stats_update_env() :: #{ threshold := non_neg_integer()
|
-type stats_update_env() :: #{
|
||||||
, stats_type := stats_type()
|
threshold := non_neg_integer(),
|
||||||
, max_size := pos_integer()}.
|
stats_type := stats_type(),
|
||||||
|
max_size := pos_integer()
|
||||||
|
}.
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
|
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
|
||||||
|
@ -73,33 +86,39 @@
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc Start the st_statistics
|
%% @doc Start the st_statistics
|
||||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
-spec start_link() -> emqx_types:startlink_ret().
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
on_delivery_completed(#message{timestamp = Ts},
|
on_delivery_completed(
|
||||||
#{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime ->
|
#message{timestamp = Ts},
|
||||||
|
#{session_birth_time := BirthTime},
|
||||||
|
_Cfg
|
||||||
|
) when Ts =< BirthTime ->
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
on_delivery_completed(Msg, Env, Cfg) ->
|
on_delivery_completed(Msg, Env, Cfg) ->
|
||||||
on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
|
on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
|
||||||
|
|
||||||
on_delivery_completed(#message{topic = Topic} = Msg,
|
on_delivery_completed(
|
||||||
|
#message{topic = Topic} = Msg,
|
||||||
#{clientid := ClientId},
|
#{clientid := ClientId},
|
||||||
Now,
|
Now,
|
||||||
#{threshold := Threshold,
|
#{
|
||||||
|
threshold := Threshold,
|
||||||
stats_type := StatsType,
|
stats_type := StatsType,
|
||||||
max_size := MaxSize}) ->
|
max_size := MaxSize
|
||||||
|
}
|
||||||
|
) ->
|
||||||
TimeSpan = calc_timespan(StatsType, Msg, Now),
|
TimeSpan = calc_timespan(StatsType, Msg, Now),
|
||||||
case TimeSpan =< Threshold of
|
case TimeSpan =< Threshold of
|
||||||
true -> ok;
|
true ->
|
||||||
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
Id = ?ID(ClientId, Topic),
|
Id = ?ID(ClientId, Topic),
|
||||||
LastUpdateValue = find_last_update_value(Id),
|
LastUpdateValue = find_last_update_value(Id),
|
||||||
case TimeSpan =< LastUpdateValue of
|
case TimeSpan =< LastUpdateValue of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
_ ->
|
_ -> try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
|
||||||
try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
|
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -113,14 +132,22 @@ post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
|
||||||
gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
|
gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
|
||||||
|
|
||||||
init_tab() ->
|
init_tab() ->
|
||||||
safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table
|
safe_create_tab(?TOPK_TAB, [
|
||||||
, {keypos, #top_k.index}, {write_concurrency, true}
|
ordered_set,
|
||||||
, {read_concurrency, true}
|
public,
|
||||||
|
named_table,
|
||||||
|
{keypos, #top_k.index},
|
||||||
|
{write_concurrency, true},
|
||||||
|
{read_concurrency, true}
|
||||||
]),
|
]),
|
||||||
|
|
||||||
safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table
|
safe_create_tab(?INDEX_TAB, [
|
||||||
, {keypos, #index_tab.index}, {write_concurrency, true}
|
ordered_set,
|
||||||
, {read_concurrency, true}
|
public,
|
||||||
|
named_table,
|
||||||
|
{keypos, #index_tab.index},
|
||||||
|
{write_concurrency, true},
|
||||||
|
{read_concurrency, true}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -130,9 +157,10 @@ init_tab() ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
|
|
||||||
emqx_conf:add_handler([slow_subs], ?MODULE),
|
ok = emqx_conf:add_handler([slow_subs], ?MODULE),
|
||||||
|
|
||||||
InitState = #{enable => false,
|
InitState = #{
|
||||||
|
enable => false,
|
||||||
last_tick_at => 0,
|
last_tick_at => 0,
|
||||||
expire_timer => undefined
|
expire_timer => undefined
|
||||||
},
|
},
|
||||||
|
@ -143,11 +171,9 @@ init([]) ->
|
||||||
handle_call({update_settings, #{enable := Enable}}, _From, State) ->
|
handle_call({update_settings, #{enable := Enable}}, _From, State) ->
|
||||||
State2 = check_enable(Enable, State),
|
State2 = check_enable(Enable, State),
|
||||||
{reply, ok, State2};
|
{reply, ok, State2};
|
||||||
|
|
||||||
handle_call(clear_history, _, State) ->
|
handle_call(clear_history, _, State) ->
|
||||||
do_clear_history(),
|
do_clear_history(),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -161,12 +187,12 @@ handle_info(expire_tick, State) ->
|
||||||
do_clear(Logs),
|
do_clear(Logs),
|
||||||
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
||||||
{noreply, State1};
|
{noreply, State1};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, State) ->
|
terminate(_Reason, State) ->
|
||||||
|
ok = emqx_conf:remove_handler([slow_subs]),
|
||||||
_ = unload(State),
|
_ = unload(State),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -180,24 +206,32 @@ expire_tick() ->
|
||||||
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
|
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
|
||||||
|
|
||||||
load(State) ->
|
load(State) ->
|
||||||
#{top_k_num := MaxSizeT,
|
#{
|
||||||
|
top_k_num := MaxSizeT,
|
||||||
stats_type := StatsType,
|
stats_type := StatsType,
|
||||||
threshold := Threshold} = emqx:get_config([slow_subs]),
|
threshold := Threshold
|
||||||
|
} = emqx:get_config([slow_subs]),
|
||||||
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
|
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
|
||||||
_ = emqx:hook('delivery.completed',
|
_ = emqx:hook(
|
||||||
{?MODULE, on_delivery_completed,
|
'delivery.completed',
|
||||||
[#{max_size => MaxSize,
|
{?MODULE, on_delivery_completed, [
|
||||||
|
#{
|
||||||
|
max_size => MaxSize,
|
||||||
stats_type => StatsType,
|
stats_type => StatsType,
|
||||||
threshold => Threshold
|
threshold => Threshold
|
||||||
}]}),
|
}
|
||||||
|
]}
|
||||||
|
),
|
||||||
|
|
||||||
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
||||||
State1#{enable := true, last_tick_at => ?NOW}.
|
State1#{enable := true, last_tick_at => ?NOW}.
|
||||||
|
|
||||||
unload(#{expire_timer := ExpireTimer} = State) ->
|
unload(#{expire_timer := ExpireTimer} = State) ->
|
||||||
emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
|
emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
|
||||||
State#{enable := false,
|
State#{
|
||||||
expire_timer := cancel_timer(ExpireTimer)}.
|
enable := false,
|
||||||
|
expire_timer := cancel_timer(ExpireTimer)
|
||||||
|
}.
|
||||||
|
|
||||||
do_clear(Logs) ->
|
do_clear(Logs) ->
|
||||||
Now = ?NOW,
|
Now = ?NOW,
|
||||||
|
@ -215,11 +249,9 @@ do_clear(Logs) ->
|
||||||
-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer().
|
-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer().
|
||||||
calc_timespan(whole, #message{timestamp = Ts}, Now) ->
|
calc_timespan(whole, #message{timestamp = Ts}, Now) ->
|
||||||
Now - Ts;
|
Now - Ts;
|
||||||
|
|
||||||
calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
|
calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
|
||||||
End = emqx_message:get_header(deliver_begin_at, Msg, Now),
|
End = emqx_message:get_header(deliver_begin_at, Msg, Now),
|
||||||
End - Ts;
|
End - Ts;
|
||||||
|
|
||||||
calc_timespan(response, Msg, Now) ->
|
calc_timespan(response, Msg, Now) ->
|
||||||
Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
|
Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
|
||||||
Now - Begin.
|
Now - Begin.
|
||||||
|
@ -248,7 +280,8 @@ try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
update_topk(Now, LastUpdateValue, TimeSpan, Id);
|
update_topk(Now, LastUpdateValue, TimeSpan, Id);
|
||||||
?TOPK_INDEX(Min, MinId) ->
|
?TOPK_INDEX(Min, MinId) ->
|
||||||
case TimeSpan =< Min of
|
case TimeSpan =< Min of
|
||||||
true -> false;
|
true ->
|
||||||
|
false;
|
||||||
_ ->
|
_ ->
|
||||||
update_topk(Now, LastUpdateValue, TimeSpan, Id),
|
update_topk(Now, LastUpdateValue, TimeSpan, Id),
|
||||||
delete_with_index(Min, MinId)
|
delete_with_index(Min, MinId)
|
||||||
|
@ -256,7 +289,6 @@ try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec find_last_update_value(id()) -> non_neg_integer().
|
-spec find_last_update_value(id()) -> non_neg_integer().
|
||||||
find_last_update_value(Id) ->
|
find_last_update_value(Id) ->
|
||||||
case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of
|
case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of
|
||||||
|
@ -269,7 +301,8 @@ find_last_update_value(Id) ->
|
||||||
-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
|
-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
|
||||||
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
|
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
%% update record
|
%% update record
|
||||||
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id),
|
ets:insert(?TOPK_TAB, #top_k{
|
||||||
|
index = ?TOPK_INDEX(TimeSpan, Id),
|
||||||
last_update_time = Now,
|
last_update_time = Now,
|
||||||
extra = []
|
extra = []
|
||||||
}),
|
}),
|
||||||
|
@ -283,7 +316,6 @@ update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
-spec delete_with_index(non_neg_integer(), id()) -> true.
|
-spec delete_with_index(non_neg_integer(), id()) -> true.
|
||||||
delete_with_index(0, _) ->
|
delete_with_index(0, _) ->
|
||||||
true;
|
true;
|
||||||
|
|
||||||
delete_with_index(TimeSpan, Id) ->
|
delete_with_index(TimeSpan, Id) ->
|
||||||
ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
|
ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
|
||||||
ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).
|
ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).
|
||||||
|
|
Loading…
Reference in New Issue