From 1a2a8f017c8dbc4a635d7fad63dfb44862820676 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 21 Jul 2022 09:59:30 +0200 Subject: [PATCH 1/7] test(shared_sub): Increase timeout --- test/emqx_shared_sub_SUITE.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 30bd96a66..837ef962f 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -284,7 +284,7 @@ test_two_messages(Strategy, Group) -> ok. last_message(ExpectedPayload, Pids) -> - last_message(ExpectedPayload, Pids, 100). + last_message(ExpectedPayload, Pids, 6000). last_message(ExpectedPayload, Pids, Timeout) -> receive @@ -401,10 +401,10 @@ t_local_fallback(_) -> emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}), - emqx:publish(Message1), + [{share, <<"local_foo/bar">>, {ok, 1}}] = emqx:publish(Message1), {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]), - rpc:call(Node, emqx, publish, [Message2]), + [{share, <<"local_foo/bar">>, {ok, 1}}] = rpc:call(Node, emqx, publish, [Message2]), {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]), emqtt:stop(ConnPid1), @@ -537,7 +537,7 @@ start_slave(Name, Port) -> {erl_flags, ebin_path()}]), pong = net_adm:ping(Node), - setup_node(Node, Port), + ok = setup_node(Node, Port), Node. stop_slave(Node) -> @@ -573,6 +573,6 @@ setup_node(Node, Port) -> [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]), - rpc:call(Node, ekka, join, [node()]), - + ok = rpc:call(Node, ekka, join, [node()]), + true = lists:member(Node, nodes()), ok. From 0982e8e7fd052cd1f325685351e757f22e4e8e2e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 26 Jul 2022 11:29:12 +0200 Subject: [PATCH 2/7] test(shared_sub): Fix gen_rpc setup --- test/emqx_shared_sub_SUITE.erl | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 837ef962f..cea92c986 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -37,11 +37,15 @@ all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> net_kernel:start(['master@127.0.0.1', longnames]), emqx_ct_helpers:boot_modules(all), + PortDiscovery = application:get_env(gen_rpc, port_discovery), + application:set_env(gen_rpc, port_discovery, stateless), + application:ensure_all_started(gen_rpc), emqx_ct_helpers:start_apps([]), - Config. + [{port_discovery, PortDiscovery} | Config]. -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). +end_per_suite(Config) -> + emqx_ct_helpers:stop_apps([gen_rpc]), + application:set_env(gen_rpc, port_discovery, proplists:get_value(port_discovery, Config)). t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). @@ -336,7 +340,7 @@ t_per_group_config(_) -> test_two_messages(round_robin, <<"round_robin_group">>). t_local(_) -> - Node = start_slave('local_shared_sub_test', 21884), + Node = start_slave('local_shared_sub_test19', 21884), GroupConfig = #{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -392,7 +396,7 @@ t_local_fallback(_) -> Topic = <<"local_foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - Node = start_slave('local_fallback_shared_sub_test', 11885), + Node = start_slave('local_fallback_shared_sub_test19', 11885), {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, _} = emqtt:connect(ConnPid1), @@ -563,8 +567,7 @@ setup_node(Node, Port) -> name => "internal", opts => [{zone,internal}], proto => tcp}]), - application:set_env(gen_rpc, port_discovery, manual), - application:set_env(gen_rpc, tcp_server_port, Port * 2), + application:set_env(gen_rpc, port_discovery, stateless), ok; (_) -> ok @@ -572,7 +575,13 @@ setup_node(Node, Port) -> [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]), + rpc:call(Node, ekka, join, [node()]), - ok = rpc:call(Node, ekka, join, [node()]), - true = lists:member(Node, nodes()), + %% Sanity check. Assert that `gen_rpc' is set up correctly: + ?assertEqual( Node + , gen_rpc:call(Node, erlang, node, []) + ), + ?assertEqual( node() + , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) + ), ok. From b042498b3c0251b0fbc43cbad051fc654d9110d9 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 26 Jul 2022 15:25:01 +0200 Subject: [PATCH 3/7] test(shared_sub): Apply remarks Co-authored-by: Thales Macedo Garitezi --- test/emqx_shared_sub_SUITE.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index cea92c986..02c4c6598 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -45,7 +45,10 @@ init_per_suite(Config) -> end_per_suite(Config) -> emqx_ct_helpers:stop_apps([gen_rpc]), - application:set_env(gen_rpc, port_discovery, proplists:get_value(port_discovery, Config)). + case proplists:get_value(port_discovery, Config) of + {ok, OldValue} -> application:set_env(gen_rpc, port_discovery, OldValue); + _ -> ok + end. t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). From db26956f3e83f71c35bdd4fc7eaca429a769b40a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 26 Jul 2022 12:12:13 +0800 Subject: [PATCH 4/7] feat(exproto): keeping client information up to date --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 9 +++++---- apps/emqx_exproto/src/emqx_exproto_conn.erl | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index dce3b36e1..80e5f4045 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -260,7 +260,8 @@ handle_timeout(_TRef, {keepalive, StatVal}, {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> Req = #{type => 'KEEPALIVE'}, - {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)} + NChannel = clean_timer(alive_timer, Channel), + {ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> @@ -327,7 +328,7 @@ handle_call({start_timer, keepalive, Interval}, NConnInfo = ConnInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, - {reply, ok, ensure_keepalive(NChannel)}; + {reply, ok, [{event, updated}], ensure_keepalive(NChannel)}; handle_call({subscribe, TopicFilter, Qos}, Channel = #channel{ @@ -339,13 +340,13 @@ handle_call({subscribe, TopicFilter, Qos}, {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> {ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), - {reply, ok, NChannel} + {reply, ok, [{event, updated}], NChannel} end; handle_call({unsubscribe, TopicFilter}, Channel = #channel{conn_state = connected}) -> {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), - {reply, ok, NChannel}; + {reply, ok, [{event, updated}], NChannel}; handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 0e83e96c4..5c56731d6 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -450,11 +450,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) -> emqx_cm:connection_closed(ClientId), {ok, State}; -%handle_msg({event, _Other}, State = #state{channel = Channel}) -> -% ClientId = emqx_exproto_channel:info(clientid, Channel), -% emqx_cm:set_chan_info(ClientId, info(State)), -% emqx_cm:set_chan_stats(ClientId, stats(State)), -% {ok, State}; +handle_msg({event, _Other}, State = #state{channel = Channel}) -> + ClientId = emqx_exproto_channel:info(clientid, Channel), + emqx_cm:set_chan_info(ClientId, info(State)), + emqx_cm:set_chan_stats(ClientId, stats(State)), + {ok, State}; handle_msg({timeout, TRef, TMsg}, State) -> handle_timeout(TRef, TMsg, State); From c188eeb1d957326ce5530831d08c84852b13d72a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 26 Jul 2022 17:10:11 +0800 Subject: [PATCH 5/7] fix(exproto): refine keepalive timer checking --- apps/emqx_exproto/src/emqx_exproto_conn.erl | 28 ++++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 5c56731d6..152c0edf1 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -211,16 +211,35 @@ esockd_setopts({esockd_transport, Socket}, Opts) -> %% FIXME: DTLS works?? esockd_transport:setopts(Socket, Opts). -esockd_getstat({udp, _SockPid, Sock}, Stats) -> - inet:getstat(Sock, Stats); esockd_getstat({esockd_transport, Sock}, Stats) -> - esockd_transport:getstat(Sock, Stats). + esockd_transport:getstat(Sock, Stats); +esockd_getstat({udp, _SockPid, _Sock}, Stats) -> + {ok, lists:map(fun(K) -> {K, get_stats(K)} end, Stats)}. -send(Data, #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) -> +send(Data, State = #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) -> + incr_send_stats(Data, State), gen_udp:send(Sock, Ip, Port, Data); send(Data, #state{socket = {esockd_transport, Sock}}) -> esockd_transport:async_send(Sock, Data). +incr_recv_stats(Data, #state{socket = {udp, _, _}}) -> + incr_stats(recv_oct, byte_size(Data)), + incr_stats(recv_cnt, 1). + +incr_send_stats(Data, #state{socket = {udp, _, _}}) -> + incr_stats(send_oct, byte_size(Data)), + incr_stats(send_cnt, 1). + +incr_stats(Key, Cnt) -> + Cnt0 = get_stats(Key), + put({stats, Key}, Cnt0 + Cnt). + +get_stats(Key) -> + case get({stats, Key}) of + undefined -> 0; + Cnt -> Cnt + end. + %%-------------------------------------------------------------------- %% callbacks %%-------------------------------------------------------------------- @@ -386,6 +405,7 @@ handle_msg({'$gen_cast', Req}, State) -> with_channel(handle_cast, [Req], State); handle_msg({datagram, _SockPid, Data}, State) -> + incr_recv_stats(Data, State), process_incoming(Data, State); handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> From a5bf1a3b2dfa70d6704cf0be9ab24be1775dcbb6 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 27 Jul 2022 09:38:46 +0800 Subject: [PATCH 6/7] chore(exproto): update app.src & appup.src --- apps/emqx_exproto/src/emqx_exproto.app.src | 2 +- apps/emqx_exproto/src/emqx_exproto.appup.src | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index 41c19f752..76bc9fa85 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.8"}, %% 4.3.3 is used by ee + {vsn, "4.3.9"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 33bd4386c..96b3d9ea3 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,7 +1,10 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.3\\.[6-7]">>, + [{"4.3.8", + [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[6-7]">>, [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-5]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, @@ -12,7 +15,10 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[6-7]">>, + [{"4.3.8", + [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[6-7]">>, [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-5]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, From 448ae41a641932e800ae2c09213e73ce012093df Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 27 Jul 2022 09:43:04 +0800 Subject: [PATCH 7/7] chore: update changes --- CHANGES-4.3.md | 2 ++ apps/emqx_exproto/src/emqx_exproto.appup.src | 14 ++------------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index ac9a11a0d..c6bb1b34d 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -20,6 +20,8 @@ File format: - Fixed crash when shared persistent subscription [#8441] - Fixed issue in Lua hook that prevented messages from being rejected [#8535] +- Fix ExProto UDP client keepalive checking error. + This causes the clients to not expire as long as a new UDP packet arrives [#8575] ### Enhancements - HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450] diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 96b3d9ea3..d6406b239 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,12 +1,7 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.8", - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, - {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[6-7]">>, - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[2-5]">>, + [{<<"4\\.3\\.[2-8]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>, @@ -15,12 +10,7 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.8", - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, - {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[6-7]">>, - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[2-5]">>, + [{<<"4\\.3\\.[2-8]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>,