diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index ae0d4171b..71e559647 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -99,9 +99,7 @@ t_monitor_current_api_live_connections(_) -> ok = emqtt:disconnect(C), {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, ClientId1}]), {ok, _} = emqtt:connect(C1), - %% waiting for emqx_stats ticker - timer:sleep(1500), - _ = emqx_dashboard_monitor:current_rate(), + ok = waiting_emqx_stats_and_monitor_update('live_connections.max'), {ok, Rate} = request(["monitor_current"]), ?assertEqual(1, maps:get(<<"live_connections">>, Rate)), ?assertEqual(2, maps:get(<<"connections">>, Rate)), @@ -181,3 +179,24 @@ wait_new_monitor(OldMonitor, Count) -> timer:sleep(100), wait_new_monitor(OldMonitor, Count - 1) end. + +waiting_emqx_stats_and_monitor_update(WaitKey) -> + Self = self(), + meck:new(emqx_stats, [passthrough]), + meck:expect( + emqx_stats, + setstat, + fun(Stat, MaxStat, Val) -> + (Stat =:= WaitKey orelse MaxStat =:= WaitKey) andalso (Self ! updated), + meck:passthrough([Stat, MaxStat, Val]) + end + ), + receive + updated -> ok + after 5000 -> + error(waiting_emqx_stats_update_timeout) + end, + meck:unload([emqx_stats]), + %% manually call monitor update + _ = emqx_dashboard_monitor:current_rate(), + ok. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 7c0b8c4bf..994d88b5d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -304,8 +304,7 @@ do_listeners_cluster_status(Listeners) -> status => #{ running => Running, current_connections => Curr, - %% XXX: Since it is taken from raw-conf, it is possible a string - max_connections => int(Max) + max_connections => ensure_integer_or_infinity(Max) } } } @@ -314,10 +313,15 @@ do_listeners_cluster_status(Listeners) -> Listeners ). -int(B) when is_binary(B) -> +ensure_integer_or_infinity(infinity) -> + infinity; +ensure_integer_or_infinity(<<"infinity">>) -> + infinity; +ensure_integer_or_infinity(B) when is_binary(B) -> binary_to_integer(B); -int(I) when is_integer(I) -> +ensure_integer_or_infinity(I) when is_integer(I) -> I. + aggregate_listener_status(NodeStatus) -> aggregate_listener_status(NodeStatus, 0, 0, undefined). @@ -330,11 +334,19 @@ aggregate_listener_status( CurrAcc, RunningAcc ) -> + NMaxAcc = plus_max_connections(MaxAcc, Max), NRunning = aggregate_running(Running, RunningAcc), - aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning); + aggregate_listener_status(T, NMaxAcc, Current + CurrAcc, NRunning); aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) -> {MaxAcc, CurrAcc, RunningAcc}. +plus_max_connections(_, infinity) -> + infinity; +plus_max_connections(infinity, _) -> + infinity; +plus_max_connections(A, B) when is_integer(A) andalso is_integer(B) -> + A + B. + aggregate_running(R, R) -> R; aggregate_running(R, undefined) -> R; aggregate_running(_, _) -> inconsistent. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 3c5706e82..845229425 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -266,7 +266,7 @@ common_listener_opts() -> )}, {max_connections, sc( - integer(), + hoconsc:union([pos_integer(), infinity]), #{ default => 1024, desc => ?DESC(gateway_common_listener_max_connections) diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index b2e5861af..091a81142 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -411,6 +411,38 @@ t_listeners_tcp(_) -> {404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), ok. +t_listeners_max_conns(_) -> + {204, _} = request(put, "/gateways/stomp", #{}), + {404, _} = request(get, "/gateways/stomp/listeners"), + LisConf = #{ + name => <<"def">>, + type => <<"tcp">>, + bind => <<"127.0.0.1:61613">>, + max_connections => 1024 + }, + {201, _} = request(post, "/gateways/stomp/listeners", LisConf), + {200, ConfResp} = request(get, "/gateways/stomp/listeners"), + assert_confs([LisConf], ConfResp), + {200, ConfResp1} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), + assert_confs(LisConf, ConfResp1), + + LisConf2 = maps:merge(LisConf, #{max_connections => <<"infinity">>}), + {200, _} = request( + put, + "/gateways/stomp/listeners/stomp:tcp:def", + LisConf2 + ), + + {200, ConfResp2} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), + assert_confs(LisConf2, ConfResp2), + + {200, [Listeners]} = request(get, "/gateways/stomp/listeners"), + ?assertMatch(#{max_connections := <<"infinity">>}, Listeners), + + {204, _} = request(delete, "/gateways/stomp/listeners/stomp:tcp:def"), + {404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"), + ok. + t_listeners_authn(_) -> GwConf = #{ name => <<"stomp">>, diff --git a/changes/ce/feat-10961.en.md b/changes/ce/feat-10961.en.md new file mode 100644 index 000000000..375a11af7 --- /dev/null +++ b/changes/ce/feat-10961.en.md @@ -0,0 +1,3 @@ +Adds support for unlimited max connections for gateway listeners by allowing +infinity as a valid value for the `max_connections` field in the configuration +and HTTP API