Merge pull request #8725 from HJianBo/fix-exhook-exproto-bugs
fix exhook exproto bugs
This commit is contained in:
commit
ed99232a8d
|
@ -18,6 +18,8 @@ File format:
|
||||||
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
|
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
|
||||||
- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597)
|
- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597)
|
||||||
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
|
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
|
||||||
|
- Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725)
|
||||||
|
- Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8725](https://github.com/emqx/emqx/pull/8725)
|
||||||
- Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736)
|
- Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736)
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
@ -28,8 +30,10 @@ File format:
|
||||||
The `foo` variable is a null value, so `clientid != foo` should be evaluated as true.
|
The `foo` variable is a null value, so `clientid != foo` should be evaluated as true.
|
||||||
- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655)
|
- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655)
|
||||||
- Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628)
|
- Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628)
|
||||||
|
- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](//github.com/emqx/emqx/pull/8725)
|
||||||
- Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752)
|
- Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752)
|
||||||
|
|
||||||
|
|
||||||
## v4.3.18
|
## v4.3.18
|
||||||
|
|
||||||
### Enhancements
|
### Enhancements
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_exhook,
|
{application, emqx_exhook,
|
||||||
[{description, "EMQ X Extension for Hook"},
|
[{description, "EMQ X Extension for Hook"},
|
||||||
{vsn, "4.3.6"},
|
{vsn, "4.3.7"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_exhook_app, []}},
|
{mod, {emqx_exhook_app, []}},
|
||||||
|
|
|
@ -1,37 +1,30 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[{<<"4\\.3\\.[5-6]">>,
|
||||||
{"4.3.5", [
|
[{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{"4.3.4",
|
||||||
{"4.3.4", [
|
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_sup, brutal_purge, soft_purge, []},
|
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []},
|
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_handler, brutal_purge, soft_purge, []},
|
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
|
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook, brutal_purge, soft_purge, []},
|
{update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]},
|
||||||
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
|
{<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]},
|
||||||
]},
|
{<<".*">>,[]}],
|
||||||
{<<"4\\.3\\.[0-3]">>, [
|
[{<<"4\\.3\\.[5-6]">>,
|
||||||
{restart_application, emqx_exhook}
|
[{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>, []}
|
{"4.3.5",
|
||||||
],
|
[{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]},
|
||||||
[
|
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5", [
|
{"4.3.4",
|
||||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||||
{"4.3.4", [
|
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_sup, brutal_purge, soft_purge, []},
|
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []},
|
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exhook_handler, brutal_purge, soft_purge, []},
|
{update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]},
|
||||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
|
{<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]},
|
||||||
{load_module, emqx_exhook, brutal_purge, soft_purge, []},
|
{<<".*">>,[]}]}.
|
||||||
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
|
|
||||||
]},
|
|
||||||
{<<"4\\.3\\.[0-3]">>, [
|
|
||||||
{restart_application, emqx_exhook}
|
|
||||||
]},
|
|
||||||
{<<".*">>, []}
|
|
||||||
]
|
|
||||||
}.
|
|
||||||
|
|
|
@ -192,6 +192,17 @@ handle_info({timeout, _Ref, {reload, Name}}, State) ->
|
||||||
{noreply, NState};
|
{noreply, NState};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{noreply, NState};
|
{noreply, NState};
|
||||||
|
{error, {already_started, Pid}} ->
|
||||||
|
?LOG(warning, "Server ~s already started on ~p, try to restart it", [Name, Pid]),
|
||||||
|
case server(Name) of
|
||||||
|
undefined ->
|
||||||
|
%% force close grpc client pool
|
||||||
|
grpc_client_sup:stop_channel_pool(Name);
|
||||||
|
ServerState ->
|
||||||
|
emqx_exhook_server:unload(ServerState)
|
||||||
|
end,
|
||||||
|
%% try again immediately
|
||||||
|
handle_info({timeout, _Ref, {reload, Name}}, State);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Failed to reload exhook callback server \"~s\", "
|
?LOG(warning, "Failed to reload exhook callback server \"~s\", "
|
||||||
"Reason: ~0p", [Name, Reason]),
|
"Reason: ~0p", [Name, Reason]),
|
||||||
|
@ -202,11 +213,11 @@ handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, State = #state{running = Running}) ->
|
terminate(_Reason, State = #state{running = Running}) ->
|
||||||
|
_ = unload_exhooks(),
|
||||||
_ = maps:fold(fun(Name, _, AccIn) ->
|
_ = maps:fold(fun(Name, _, AccIn) ->
|
||||||
{ok, NAccIn} = do_unload_server(Name, AccIn),
|
{ok, NAccIn} = do_unload_server(Name, AccIn),
|
||||||
NAccIn
|
NAccIn
|
||||||
end, State, Running),
|
end, State, Running),
|
||||||
_ = unload_exhooks(),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% in the emqx_exhook:v4.3.5, we have added one new field in the state last:
|
%% in the emqx_exhook:v4.3.5, we have added one new field in the state last:
|
||||||
|
@ -318,7 +329,7 @@ get_request_failed_action() ->
|
||||||
|
|
||||||
save(Name, ServerState) ->
|
save(Name, ServerState) ->
|
||||||
Saved = persistent_term:get(?APP, []),
|
Saved = persistent_term:get(?APP, []),
|
||||||
persistent_term:put(?APP, lists:reverse([Name | Saved])),
|
persistent_term:put(?APP, lists:usort(lists:reverse([Name | Saved]))),
|
||||||
persistent_term:put({?APP, Name}, ServerState).
|
persistent_term:put({?APP, Name}, ServerState).
|
||||||
|
|
||||||
unsave(Name) ->
|
unsave(Name) ->
|
||||||
|
|
|
@ -146,13 +146,15 @@ format_http_uri(Scheme, Host0, Port) ->
|
||||||
|
|
||||||
-spec unload(server()) -> ok.
|
-spec unload(server()) -> ok.
|
||||||
unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
|
unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
|
||||||
_ = do_deinit(Name, ReqOpts),
|
|
||||||
_ = may_unload_hooks(HookSpecs),
|
_ = may_unload_hooks(HookSpecs),
|
||||||
|
_ = do_deinit(Name, ReqOpts),
|
||||||
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
|
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_deinit(Name, ReqOpts) ->
|
do_deinit(Name, ReqOpts) ->
|
||||||
_ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts),
|
%% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mngr
|
||||||
|
%% force killed by upper supervisor
|
||||||
|
_ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_init(ChannName, ReqOpts) ->
|
do_init(ChannName, ReqOpts) ->
|
||||||
|
|
|
@ -2,9 +2,11 @@
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.9",
|
[{"4.3.9",
|
||||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[2-8]">>,
|
{<<"4\\.3\\.[2-8]">>,
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[0-1]">>,
|
{<<"4\\.3\\.[0-1]">>,
|
||||||
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
||||||
|
@ -13,9 +15,11 @@
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.9",
|
[{"4.3.9",
|
||||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[2-8]">>,
|
{<<"4\\.3\\.[2-8]">>,
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[0-1]">>,
|
{<<"4\\.3\\.[0-1]">>,
|
||||||
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -151,7 +151,7 @@ init(ConnInfo = #{socktype := Socktype,
|
||||||
peercert := Peercert}, Options) ->
|
peercert := Peercert}, Options) ->
|
||||||
GRpcChann = proplists:get_value(handler, Options),
|
GRpcChann = proplists:get_value(handler, Options),
|
||||||
NConnInfo = default_conninfo(ConnInfo),
|
NConnInfo = default_conninfo(ConnInfo),
|
||||||
ClientInfo = default_clientinfo(ConnInfo),
|
ClientInfo = default_clientinfo(NConnInfo),
|
||||||
|
|
||||||
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
|
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
|
||||||
|
|
||||||
|
@ -275,7 +275,9 @@ handle_timeout(_TRef, {keepalive, StatVal},
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
Req = #{type => 'KEEPALIVE'},
|
Req = #{type => 'KEEPALIVE'},
|
||||||
NChannel = clean_timer(alive_timer, Channel),
|
NChannel = clean_timer(alive_timer, Channel),
|
||||||
{ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
%% close connection if keepalive timeout
|
||||||
|
Replies = [{event, disconnected}, {close, normal}],
|
||||||
|
{ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
||||||
|
@ -381,7 +383,7 @@ handle_call({publish, Topic, Qos, Payload},
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(kick, Channel) ->
|
handle_call(kick, Channel) ->
|
||||||
{shutdown, kicked, ok, Channel};
|
{reply, ok, [{event, disconnected}, {close, kicked}], Channel};
|
||||||
|
|
||||||
handle_call(discard, Channel) ->
|
handle_call(discard, Channel) ->
|
||||||
{shutdown, discarded, ok, Channel};
|
{shutdown, discarded, ok, Channel};
|
||||||
|
@ -633,9 +635,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
|
||||||
NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
|
NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
|
||||||
NClientInfo#{protocol => ProtoName}.
|
NClientInfo#{protocol => ProtoName}.
|
||||||
|
|
||||||
default_conninfo(ConnInfo) ->
|
default_conninfo(ConnInfo =
|
||||||
|
#{peername := {PeerHost, PeerPort}}) ->
|
||||||
ConnInfo#{clean_start => true,
|
ConnInfo#{clean_start => true,
|
||||||
clientid => undefined,
|
clientid => anonymous_clientid(PeerHost, PeerPort),
|
||||||
username => undefined,
|
username => undefined,
|
||||||
conn_props => #{},
|
conn_props => #{},
|
||||||
connected => true,
|
connected => true,
|
||||||
|
@ -646,13 +649,15 @@ default_conninfo(ConnInfo) ->
|
||||||
receive_maximum => 0,
|
receive_maximum => 0,
|
||||||
expiry_interval => 0}.
|
expiry_interval => 0}.
|
||||||
|
|
||||||
default_clientinfo(#{peername := {PeerHost, PeerPort},
|
default_clientinfo(#{peername := {PeerHost, _},
|
||||||
sockname := {_, SockPort}}) ->
|
sockname := {_, SockPort},
|
||||||
|
clientid := ClientId
|
||||||
|
}) ->
|
||||||
#{zone => external,
|
#{zone => external,
|
||||||
protocol => exproto,
|
protocol => exproto,
|
||||||
peerhost => PeerHost,
|
peerhost => PeerHost,
|
||||||
sockport => SockPort,
|
sockport => SockPort,
|
||||||
clientid => anonymous_clientid(PeerHost, PeerPort),
|
clientid => ClientId,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
is_bridge => false,
|
is_bridge => false,
|
||||||
is_superuser => false,
|
is_superuser => false,
|
||||||
|
|
|
@ -54,7 +54,13 @@ start_link(Pool, Id) ->
|
||||||
?MODULE, [Pool, Id], []).
|
?MODULE, [Pool, Id], []).
|
||||||
|
|
||||||
async_call(FunName, Req = #{conn := Conn}, Options) ->
|
async_call(FunName, Req = #{conn := Conn}, Options) ->
|
||||||
cast(pick(Conn), {rpc, FunName, Req, Options, self()}).
|
case pick(Conn) of
|
||||||
|
false ->
|
||||||
|
?LOG(error, "No available grpc client for ~s: ~p",
|
||||||
|
[FunName, Req]);
|
||||||
|
Pid when is_pid(Pid) ->
|
||||||
|
cast(Pid, {rpc, FunName, Req, Options, self()})
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% cast, pick
|
%% cast, pick
|
||||||
|
@ -65,6 +71,7 @@ async_call(FunName, Req = #{conn := Conn}, Options) ->
|
||||||
cast(Deliver, Msg) ->
|
cast(Deliver, Msg) ->
|
||||||
gen_server:cast(Deliver, Msg).
|
gen_server:cast(Deliver, Msg).
|
||||||
|
|
||||||
|
-spec pick(term()) -> pid() | false.
|
||||||
pick(Conn) ->
|
pick(Conn) ->
|
||||||
gproc_pool:pick_worker(exproto_gcli_pool, Conn).
|
gproc_pool:pick_worker(exproto_gcli_pool, Conn).
|
||||||
|
|
||||||
|
|
|
@ -213,11 +213,13 @@ t_keepalive_timeout(Cfg) ->
|
||||||
send(Sock, ConnBin),
|
send(Sock, ConnBin),
|
||||||
{ok, ConnAckBin} = recv(Sock, 5000),
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
||||||
|
|
||||||
DisconnectBin = frame_disconnect(),
|
%% Timed out connections are closed immediately,
|
||||||
{ok, DisconnectBin} = recv(Sock, 10000),
|
%% so there may not be a disconnect message here
|
||||||
|
%%DisconnectBin = frame_disconnect(),
|
||||||
|
%%{ok, DisconnectBin} = recv(Sock, 10000),
|
||||||
|
|
||||||
SockType =/= udp andalso begin
|
SockType =/= udp andalso begin
|
||||||
{error, closed} = recv(Sock, 5000)
|
{error, closed} = recv(Sock, 10000)
|
||||||
end, ok.
|
end, ok.
|
||||||
|
|
||||||
t_hook_connected_disconnected(Cfg) ->
|
t_hook_connected_disconnected(Cfg) ->
|
||||||
|
|
|
@ -62,7 +62,7 @@
|
||||||
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
|
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
|
||||||
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
|
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
|
||||||
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
|
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
|
||||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
|
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{xref_ignores,
|
{xref_ignores,
|
||||||
|
|
Loading…
Reference in New Issue