chore: apply review suggestions

This commit is contained in:
JianBo He 2022-09-13 10:28:53 +08:00
parent 44f8108228
commit 522f650096
7 changed files with 48 additions and 44 deletions

View File

@ -43,6 +43,8 @@
{'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
]). ]).
-define(SERVER_FORCE_SHUTDOWN_TIMEOUT, 5000).
-endif. -endif.
-define(CMD_MOVE_FRONT, front). -define(CMD_MOVE_FRONT, front).

View File

@ -187,7 +187,8 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
do_deinit(Name, ReqOpts) -> do_deinit(Name, ReqOpts) ->
%% Override the request timeout to deinit grpc server to %% Override the request timeout to deinit grpc server to
%% avoid emqx_exhook_mgr force killed by upper supervisor %% avoid emqx_exhook_mgr force killed by upper supervisor
_ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 5000}), NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT},
_ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts),
ok. ok.
do_init(ChannName, ReqOpts) -> do_init(ChannName, ReqOpts) ->

View File

@ -16,6 +16,8 @@
-module(emqx_exhook_sup). -module(emqx_exhook_sup).
-include("emqx_exhook.hrl").
-behaviour(supervisor). -behaviour(supervisor).
-export([ -export([
@ -28,12 +30,13 @@
stop_grpc_client_channel/1 stop_grpc_client_channel/1
]). ]).
-define(CHILD(Mod, Type, Args), #{ -define(DEFAULT_TIMEOUT, 5000).
-define(CHILD(Mod, Type, Args, Timeout), #{
id => Mod, id => Mod,
start => {Mod, start_link, Args}, start => {Mod, start_link, Args},
type => Type, type => Type,
%% long timeout for emqx_exhook_mgr shutdown => Timeout
shutdown => 15000
}). }).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -46,7 +49,7 @@ start_link() ->
init([]) -> init([]) ->
_ = emqx_exhook_metrics:init(), _ = emqx_exhook_metrics:init(),
_ = emqx_exhook_mgr:init_ref_counter_table(), _ = emqx_exhook_mgr:init_ref_counter_table(),
Mngr = ?CHILD(emqx_exhook_mgr, worker, []), Mngr = ?CHILD(emqx_exhook_mgr, worker, [], force_shutdown_timeout()),
{ok, {{one_for_one, 10, 100}, [Mngr]}}. {ok, {{one_for_one, 10, 100}, [Mngr]}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -71,3 +74,9 @@ stop_grpc_client_channel(Name) ->
_:_:_ -> _:_:_ ->
ok ok
end. end.
%% Calculate the maximum timeout, which will help to shutdown the
%% emqx_exhook_mgr process correctly.
force_shutdown_timeout() ->
Factor = max(3, length(emqx:get_config([exhook, servers])) + 1),
Factor * ?SERVER_FORCE_SHUTDOWN_TIMEOUT.

View File

@ -81,8 +81,14 @@ stop() ->
stop(Name) -> stop(Name) ->
grpc:stop_server(Name), grpc:stop_server(Name),
case whereis(to_atom_name(Name)) of case whereis(to_atom_name(Name)) of
undefined -> ok; undefined ->
Pid -> Pid ! stop ok;
Pid ->
Ref = erlang:monitor(process, Pid),
Pid ! stop,
receive
{'DOWN', Ref, process, Pid, _Reason} -> ok
end
end. end.
take() -> take() ->

View File

@ -244,10 +244,10 @@ esockd_send(Data, #state{
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
esockd_transport:async_send(Sock, Data). esockd_transport:async_send(Sock, Data).
keepalive_stats(recv_oct) -> keepalive_stats(recv) ->
emqx_pd:get_counter(incoming_bytes); emqx_pd:get_counter(recv_pkt);
keepalive_stats(send_oct) -> keepalive_stats(send) ->
emqx_pd:get_counter(outgoing_bytes). emqx_pd:get_counter(send_pkt).
is_datadram_socket({esockd_transport, _}) -> false; is_datadram_socket({esockd_transport, _}) -> false;
is_datadram_socket({udp, _, _}) -> true. is_datadram_socket({udp, _, _}) -> true.
@ -656,16 +656,16 @@ handle_timeout(
Keepalive == keepalive; Keepalive == keepalive;
Keepalive == keepalive_send Keepalive == keepalive_send
-> ->
Stat = StatVal =
case Keepalive of case Keepalive of
keepalive -> recv_oct; keepalive -> keepalive_stats(recv);
keepalive_send -> send_oct keepalive_send -> keepalive_stats(send)
end, end,
case ChannMod:info(conn_state, Channel) of case ChannMod:info(conn_state, Channel) of
disconnected -> disconnected ->
{ok, State}; {ok, State};
_ -> _ ->
handle_timeout(TRef, {Keepalive, keepalive_stats(Stat)}, State) handle_timeout(TRef, {Keepalive, StatVal}, State)
end; end;
handle_timeout( handle_timeout(
_TRef, _TRef,

View File

@ -297,7 +297,7 @@ handle_timeout(
{ok, reset_timer(alive_timer, NChannel)}; {ok, reset_timer(alive_timer, NChannel)};
{error, timeout} -> {error, timeout} ->
Req = #{type => 'KEEPALIVE'}, Req = #{type => 'KEEPALIVE'},
NChannel = clean_timer(alive_timer, Channel), NChannel = remove_timer_ref(alive_timer, Channel),
%% close connection if keepalive timeout %% close connection if keepalive timeout
Replies = [{event, disconnected}, {close, keepalive_timeout}], Replies = [{event, disconnected}, {close, keepalive_timeout}],
{ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
@ -665,7 +665,7 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) ->
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
Channel; Channel;
ensure_keepalive_timer(Interval, Channel) -> ensure_keepalive_timer(Interval, Channel) ->
StatVal = emqx_gateway_conn:keepalive_stats(recv_oct), StatVal = emqx_gateway_conn:keepalive_stats(recv),
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
@ -684,14 +684,14 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = Timers#{Name => TRef}}. Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) -> reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)). ensure_timer(Name, remove_timer_ref(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
cancel_timer(Name, Channel = #channel{timers = Timers}) -> cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
clean_timer(Name, Channel). remove_timer_ref(Name, Channel).
remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout; IdleTimeout;
@ -746,10 +746,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}. NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}.
default_conninfo(ConnInfo = #{peername := {PeerHost, PeerPort}}) -> default_conninfo(ConnInfo) ->
ConnInfo#{ ConnInfo#{
clean_start => true, clean_start => true,
clientid => anonymous_clientid(PeerHost, PeerPort), clientid => anonymous_clientid(),
username => undefined, username => undefined,
conn_props => #{}, conn_props => #{},
connected => true, connected => true,
@ -790,14 +790,5 @@ proto_name_to_protocol(<<>>) ->
proto_name_to_protocol(ProtoName) when is_binary(ProtoName) -> proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
binary_to_atom(ProtoName). binary_to_atom(ProtoName).
anonymous_clientid(PeerHost, PeerPort) -> anonymous_clientid() ->
iolist_to_binary( iolist_to_binary(["exproto-", emqx_misc:gen_id()]).
[
"exproto-anonymous-",
inet:ntoa(PeerHost),
"-",
integer_to_list(PeerPort),
"-",
emqx_misc:gen_id()
]
).

View File

@ -56,6 +56,7 @@ start_link(Pool, Id) ->
[] []
). ).
-spec async_call(atom(), map(), map()) -> ok.
async_call( async_call(
FunName, FunName,
Req = #{conn := Conn}, Req = #{conn := Conn},
@ -63,17 +64,11 @@ async_call(
) -> ) ->
case pick(PoolName, Conn) of case pick(PoolName, Conn) of
false -> false ->
?SLOG( reply(self(), FunName, {error, no_available_grpc_client});
error,
#{
msg => "no_available_grpc_client",
function => FunName,
request => Req
}
);
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
cast(Pid, {rpc, FunName, Req, Options, self()}) cast(Pid, {rpc, FunName, Req, Options, self()})
end. end,
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% cast, pick %% cast, pick