Merge pull request #7633 from zhongwencool/ensure-add-handler-is-ok

fix: emqx_auto_subscribe emqx_slow_subs don't restart when ekka:join.
This commit is contained in:
zhongwencool 2022-04-16 16:09:24 +08:00 committed by GitHub
commit 20b34364d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 168 additions and 121 deletions

View File

@ -1,16 +1,17 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_auto_subscribe, {application, emqx_auto_subscribe, [
[{description, "An OTP application"}, {description, "An OTP application"},
{vsn, "0.1.0"}, {vsn, "0.1.0"},
{registered, []}, {registered, []},
{mod, {emqx_auto_subscribe_app, []}}, {mod, {emqx_auto_subscribe_app, []}},
{applications, {applications, [
[kernel, kernel,
stdlib stdlib,
]}, emqx
{env,[]}, ]},
{modules, []}, {env, []},
{modules, []},
{licenses, ["Apache 2.0"]}, {licenses, ["Apache 2.0"]},
{links, []} {links, []}
]}. ]}.

View File

@ -20,19 +20,21 @@
-define(MAX_AUTO_SUBSCRIBE, 20). -define(MAX_AUTO_SUBSCRIBE, 20).
-export([load/0, unload/0]). % %
-export([load/0, unload/0]).
-export([ max_limit/0 -export([
, list/0 max_limit/0,
, update/1 list/0,
, post_config_update/5 update/1,
]). post_config_update/5
]).
%% hook callback %% hook callback
-export([on_client_connected/3]). -export([on_client_connected/3]).
load() -> load() ->
emqx_conf:add_handler([auto_subscribe, topics], ?MODULE), ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
update_hook(). update_hook().
unload() -> unload() ->
@ -56,7 +58,8 @@ post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) ->
on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) -> on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of
[] -> ok; [] ->
ok;
TopicTables -> TopicTables ->
_ = self() ! {subscribe, TopicTables}, _ = self() ! {subscribe, TopicTables},
ok ok
@ -71,17 +74,21 @@ format(Rules) when is_list(Rules) ->
[format(Rule) || Rule <- Rules]; [format(Rule) || Rule <- Rules];
format(Rule = #{topic := Topic}) when is_map(Rule) -> format(Rule = #{topic := Topic}) when is_map(Rule) ->
#{ #{
topic => Topic, topic => Topic,
qos => maps:get(qos, Rule, 0), qos => maps:get(qos, Rule, 0),
rh => maps:get(rh, Rule, 0), rh => maps:get(rh, Rule, 0),
rap => maps:get(rap, Rule, 0), rap => maps:get(rap, Rule, 0),
nl => maps:get(nl, Rule, 0) nl => maps:get(nl, Rule, 0)
}. }.
update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
case emqx_conf:update([auto_subscribe, topics], case
Topics, emqx_conf:update(
#{rawconf_with_defaults => true, override_to => cluster}) of [auto_subscribe, topics],
Topics,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewTopics}} -> {ok, #{raw_config := NewTopics}} ->
{ok, NewTopics}; {ok, NewTopics};
{error, Reason} -> {error, Reason} ->

View File

@ -1,12 +1,13 @@
{application, emqx_slow_subs, {application, emqx_slow_subs, [
[{description, "EMQX Slow Subscribers Statistics"}, {description, "EMQX Slow Subscribers Statistics"},
{vsn, "1.0.0"}, % strict semver, bump manually! % strict semver, bump manually!
{modules, []}, {vsn, "1.0.0"},
{registered, [emqx_slow_subs_sup]}, {modules, []},
{applications, [kernel,stdlib]}, {registered, [emqx_slow_subs_sup]},
{mod, {emqx_slow_subs_app,[]}}, {applications, [kernel, stdlib, emqx]},
{env, []}, {mod, {emqx_slow_subs_app, []}},
{licenses, ["Apache-2.0"]}, {env, []},
{maintainers, ["EMQX Team <contact@emqx.io>"]}, {licenses, ["Apache-2.0"]},
{links, []} {maintainers, ["EMQX Team <contact@emqx.io>"]},
]}. {links, []}
]}.

View File

@ -22,37 +22,50 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
-export([ start_link/0, on_delivery_completed/3, update_settings/1 -export([
, clear_history/0, init_tab/0, post_config_update/5 start_link/0,
]). on_delivery_completed/3,
update_settings/1,
clear_history/0,
init_tab/0,
post_config_update/5
]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([
, handle_call/3 init/1,
, handle_cast/2 handle_call/3,
, handle_info/2 handle_cast/2,
, terminate/2 handle_info/2,
, code_change/3 terminate/2,
]). code_change/3
]).
-compile(nowarn_unused_type). -compile(nowarn_unused_type).
-type state() :: #{ enable := boolean() -type state() :: #{
, last_tick_at := pos_integer() enable := boolean(),
, expire_timer := undefined | reference() last_tick_at := pos_integer(),
}. expire_timer := undefined | reference()
}.
-type message() :: #message{}. -type message() :: #message{}.
-type stats_type() :: whole %% whole = internal + response %% whole = internal + response
| internal %% timespan from message in to deliver -type stats_type() ::
| response. %% timespan from delivery to client response whole
%% timespan from message in to deliver
| internal
%% timespan from delivery to client response
| response.
-type stats_update_args() :: #{session_birth_time := pos_integer()}. -type stats_update_args() :: #{session_birth_time := pos_integer()}.
-type stats_update_env() :: #{ threshold := non_neg_integer() -type stats_update_env() :: #{
, stats_type := stats_type() threshold := non_neg_integer(),
, max_size := pos_integer()}. stats_type := stats_type(),
max_size := pos_integer()
}.
-ifdef(TEST). -ifdef(TEST).
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
@ -73,33 +86,39 @@
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Start the st_statistics %% @doc Start the st_statistics
-spec(start_link() -> emqx_types:startlink_ret()). -spec start_link() -> emqx_types:startlink_ret().
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
on_delivery_completed(#message{timestamp = Ts}, on_delivery_completed(
#{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> #message{timestamp = Ts},
#{session_birth_time := BirthTime},
_Cfg
) when Ts =< BirthTime ->
ok; ok;
on_delivery_completed(Msg, Env, Cfg) -> on_delivery_completed(Msg, Env, Cfg) ->
on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
on_delivery_completed(#message{topic = Topic} = Msg, on_delivery_completed(
#{clientid := ClientId}, #message{topic = Topic} = Msg,
Now, #{clientid := ClientId},
#{threshold := Threshold, Now,
stats_type := StatsType, #{
max_size := MaxSize}) -> threshold := Threshold,
stats_type := StatsType,
max_size := MaxSize
}
) ->
TimeSpan = calc_timespan(StatsType, Msg, Now), TimeSpan = calc_timespan(StatsType, Msg, Now),
case TimeSpan =< Threshold of case TimeSpan =< Threshold of
true -> ok; true ->
ok;
_ -> _ ->
Id = ?ID(ClientId, Topic), Id = ?ID(ClientId, Topic),
LastUpdateValue = find_last_update_value(Id), LastUpdateValue = find_last_update_value(Id),
case TimeSpan =< LastUpdateValue of case TimeSpan =< LastUpdateValue of
true -> ok; true -> ok;
_ -> _ -> try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
end end
end. end.
@ -113,15 +132,23 @@ post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT). gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
init_tab() -> init_tab() ->
safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table safe_create_tab(?TOPK_TAB, [
, {keypos, #top_k.index}, {write_concurrency, true} ordered_set,
, {read_concurrency, true} public,
]), named_table,
{keypos, #top_k.index},
{write_concurrency, true},
{read_concurrency, true}
]),
safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table safe_create_tab(?INDEX_TAB, [
, {keypos, #index_tab.index}, {write_concurrency, true} ordered_set,
, {read_concurrency, true} public,
]). named_table,
{keypos, #index_tab.index},
{write_concurrency, true},
{read_concurrency, true}
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -130,12 +157,13 @@ init_tab() ->
init([]) -> init([]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
emqx_conf:add_handler([slow_subs], ?MODULE), ok = emqx_conf:add_handler([slow_subs], ?MODULE),
InitState = #{enable => false, InitState = #{
last_tick_at => 0, enable => false,
expire_timer => undefined last_tick_at => 0,
}, expire_timer => undefined
},
Enable = emqx:get_config([slow_subs, enable]), Enable = emqx:get_config([slow_subs, enable]),
{ok, check_enable(Enable, InitState)}. {ok, check_enable(Enable, InitState)}.
@ -143,11 +171,9 @@ init([]) ->
handle_call({update_settings, #{enable := Enable}}, _From, State) -> handle_call({update_settings, #{enable := Enable}}, _From, State) ->
State2 = check_enable(Enable, State), State2 = check_enable(Enable, State),
{reply, ok, State2}; {reply, ok, State2};
handle_call(clear_history, _, State) -> handle_call(clear_history, _, State) ->
do_clear_history(), do_clear_history(),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -161,12 +187,12 @@ handle_info(expire_tick, State) ->
do_clear(Logs), do_clear(Logs),
State1 = start_timer(expire_timer, fun expire_tick/0, State), State1 = start_timer(expire_timer, fun expire_tick/0, State),
{noreply, State1}; {noreply, State1};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
terminate(_Reason, State) -> terminate(_Reason, State) ->
ok = emqx_conf:remove_handler([slow_subs]),
_ = unload(State), _ = unload(State),
ok. ok.
@ -180,46 +206,52 @@ expire_tick() ->
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
load(State) -> load(State) ->
#{top_k_num := MaxSizeT, #{
stats_type := StatsType, top_k_num := MaxSizeT,
threshold := Threshold} = emqx:get_config([slow_subs]), stats_type := StatsType,
threshold := Threshold
} = emqx:get_config([slow_subs]),
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE), MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
_ = emqx:hook('delivery.completed', _ = emqx:hook(
{?MODULE, on_delivery_completed, 'delivery.completed',
[#{max_size => MaxSize, {?MODULE, on_delivery_completed, [
stats_type => StatsType, #{
threshold => Threshold max_size => MaxSize,
}]}), stats_type => StatsType,
threshold => Threshold
}
]}
),
State1 = start_timer(expire_timer, fun expire_tick/0, State), State1 = start_timer(expire_timer, fun expire_tick/0, State),
State1#{enable := true, last_tick_at => ?NOW}. State1#{enable := true, last_tick_at => ?NOW}.
unload(#{expire_timer := ExpireTimer} = State) -> unload(#{expire_timer := ExpireTimer} = State) ->
emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}), emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
State#{enable := false, State#{
expire_timer := cancel_timer(ExpireTimer)}. enable := false,
expire_timer := cancel_timer(ExpireTimer)
}.
do_clear(Logs) -> do_clear(Logs) ->
Now = ?NOW, Now = ?NOW,
Interval = emqx:get_config([slow_subs, expire_interval]), Interval = emqx:get_config([slow_subs, expire_interval]),
Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) -> Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) ->
case Now - Ts >= Interval of case Now - Ts >= Interval of
true -> true ->
delete_with_index(TimeSpan, Id); delete_with_index(TimeSpan, Id);
_ -> _ ->
true true
end end
end, end,
lists:foreach(Each, Logs). lists:foreach(Each, Logs).
-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer(). -spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer().
calc_timespan(whole, #message{timestamp = Ts}, Now) -> calc_timespan(whole, #message{timestamp = Ts}, Now) ->
Now - Ts; Now - Ts;
calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) -> calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
End = emqx_message:get_header(deliver_begin_at, Msg, Now), End = emqx_message:get_header(deliver_begin_at, Msg, Now),
End - Ts; End - Ts;
calc_timespan(response, Msg, Now) -> calc_timespan(response, Msg, Now) ->
Begin = emqx_message:get_header(deliver_begin_at, Msg, Now), Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
Now - Begin. Now - Begin.
@ -248,7 +280,8 @@ try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
update_topk(Now, LastUpdateValue, TimeSpan, Id); update_topk(Now, LastUpdateValue, TimeSpan, Id);
?TOPK_INDEX(Min, MinId) -> ?TOPK_INDEX(Min, MinId) ->
case TimeSpan =< Min of case TimeSpan =< Min of
true -> false; true ->
false;
_ -> _ ->
update_topk(Now, LastUpdateValue, TimeSpan, Id), update_topk(Now, LastUpdateValue, TimeSpan, Id),
delete_with_index(Min, MinId) delete_with_index(Min, MinId)
@ -256,10 +289,9 @@ try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
end end
end. end.
-spec find_last_update_value(id()) -> non_neg_integer(). -spec find_last_update_value(id()) -> non_neg_integer().
find_last_update_value(Id) -> find_last_update_value(Id) ->
case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of
?INDEX(LastUpdateValue, Id) -> ?INDEX(LastUpdateValue, Id) ->
LastUpdateValue; LastUpdateValue;
_ -> _ ->
@ -269,10 +301,11 @@ find_last_update_value(Id) ->
-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true. -spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
update_topk(Now, LastUpdateValue, TimeSpan, Id) -> update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
%% update record %% update record
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id), ets:insert(?TOPK_TAB, #top_k{
last_update_time = Now, index = ?TOPK_INDEX(TimeSpan, Id),
extra = [] last_update_time = Now,
}), extra = []
}),
%% update index %% update index
ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}), ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}),
@ -283,7 +316,6 @@ update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
-spec delete_with_index(non_neg_integer(), id()) -> true. -spec delete_with_index(non_neg_integer(), id()) -> true.
delete_with_index(0, _) -> delete_with_index(0, _) ->
true; true;
delete_with_index(TimeSpan, Id) -> delete_with_index(TimeSpan, Id) ->
ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)), ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)). ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).

View File

@ -145,6 +145,8 @@ defmodule EMQXUmbrella.MixProject do
:emqx_statsd, :emqx_statsd,
:emqx_retainer, :emqx_retainer,
:emqx_prometheus, :emqx_prometheus,
:emqx_auto_subscribe,
:emqx_slow_subs,
:emqx_plugins :emqx_plugins
], ],
steps: steps, steps: steps,
@ -230,7 +232,9 @@ defmodule EMQXUmbrella.MixProject do
:emqx_exhook, :emqx_exhook,
:emqx_authn, :emqx_authn,
:emqx_authz, :emqx_authz,
:emqx_plugin :emqx_auto_subscribe,
:emqx_slow_subs,
:emqx_plugins
] ]
end end

View File

@ -385,7 +385,9 @@ emqx_machine_boot_apps(ce) ->
, emqx_exhook , emqx_exhook
, emqx_authn , emqx_authn
, emqx_authz , emqx_authz
, emqx_plugin , emqx_slow_subs
, emqx_auto_subscribe
, emqx_plugins
]; ];
emqx_machine_boot_apps(ee) -> emqx_machine_boot_apps(ee) ->