diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7bfef472d..fa64f3528 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1536,6 +1536,7 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), + emqx_cm:increment_connected_client_count(), Channel#channel{conninfo = NConnInfo, conn_state = connected }. @@ -1624,6 +1625,7 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), + emqx_cm:decrement_connected_client_count(), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- @@ -1725,4 +1727,3 @@ flag(false) -> 0. set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos+1, Channel, Value). - diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 23f078568..2ecacabc2 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -72,7 +72,12 @@ ]). %% Internal export --export([stats_fun/0, clean_down/1]). +-export([ stats_fun/0 + , clean_down/1 + , increment_connected_client_count/0 + , decrement_connected_client_count/0 + , get_connected_client_count/0 + ]). -type(chan_pid() :: pid()). @@ -141,6 +146,12 @@ unregister_channel(ClientId) when is_binary(ClientId) -> do_unregister_channel(Chan) -> ok = emqx_cm_registry:unregister_channel(Chan), true = ets:delete(?CHAN_CONN_TAB, Chan), + case ets:lookup(?CHAN_INFO_TAB, Chan) of + [{_, #{conn_state := connected}, _}] -> + decrement_connected_client_count(); + _ -> + skip + end, true = ets:delete(?CHAN_INFO_TAB, Chan), ets:delete_object(?CHAN_TAB, Chan). @@ -427,6 +438,7 @@ rpc_call(Node, Fun, Args, Timeout) -> %% @private cast(Msg) -> gen_server:cast(?CM, Msg). +call(Msg) -> gen_server:call(?CM, Msg). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -438,8 +450,14 @@ init([]) -> ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), - {ok, #{chan_pmon => emqx_pmon:new()}}. + State = #{ chan_pmon => emqx_pmon:new() + , connected_client_count => 0 + }, + {ok, State}. +handle_call({connected_client_count, get}, _From, + State = #{connected_client_count := CCCount}) -> + {reply, CCCount, State}; handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -447,7 +465,18 @@ handle_call(Req, _From, State) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), {noreply, State#{chan_pmon := PMon1}}; - +handle_cast({connected_client_count, inc}, + State0 = #{connected_client_count := CCCount0}) -> + ?tp(emqx_cm_connected_client_count_inc, #{count_before => CCCount0}), + CCCount = CCCount0 + 1, + State = State0#{connected_client_count := CCCount}, + {noreply, State}; +handle_cast({connected_client_count, dec}, + State0 = #{connected_client_count := CCCount0}) -> + ?tp(emqx_cm_connected_client_count_dec, #{count_before => CCCount0}), + CCCount = max(0, CCCount0 - 1), + State = State0#{connected_client_count := CCCount}, + {noreply, State}; handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -493,3 +522,11 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> get_chann_conn_mod(ClientId, ChanPid) -> rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). +increment_connected_client_count() -> + cast({connected_client_count, inc}). + +decrement_connected_client_count() -> + cast({connected_client_count, dec}). + +get_connected_client_count() -> + call({connected_client_count, get}). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ab91c02b4..84086015d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -518,7 +518,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) end, ?tp(info, terminate, #{reason => Reason}), - maybe_raise_excption(Reason). + maybe_raise_exception(Reason). %% close socket, discard new state, always return ok. close_socket_ok(State) -> @@ -526,12 +526,12 @@ close_socket_ok(State) -> ok. %% tell truth about the original exception -maybe_raise_excption(#{exception := Exception, +maybe_raise_exception(#{exception := Exception, context := Context, stacktrace := Stacktrace }) -> erlang:raise(Exception, Context, Stacktrace); -maybe_raise_excption(Reason) -> +maybe_raise_exception(Reason) -> exit(Reason). %%-------------------------------------------------------------------- diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 4cef68660..ff1a30fe2 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -277,6 +278,68 @@ t_stats_fun({'end', _Config}) -> ok = emqx_broker:unsubscribe(<<"topic">>), ok = emqx_broker:unsubscribe(<<"topic2">>). +%% persistent sessions, when gone, do not contribute to connect client +%% count +t_connected_client_count_persistent({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_persistent(Config) when is_list(Config) -> + ?assertEqual(0, emqx_cm:get_connected_client_count()), + {ok, ConnPid0} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid0) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + {ok, _} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid0) end, + [emqx_cm_connected_client_count_dec] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% reconnecting + {ok, ConnPid1} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% taking over + {ok, ConnPid2} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid2) end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_connected_client_count_inc + ] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + [ChanPid] = emqx_cm:all_channels(), + {true, _} = wait_for_events( + fun() -> exit(ChanPid, kill) end, + [emqx_cm_connected_client_count_dec] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_persistent({'end', _Config}) -> + snabbkaffe:stop(), + ok. + +wait_for_events(Action, Kinds) -> + Predicate = fun(#{?snk_kind := K}) -> + lists:member(K, Kinds) + end, + N = length(Kinds), + Timeout = 100, + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), + Res = Action(), + case snabbkaffe_collector:receive_events(Sub) of + {timeout, []} -> + {Res, timeout}; + {ok, Events} -> + {Res, {ok, Events}} + end. + recv_msgs(Count) -> recv_msgs(Count, []).