diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index f574c2ffe..289ed580d 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -11,6 +11,8 @@ * Fix delayed publish inaccurate caused by os time change. [#8926](https://github.com/emqx/emqx/pull/8926) * Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911) * Fix that redis authn will deny the unknown users [#8934](https://github.com/emqx/emqx/pull/8934) +* Fix ExProto UDP client keepalive checking error. + This causes the clients to not expire as long as a new UDP packet arrives [#8866](https://github.com/emqx/emqx/pull/8866) ## Enhancements @@ -19,6 +21,8 @@ * Remove `node.etc_dir` from emqx.conf, because it is never used. Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892) * Added a new API `POST /listeners` for creating listener. [#8876](https://github.com/emqx/emqx/pull/8876) +* Close ExProto client process immediately if it's keepalive timeouted. [#8866](https://github.com/emqx/emqx/pull/8866) +* Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8866](https://github.com/emqx/emqx/pull/8866) # 5.0.7 diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 6c386e688..7436b2a1c 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -43,6 +43,8 @@ {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} ]). +-define(SERVER_FORCE_SHUTDOWN_TIMEOUT, 5000). + -endif. -define(CMD_MOVE_FRONT, front). diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index cf83e8eb9..e58555ca1 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -21,6 +21,7 @@ -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% APIs -export([start_link/0]). @@ -297,7 +298,8 @@ handle_info(refresh_tick, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State = #{servers := Servers}) -> +terminate(Reason, State = #{servers := Servers}) -> + _ = unload_exhooks(), _ = maps:fold( fun(Name, _, AccIn) -> do_unload_server(Name, AccIn) @@ -305,7 +307,7 @@ terminate(_Reason, State = #{servers := Servers}) -> State, Servers ), - _ = unload_exhooks(), + ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index b15724ff2..9c89915aa 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -179,13 +179,16 @@ filter(Ls) -> -spec unload(server()) -> ok. unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> - _ = do_deinit(Name, ReqOpts), _ = may_unload_hooks(HookSpecs), + _ = do_deinit(Name, ReqOpts), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), ok. do_deinit(Name, ReqOpts) -> - _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts), + %% Override the request timeout to deinit grpc server to + %% avoid emqx_exhook_mgr force killed by upper supervisor + NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT}, + _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index fb424ddff..cd49d89bb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -16,6 +16,8 @@ -module(emqx_exhook_sup). +-include("emqx_exhook.hrl"). + -behaviour(supervisor). -export([ @@ -28,11 +30,13 @@ stop_grpc_client_channel/1 ]). --define(CHILD(Mod, Type, Args), #{ +-define(DEFAULT_TIMEOUT, 5000). + +-define(CHILD(Mod, Type, Args, Timeout), #{ id => Mod, start => {Mod, start_link, Args}, type => Type, - shutdown => 15000 + shutdown => Timeout }). %%-------------------------------------------------------------------- @@ -45,7 +49,7 @@ start_link() -> init([]) -> _ = emqx_exhook_metrics:init(), _ = emqx_exhook_mgr:init_ref_counter_table(), - Mngr = ?CHILD(emqx_exhook_mgr, worker, []), + Mngr = ?CHILD(emqx_exhook_mgr, worker, [], force_shutdown_timeout()), {ok, {{one_for_one, 10, 100}, [Mngr]}}. %%-------------------------------------------------------------------- @@ -70,3 +74,9 @@ stop_grpc_client_channel(Name) -> _:_:_ -> ok end. + +%% Calculate the maximum timeout, which will help to shutdown the +%% emqx_exhook_mgr process correctly. +force_shutdown_timeout() -> + Factor = max(3, length(emqx:get_config([exhook, servers])) + 1), + Factor * ?SERVER_FORCE_SHUTDOWN_TIMEOUT. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 62606cf18..a0472b2c3 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl). @@ -313,6 +314,40 @@ t_cluster_name(_) -> ), emqx_exhook_mgr:disable(<<"default">>). +t_stop_timeout(_) -> + snabbkaffe:start_trace(), + meck:new(emqx_exhook_demo_svr, [passthrough, no_history]), + meck:expect( + emqx_exhook_demo_svr, + on_provider_unloaded, + fun(Req, Md) -> + %% ensure sleep time greater than emqx_exhook_mgr shutdown timeout + timer:sleep(20000), + meck:passthrough([Req, Md]) + end + ), + + %% stop application + application:stop(emqx_exhook), + ?block_until(#{?snk_kind := exhook_mgr_terminated}, 20000), + + %% all exhook hooked point should be unloaded + Mods = lists:flatten( + lists:map( + fun({hook, _, Cbs}) -> + lists:map(fun({callback, {M, _, _}, _, _}) -> M end, Cbs) + end, + ets:tab2list(emqx_hooks) + ) + ), + ?assertEqual(false, lists:any(fun(M) -> M == emqx_exhook_handler end, Mods)), + + %% ensure started for other tests + emqx_common_test_helpers:start_apps([emqx_exhook]), + + snabbkaffe:stop(), + meck:unload(emqx_exhook_demo_svr). + %%-------------------------------------------------------------------- %% Cases Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index ea8398eeb..b566b7ab2 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -80,7 +80,16 @@ stop() -> stop(Name) -> grpc:stop_server(Name), - to_atom_name(Name) ! stop. + case whereis(to_atom_name(Name)) of + undefined -> + ok; + Pid -> + Ref = erlang:monitor(process, Pid), + Pid ! stop, + receive + {'DOWN', Ref, process, Pid, _Reason} -> ok + end + end. take() -> to_atom_name(?NAME) ! {take, self()}, diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 9e65c7eea..99ac3a38f 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API -export([ @@ -51,6 +52,9 @@ %% Internal callback -export([wakeup_from_hib/2, recvloop/2]). +%% for channel module +-export([keepalive_stats/1]). + -record(state, { %% TCP/SSL/UDP/DTLS Wrapped Socket socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, @@ -240,6 +244,11 @@ esockd_send(Data, #state{ esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> esockd_transport:async_send(Sock, Data). +keepalive_stats(recv) -> + emqx_pd:get_counter(recv_pkt); +keepalive_stats(send) -> + emqx_pd:get_counter(send_pkt). + is_datadram_socket({esockd_transport, _}) -> false; is_datadram_socket({udp, _, _}) -> true. @@ -568,9 +577,15 @@ terminate( channel = Channel } ) -> - ?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}), _ = ChannMod:terminate(Reason, Channel), _ = close_socket(State), + ClientId = + try ChannMod:info(clientid, Channel) of + Id -> Id + catch + _:_ -> undefined + end, + ?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}), exit(Reason). %%-------------------------------------------------------------------- @@ -635,28 +650,22 @@ handle_timeout( Keepalive, State = #state{ chann_mod = ChannMod, - socket = Socket, channel = Channel } ) when Keepalive == keepalive; Keepalive == keepalive_send -> - Stat = + StatVal = case Keepalive of - keepalive -> recv_oct; - keepalive_send -> send_oct + keepalive -> keepalive_stats(recv); + keepalive_send -> keepalive_stats(send) end, case ChannMod:info(conn_state, Channel) of disconnected -> {ok, State}; _ -> - case esockd_getstat(Socket, [Stat]) of - {ok, [{Stat, RecvOct}]} -> - handle_timeout(TRef, {Keepalive, RecvOct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end + handle_timeout(TRef, {Keepalive, StatVal}, State) end; handle_timeout( _TRef, diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 861ae3189..be4cddcaa 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -78,7 +78,8 @@ -define(TIMER_TABLE, #{ alive_timer => keepalive, - force_timer => force_close + force_timer => force_close, + idle_timer => force_close_idle }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -151,14 +152,17 @@ init( Ctx = maps:get(ctx, Options), GRpcChann = maps:get(handler, Options), PoolName = maps:get(pool_name, Options), - NConnInfo = default_conninfo(ConnInfo), + IdleTimeout = emqx_gateway_utils:idle_timeout(Options), + + NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}), ListenerId = case maps:get(listener, Options, undefined) of undefined -> undefined; {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName) end, + EnableAuthn = maps:get(enable_authn, Options, true), - DefaultClientInfo = default_clientinfo(ConnInfo), + DefaultClientInfo = default_clientinfo(NConnInfo), ClientInfo = DefaultClientInfo#{ listener => ListenerId, enable_authn => EnableAuthn @@ -183,7 +187,9 @@ init( } ) }, - try_dispatch(on_socket_created, wrap(Req), Channel). + start_idle_checking_timer( + try_dispatch(on_socket_created, wrap(Req), Channel) + ). %% @private peercert(NoSsl, ConnInfo) when @@ -217,6 +223,12 @@ socktype(dtls) -> 'DTLS'. address({Host, Port}) -> #{host => inet:ntoa(Host), port => Port}. +%% avoid udp connection process leak +start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) -> + ensure_timer(idle_timer, Channel); +start_idle_checking_timer(Channel) -> + Channel. + %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- @@ -285,10 +297,15 @@ handle_timeout( {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> Req = #{type => 'KEEPALIVE'}, - {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)} + NChannel = remove_timer_ref(alive_timer, Channel), + %% close connection if keepalive timeout + Replies = [{event, disconnected}, {close, keepalive_timeout}], + {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> {shutdown, {error, {force_close, Reason}}, Channel}; +handle_timeout(_TRef, force_close_idle, Channel) -> + {shutdown, idle_timeout, Channel}; handle_timeout(_TRef, Msg, Channel) -> ?SLOG(warning, #{ msg => "unexpected_timeout_signal", @@ -390,7 +407,7 @@ handle_call( NConnInfo = ConnInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, - {reply, ok, ensure_keepalive(NChannel)}; + {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))}; handle_call( {subscribe_from_client, TopicFilter, Qos}, _From, @@ -405,21 +422,21 @@ handle_call( {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel}; _ -> {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), - {reply, ok, NChannel} + {reply, ok, [{event, updated}], NChannel} end; handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> {ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel), - {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel}; + {reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel}; handle_call( {unsubscribe_from_client, TopicFilter}, _From, Channel = #channel{conn_state = connected} ) -> {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), - {reply, ok, NChannel}; + {reply, ok, [{event, updated}], NChannel}; handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, NChannel} = do_unsubscribe([Topic], Channel), - {reply, ok, NChannel}; + {reply, ok, [{event, update}], NChannel}; handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> {reply, {ok, maps:to_list(Subs)}, Channel}; handle_call( @@ -446,7 +463,7 @@ handle_call( {reply, ok, Channel} end; handle_call(kick, _From, Channel) -> - {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)}; + {reply, ok, [{event, disconnected}, {close, kicked}], Channel}; handle_call(discard, _From, Channel) -> {shutdown, discarded, ok, Channel}; handle_call(Req, _From, Channel) -> @@ -648,7 +665,8 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) -> ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> - Keepalive = emqx_keepalive:init(timer:seconds(Interval)), + StatVal = emqx_gateway_conn:keepalive_stats(recv), + Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(Name, Channel = #channel{timers = Timers}) -> @@ -666,11 +684,17 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> - ensure_timer(Name, clean_timer(Name, Channel)). + ensure_timer(Name, remove_timer_ref(Name, Channel)). -clean_timer(Name, Channel = #channel{timers = Timers}) -> +cancel_timer(Name, Channel = #channel{timers = Timers}) -> + emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), + remove_timer_ref(Name, Channel). + +remove_timer_ref(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. +interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> + IdleTimeout; interval(force_timer, _) -> 15000; interval(alive_timer, #channel{keepalive = Keepalive}) -> @@ -725,7 +749,7 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> default_conninfo(ConnInfo) -> ConnInfo#{ clean_start => true, - clientid => undefined, + clientid => anonymous_clientid(), username => undefined, conn_props => #{}, connected => true, @@ -739,14 +763,15 @@ default_conninfo(ConnInfo) -> default_clientinfo(#{ peername := {PeerHost, _}, - sockname := {_, SockPort} + sockname := {_, SockPort}, + clientid := ClientId }) -> #{ zone => default, protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => undefined, + clientid => ClientId, username => undefined, is_bridge => false, is_superuser => false, @@ -764,3 +789,6 @@ proto_name_to_protocol(<<>>) -> exproto; proto_name_to_protocol(ProtoName) when is_binary(ProtoName) -> binary_to_atom(ProtoName). + +anonymous_clientid() -> + iolist_to_binary(["exproto-", emqx_misc:gen_id()]). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index d1bf4ba94..cf8ed76a7 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -56,12 +56,19 @@ start_link(Pool, Id) -> [] ). +-spec async_call(atom(), map(), map()) -> ok. async_call( FunName, Req = #{conn := Conn}, Options = #{pool_name := PoolName} ) -> - cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}). + case pick(PoolName, Conn) of + false -> + reply(self(), FunName, {error, no_available_grpc_client}); + Pid when is_pid(Pid) -> + cast(Pid, {rpc, FunName, Req, Options, self()}) + end, + ok. %%-------------------------------------------------------------------- %% cast, pick @@ -72,6 +79,7 @@ async_call( cast(Deliver, Msg) -> gen_server:cast(Deliver, Msg). +-spec pick(term(), term()) -> pid() | false. pick(PoolName, Conn) -> gproc_pool:pick_worker(PoolName, Conn). diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 66f780d3f..0e863a14c 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("eunit/include/eunit.hrl"). -import( emqx_exproto_echo_svr, @@ -38,6 +39,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TCPOPTS, [binary, {active, false}]). -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). @@ -62,6 +64,9 @@ all() -> [{group, Name} || Name <- metrics()]. +suite() -> + [{timetrap, {seconds, 30}}]. + groups() -> Cases = emqx_common_test_helpers:all(?MODULE), [{Name, Cases} || Name <- metrics()]. @@ -87,6 +92,7 @@ set_special_cfg(emqx_gateway) -> [gateway, exproto], #{ server => #{bind => 9100}, + idle_timeout => 5000, handler => #{address => "http://127.0.0.1:9001"}, listeners => listener_confs(LisType) } @@ -223,14 +229,16 @@ t_acl_deny(Cfg) -> close(Sock). t_keepalive_timeout(Cfg) -> + ok = snabbkaffe:start_trace(), SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), + ClientId1 = <<"keepalive_test_client1">>, Client = #{ proto_name => <<"demo">>, proto_ver => <<"v0.1">>, - clientid => <<"test_client_1">>, - keepalive => 2 + clientid => ClientId1, + keepalive => 5 }, Password = <<"123456">>, @@ -238,16 +246,42 @@ t_keepalive_timeout(Cfg) -> ConnAckBin = frame_connack(0), send(Sock, ConnBin), - {ok, ConnAckBin} = recv(Sock, 5000), + {ok, ConnAckBin} = recv(Sock), - DisconnectBin = frame_disconnect(), - {ok, DisconnectBin} = recv(Sock, 10000), - - SockType =/= udp andalso - begin - {error, closed} = recv(Sock, 5000) - end, - ok. + case SockType of + udp -> + %% another udp client should not affect the first + %% udp client keepalive check + timer:sleep(4000), + Sock2 = open(SockType), + ConnBin2 = frame_connect( + Client#{clientid => <<"keepalive_test_client2">>}, + Password + ), + send(Sock2, ConnBin2), + %% first client will be keepalive timeouted in 6s + ?assertMatch( + {ok, #{ + clientid := ClientId1, + reason := {shutdown, {sock_closed, keepalive_timeout}} + }}, + ?block_until(#{?snk_kind := conn_process_terminated}, 8000) + ); + _ -> + ?assertMatch( + {ok, #{ + clientid := ClientId1, + reason := {shutdown, {sock_closed, keepalive_timeout}} + }}, + ?block_until(#{?snk_kind := conn_process_terminated}, 12000) + ), + Trace = snabbkaffe:collect_trace(), + %% conn process should be terminated + ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))), + %% socket port should be closed + ?assertEqual({error, closed}, recv(Sock, 5000)) + end, + snabbkaffe:stop(). t_hook_connected_disconnected(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), @@ -337,6 +371,8 @@ t_hook_session_subscribed_unsubscribed(Cfg) -> error(hook_is_not_running) end, + send(Sock, frame_disconnect()), + close(Sock), emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}), emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4}). @@ -373,6 +409,48 @@ t_hook_message_delivered(Cfg) -> close(Sock), emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}). +t_idle_timeout(Cfg) -> + ok = snabbkaffe:start_trace(), + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + %% need to create udp client by sending something + case SockType of + udp -> + %% nothing to do + ok = meck:new(emqx_exproto_gcli, [passthrough, no_history]), + ok = meck:expect( + emqx_exproto_gcli, + async_call, + fun(FunName, _Req, _GClient) -> + self() ! {hreply, FunName, ok}, + ok + end + ), + %% send request, but nobody can respond to it + ClientId = <<"idle_test_client1">>, + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => ClientId, + keepalive => 5 + }, + Password = <<"123456">>, + ConnBin = frame_connect(Client, Password), + send(Sock, ConnBin), + ?assertMatch( + {ok, #{reason := {shutdown, idle_timeout}}}, + ?block_until(#{?snk_kind := conn_process_terminated}, 10000) + ), + ok = meck:unload(emqx_exproto_gcli); + _ -> + ?assertMatch( + {ok, #{reason := {shutdown, idle_timeout}}}, + ?block_until(#{?snk_kind := conn_process_terminated}, 10000) + ) + end, + snabbkaffe:stop(). + %%-------------------------------------------------------------------- %% Utils @@ -422,6 +500,9 @@ send({ssl, Sock}, Bin) -> send({dtls, Sock}, Bin) -> ssl:send(Sock, Bin). +recv(Sock) -> + recv(Sock, infinity). + recv({tcp, Sock}, Ts) -> gen_tcp:recv(Sock, 0, Ts); recv({udp, Sock}, Ts) -> diff --git a/mix.exs b/mix.exs index 3f55de64b..c4c827f92 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, {:ekka, github: "emqx/ekka", tag: "0.13.4", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, - {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true}, + {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2"}, {:replayq, "0.3.4", override: true}, diff --git a/rebar.config b/rebar.config index 271e274da..e9a8cc425 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.4"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.4"}