diff --git a/etc/emqx.conf b/etc/emqx.conf index 2f03ec7c1..223c8593c 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -181,6 +181,11 @@ node.name = emqx@127.0.0.1 ## Value: String node.cookie = emqxsecretcookie +## Node Max Clients Size. +## +## Value: String +node.max_clients = 1024000 + ## Data dir for the node ## ## Value: Folder diff --git a/priv/emqx.schema b/priv/emqx.schema index c7d6d0af5..f4b5751b5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -334,6 +334,13 @@ end}. hidden ]}. +%% @see node.max_clients +{mapping, "node.max_clients", "emqx.max_clients", [ + {default, 1024000}, + {datatype, integer}, + hidden +]}. + %%-------------------------------------------------------------------- %% RPC %%-------------------------------------------------------------------- diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 852338854..2ad132a9a 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -44,6 +44,8 @@ -export([lookup_conn_pid/1]). +-export([max_client_size/0]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -148,6 +150,9 @@ lookup_conn_pid(ClientId) when is_binary(ClientId) -> notify(Msg) -> gen_server:cast(?CM, {notify, Msg}). +max_client_size() -> + ets:info(?CONN_TAB, size). + %%----------------------------------------------------------------------------- %% gen_server callbacks %%----------------------------------------------------------------------------- diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index e4fe32874..cc6bf1652 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -260,10 +260,22 @@ set_protover(_Packet, PState) -> received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; +received(Packet = ?PACKET(?CONNECT), PState = #pstate{connected = false}) -> + case check_max_clients() of + true -> + ?LOG(error, "Connection rejected due to max clients limitation"), + connack({?RC_QUOTA_EXCEEDED, PState}); + false -> + do_received(Packet, PState) + end; + received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> {error, proto_unexpected_connect, PState}; -received(Packet = ?PACKET(Type), PState) -> +received(Packet, PState) -> + do_received(Packet, PState). + +do_received(Packet = ?PACKET(Type), PState) -> trace(recv, Packet), PState1 = set_protover(Packet, PState), try emqx_packet:validate(Packet) of @@ -1048,3 +1060,8 @@ do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) -> allow -> AllowTerm; deny -> DenyTerm end. + +check_max_clients() -> + CurrentClientSize = emqx_cm:max_client_size(), + MaxClients = emqx_config:get_env(max_clients, 1024000), + CurrentClientSize >= MaxClients. \ No newline at end of file