From 8df7b1a1be276bab562171bd4335200f2bd975c8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 7 Jun 2023 18:15:14 +0800 Subject: [PATCH 1/5] fix(gateway): fix http-api 500 issue if setting max_connections to infinity --- .../src/emqx_gateway_api_listeners.erl | 16 ++++++++++++++-- apps/emqx_gateway/src/emqx_gateway_schema.erl | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 7c0b8c4bf..a6d1f3139 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -304,7 +304,6 @@ do_listeners_cluster_status(Listeners) -> status => #{ running => Running, current_connections => Curr, - %% XXX: Since it is taken from raw-conf, it is possible a string max_connections => int(Max) } } @@ -314,10 +313,15 @@ do_listeners_cluster_status(Listeners) -> Listeners ). +int(infinity) -> + infinity; +int(<<"infinity">>) -> + infinity; int(B) when is_binary(B) -> binary_to_integer(B); int(I) when is_integer(I) -> I. + aggregate_listener_status(NodeStatus) -> aggregate_listener_status(NodeStatus, 0, 0, undefined). @@ -330,11 +334,19 @@ aggregate_listener_status( CurrAcc, RunningAcc ) -> + NMaxAcc = plus_max_connections(MaxAcc, Max), NRunning = aggregate_running(Running, RunningAcc), - aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning); + aggregate_listener_status(T, NMaxAcc, Current + CurrAcc, NRunning); aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) -> {MaxAcc, CurrAcc, RunningAcc}. +plus_max_connections(_, infinity) -> + infinity; +plus_max_connections(infinity, _) -> + infinity; +plus_max_connections(A, B) when is_integer(A) andalso is_integer(B) -> + A + B. + aggregate_running(R, R) -> R; aggregate_running(R, undefined) -> R; aggregate_running(_, _) -> inconsistent. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 3c5706e82..96eeb3f62 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -266,7 +266,7 @@ common_listener_opts() -> )}, {max_connections, sc( - integer(), + hoconsc:union([infinity, pos_integer()]), #{ default => 1024, desc => ?DESC(gateway_common_listener_max_connections) From dbb8742e0ea0fec7674cc6ab926c015d470b5b80 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 8 Jun 2023 14:17:02 +0800 Subject: [PATCH 2/5] test: cover max_connection in infinity value --- .../test/emqx_gateway_api_SUITE.erl | 32 +++++++++++++++++++ changes/ce/feat-10961.en.md | 3 ++ 2 files changed, 35 insertions(+) create mode 100644 changes/ce/feat-10961.en.md diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index b2e5861af..091a81142 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -411,6 +411,38 @@ t_listeners_tcp(_) -> {404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), ok. +t_listeners_max_conns(_) -> + {204, _} = request(put, "/gateways/stomp", #{}), + {404, _} = request(get, "/gateways/stomp/listeners"), + LisConf = #{ + name => <<"def">>, + type => <<"tcp">>, + bind => <<"127.0.0.1:61613">>, + max_connections => 1024 + }, + {201, _} = request(post, "/gateways/stomp/listeners", LisConf), + {200, ConfResp} = request(get, "/gateways/stomp/listeners"), + assert_confs([LisConf], ConfResp), + {200, ConfResp1} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), + assert_confs(LisConf, ConfResp1), + + LisConf2 = maps:merge(LisConf, #{max_connections => <<"infinity">>}), + {200, _} = request( + put, + "/gateways/stomp/listeners/stomp:tcp:def", + LisConf2 + ), + + {200, ConfResp2} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), + assert_confs(LisConf2, ConfResp2), + + {200, [Listeners]} = request(get, "/gateways/stomp/listeners"), + ?assertMatch(#{max_connections := <<"infinity">>}, Listeners), + + {204, _} = request(delete, "/gateways/stomp/listeners/stomp:tcp:def"), + {404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), + ok. + t_listeners_authn(_) -> GwConf = #{ name => <<"stomp">>, diff --git a/changes/ce/feat-10961.en.md b/changes/ce/feat-10961.en.md new file mode 100644 index 000000000..375a11af7 --- /dev/null +++ b/changes/ce/feat-10961.en.md @@ -0,0 +1,3 @@ +Adds support for unlimited max connections for gateway listeners by allowing +infinity as a valid value for the `max_connections` field in the configuration +and HTTP API From 6a05663bd5c4708593e4c3899e0a67986986be3b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 9 Jun 2023 09:50:24 +0800 Subject: [PATCH 3/5] chore: ensure default value's type be the first one in the union type Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_gateway/src/emqx_gateway_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 96eeb3f62..845229425 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -266,7 +266,7 @@ common_listener_opts() -> )}, {max_connections, sc( - hoconsc:union([infinity, pos_integer()]), + hoconsc:union([pos_integer(), infinity]), #{ default => 1024, desc => ?DESC(gateway_common_listener_max_connections) From 72311a546bd01e9894af999adaa2855a6584b852 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 9 Jun 2023 10:44:17 +0800 Subject: [PATCH 4/5] chore: more clear funcation name --- apps/emqx_gateway/src/emqx_gateway_api_listeners.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index a6d1f3139..994d88b5d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -304,7 +304,7 @@ do_listeners_cluster_status(Listeners) -> status => #{ running => Running, current_connections => Curr, - max_connections => int(Max) + max_connections => ensure_integer_or_infinity(Max) } } } @@ -313,13 +313,13 @@ do_listeners_cluster_status(Listeners) -> Listeners ). -int(infinity) -> +ensure_integer_or_infinity(infinity) -> infinity; -int(<<"infinity">>) -> +ensure_integer_or_infinity(<<"infinity">>) -> infinity; -int(B) when is_binary(B) -> +ensure_integer_or_infinity(B) when is_binary(B) -> binary_to_integer(B); -int(I) when is_integer(I) -> +ensure_integer_or_infinity(I) when is_integer(I) -> I. aggregate_listener_status(NodeStatus) -> From 53eb8d7f1bbfef2c7333a821ea5965f1d4fed408 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 9 Jun 2023 11:34:56 +0800 Subject: [PATCH 5/5] test: fix flaky tests introduced by https://github.com/emqx/emqx/pull/10948 --- .../test/emqx_dashboard_monitor_SUITE.erl | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index ae0d4171b..71e559647 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -99,9 +99,7 @@ t_monitor_current_api_live_connections(_) -> ok = emqtt:disconnect(C), {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, ClientId1}]), {ok, _} = emqtt:connect(C1), - %% waiting for emqx_stats ticker - timer:sleep(1500), - _ = emqx_dashboard_monitor:current_rate(), + ok = waiting_emqx_stats_and_monitor_update('live_connections.max'), {ok, Rate} = request(["monitor_current"]), ?assertEqual(1, maps:get(<<"live_connections">>, Rate)), ?assertEqual(2, maps:get(<<"connections">>, Rate)), @@ -181,3 +179,24 @@ wait_new_monitor(OldMonitor, Count) -> timer:sleep(100), wait_new_monitor(OldMonitor, Count - 1) end. + +waiting_emqx_stats_and_monitor_update(WaitKey) -> + Self = self(), + meck:new(emqx_stats, [passthrough]), + meck:expect( + emqx_stats, + setstat, + fun(Stat, MaxStat, Val) -> + (Stat =:= WaitKey orelse MaxStat =:= WaitKey) andalso (Self ! updated), + meck:passthrough([Stat, MaxStat, Val]) + end + ), + receive + updated -> ok + after 5000 -> + error(waiting_emqx_stats_update_timeout) + end, + meck:unload([emqx_stats]), + %% manually call monitor update + _ = emqx_dashboard_monitor:current_rate(), + ok.