From 8fe342a02dd1531b713377abca25b8e92d833cbe Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 8 Nov 2021 15:46:49 -0300 Subject: [PATCH 1/2] feat(stats): track live channel / client count Track connected client count In order to correctly display the number of _connected_ clients in our monitor dashboard, we need to track those connections that are actually connected to clients, not considering connections from persistent sessions that are disconnected. Today, the `connections.count` that is displayed in the dashboards considers those disconnected persistent sessions as well. The new statistics can be found in the [`emqx_management`](https://github.com/emqx/emqx/tree/main-v4.4/apps/emqx_management) plugin, under `/api/v4/stats`, in the keys `live_connections.{max,count}`. --- apps/emqx/rebar.config | 2 +- apps/emqx/src/emqx_channel.erl | 4 + apps/emqx/src/emqx_cm.erl | 37 ++- apps/emqx/src/emqx_connection.erl | 6 +- apps/emqx/src/emqx_stats.erl | 12 +- apps/emqx/test/emqx_broker_SUITE.erl | 287 +++++++++++++++++++- apps/emqx/test/emqx_channel_SUITE.erl | 2 + apps/emqx/test/emqx_connection_SUITE.erl | 18 +- apps/emqx/test/emqx_ws_connection_SUITE.erl | 12 +- rebar.config | 2 +- 10 files changed, 354 insertions(+), 28 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index c27481a66..59c5cf045 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -20,7 +20,7 @@ , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} ]}. {plugins, [{rebar3_proper, "0.12.1"}]}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 7a5edbb48..af3ac3b2c 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1612,6 +1612,8 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), + ChanPid = self(), + emqx_cm:mark_channel_connected(ChanPid), Channel#channel{conninfo = NConnInfo, conn_state = connected }. @@ -1697,6 +1699,8 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), + ChanPid = self(), + emqx_cm:mark_channel_disconnected(ChanPid), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 890dce60a..d4704c43a 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -73,7 +73,11 @@ ]). %% Internal export --export([stats_fun/0]). +-export([ stats_fun/0 + , mark_channel_connected/1 + , mark_channel_disconnected/1 + , get_connected_client_count/0 + ]). -type(chan_pid() :: pid()). @@ -81,11 +85,13 @@ -define(CHAN_TAB, emqx_channel). -define(CHAN_CONN_TAB, emqx_channel_conn). -define(CHAN_INFO_TAB, emqx_channel_info). +-define(CHAN_LIVE_TAB, emqx_channel_live). -define(CHAN_STATS, [{?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, - {?CHAN_CONN_TAB, 'connections.count', 'connections.max'} + {?CHAN_CONN_TAB, 'connections.count', 'connections.max'}, + {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'} ]). %% Batch drain @@ -446,8 +452,10 @@ init([]) -> ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), + ok = emqx_tables:new(?CHAN_LIVE_TAB, [set, {write_concurrency, true} | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), - {ok, #{chan_pmon => emqx_pmon:new()}}. + State = #{chan_pmon => emqx_pmon:new()}, + {ok, State}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -456,17 +464,21 @@ handle_call(Req, _From, State) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), {noreply, State#{chan_pmon := PMon1}}; - handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> + ?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}), ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), + lists:foreach( + fun({ChanPid, _ClientID}) -> + mark_channel_disconnected(ChanPid) + end, + Items), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), @@ -503,3 +515,18 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> get_chann_conn_mod(ClientId, ChanPid) -> rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). +mark_channel_connected(ChanPid) -> + ?tp(emqx_cm_connected_client_count_inc, #{}), + ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}), + ok. + +mark_channel_disconnected(ChanPid) -> + ?tp(emqx_cm_connected_client_count_dec, #{}), + ets:delete(?CHAN_LIVE_TAB, ChanPid), + ok. + +get_connected_client_count() -> + case ets:info(?CHAN_LIVE_TAB, size) of + undefined -> 0; + Size -> Size + end. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index b5c3bb2ac..52caf121b 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -540,7 +540,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) end, ?tp(info, terminate, #{reason => Reason}), - maybe_raise_excption(Reason). + maybe_raise_exception(Reason). %% close socket, discard new state, always return ok. close_socket_ok(State) -> @@ -548,12 +548,12 @@ close_socket_ok(State) -> ok. %% tell truth about the original exception -maybe_raise_excption(#{exception := Exception, +maybe_raise_exception(#{exception := Exception, context := Context, stacktrace := Stacktrace }) -> erlang:raise(Exception, Context, Stacktrace); -maybe_raise_excption(Reason) -> +maybe_raise_exception(Reason) -> exit(Reason). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 0d2b1a1fd..87f50f92f 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% APIs @@ -66,8 +67,10 @@ %% Connection stats -define(CONNECTION_STATS, - ['connections.count', %% Count of Concurrent Connections - 'connections.max' %% Maximum Number of Concurrent Connections + [ 'connections.count' %% Count of Concurrent Connections + , 'connections.max' %% Maximum Number of Concurrent Connections + , 'live_connections.count' %% Count of connected clients + , 'live_connections.max' %% Maximum number of connected clients ]). %% Channel stats @@ -215,6 +218,11 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) -> ets:insert(?TAB, {MaxStat, Val}) end, safe_update_element(Stat, Val), + ?tp(emqx_stats_setstat, + #{ count_stat => Stat + , max_stat => MaxStat + , value => Val + }), {noreply, State}; handle_cast({update_interval, Update = #update{name = Name}}, diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index b6a99b8bc..7ee3b729a 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -23,20 +23,71 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ {group, all_cases} + , {group, connected_client_count_group} + ]. -init_per_suite(Config) -> +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + ConnClientTCs = [ t_connected_client_count_persistent + , t_connected_client_count_anonymous + , t_connected_client_stats + ], + OtherTCs = TCs -- ConnClientTCs, + [ {all_cases, [], OtherTCs} + , {connected_client_count_group, [ {group, tcp} + , {group, ws} + , {group, quic} + ]} + , {tcp, [], ConnClientTCs} + , {ws, [], ConnClientTCs} + , {quic, [], ConnClientTCs} + ]. + +init_per_group(connected_client_count_group, Config) -> + Config; +init_per_group(tcp, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + [{conn_fun, connect} | Config]; +init_per_group(ws, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + [ {ssl, false} + , {enable_websocket, true} + , {conn_fun, ws_connect} + , {port, 8083} + , {host, "localhost"} + | Config + ]; +init_per_group(quic, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + [ {conn_fun, quic_connect} + , {port, 14567} + | Config]; +init_per_group(_Group, Config) -> emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), Config. -end_per_suite(_Config) -> +end_per_group(connected_client_count_group, _Config) -> + ok; +end_per_group(_Group, _Config) -> emqx_common_test_helpers:stop_apps([]). +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + init_per_testcase(Case, Config) -> ?MODULE:Case({init, Config}). @@ -277,6 +328,236 @@ t_shard({'end', _Config}) -> emqx_broker:unsubscribe(<<"topic">>), ok = meck:unload(emqx_broker_helper). +%% persistent sessions, when gone, do not contribute to connected +%% client count +t_connected_client_count_persistent({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_persistent(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ClientID = <<"clientid">>, + ?assertEqual(0, emqx_cm:get_connected_client_count()), + {ok, ConnPid0} = emqtt:start_link([ {clean_start, false} + , {clientid, ClientID} + | Config]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid0) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + {ok, {ok, [_]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid0) end, + [emqx_cm_connected_client_count_dec] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% reconnecting + {ok, ConnPid1} = emqtt:start_link([ {clean_start, false} + , {clientid, ClientID} + | Config + ]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% taking over + {ok, ConnPid2} = emqtt:start_link([ {clean_start, false} + , {clientid, ClientID} + | Config + ]), + {{ok, _}, {ok, [_, _]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid2) end, + [ emqx_cm_connected_client_count_inc + , emqx_cm_connected_client_count_dec + ], + 500 + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + ChanPids = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> + lists:foreach( + fun(ChanPid) -> exit(ChanPid, kill) end, + ChanPids) + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_persistent({'end', _Config}) -> + snabbkaffe:stop(), + ok. + +%% connections without client_id also contribute to connected client +%% count +t_connected_client_count_anonymous({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_anonymous(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% first client + {ok, ConnPid0} = emqtt:start_link([ {clean_start, true} + | Config]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid0) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% second client + {ok, ConnPid1} = emqtt:start_link([ {clean_start, true} + | Config]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(2, emqx_cm:get_connected_client_count()), + %% when first client disconnects, shouldn't affect the second + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid0) end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% reconnecting + {ok, ConnPid2} = emqtt:start_link([ {clean_start, true} + | Config + ]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid2) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(2, emqx_cm:get_connected_client_count()), + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid1) end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + Chans = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> + lists:foreach( + fun(ChanPid) -> exit(ChanPid, kill) end, + Chans) + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_anonymous({'end', _Config}) -> + snabbkaffe:stop(), + ok. + +t_connected_client_stats({init, Config}) -> + ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), + {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), + ok = snabbkaffe:start_trace(), + Config; +t_connected_client_stats(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ?assertEqual(0, emqx_stats:getstat('live_connections.count')), + ?assertEqual(0, emqx_stats:getstat('live_connections.max')), + {ok, ConnPid} = emqtt:start_link([ {clean_start, true} + , {clientid, <<"clientid">>} + | Config + ]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid) end, + [emqx_cm_connected_client_count_inc] + ), + %% ensure stats are synchronized + {_, {ok, [_]}} = wait_for_stats( + fun emqx_cm:stats_fun/0, + [#{count_stat => 'live_connections.count', + max_stat => 'live_connections.max'}] + ), + ?assertEqual(1, emqx_stats:getstat('live_connections.count')), + ?assertEqual(1, emqx_stats:getstat('live_connections.max')), + {ok, {ok, [_]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid) end, + [emqx_cm_connected_client_count_dec] + ), + %% ensure stats are synchronized + {_, {ok, [_]}} = wait_for_stats( + fun emqx_cm:stats_fun/0, + [#{count_stat => 'live_connections.count', + max_stat => 'live_connections.max'}] + ), + ?assertEqual(0, emqx_stats:getstat('live_connections.count')), + ?assertEqual(1, emqx_stats:getstat('live_connections.max')), + ok; +t_connected_client_stats({'end', _Config}) -> + ok = snabbkaffe:stop(), + ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), + {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), + ok. + +%% the count must be always non negative +t_connect_client_never_negative({init, Config}) -> + Config; +t_connect_client_never_negative(Config) when is_list(Config) -> + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% would go to -1 + ChanPid = list_to_pid("<0.0.1>"), + emqx_cm:mark_channel_disconnected(ChanPid), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% would be 0, if really went to -1 + emqx_cm:mark_channel_connected(ChanPid), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + ok; +t_connect_client_never_negative({'end', _Config}) -> + ok. + +wait_for_events(Action, Kinds) -> + wait_for_events(Action, Kinds, 500). + +wait_for_events(Action, Kinds, Timeout) -> + Predicate = fun(#{?snk_kind := K}) -> + lists:member(K, Kinds) + end, + N = length(Kinds), + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), + Res = Action(), + case snabbkaffe_collector:receive_events(Sub) of + {timeout, _} -> + {Res, timeout}; + {ok, Events} -> + {Res, {ok, Events}} + end. + +wait_for_stats(Action, Stats) -> + Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) -> + Stat = maps:with( + [ count_stat + , max_stat + ], Event), + lists:member(Stat, Stats); + (_) -> + false + end, + N = length(Stats), + Timeout = 500, + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), + Res = Action(), + case snabbkaffe_collector:receive_events(Sub) of + {timeout, _} -> + {Res, timeout}; + {ok, Events} -> + {Res, {ok, Events}} + end. + recv_msgs(Count) -> recv_msgs(Count, []). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 10e7db9cf..911341440 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -144,6 +144,8 @@ set_test_listenser_confs() -> init_per_suite(Config) -> %% CM Meck ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), + ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end), %% Access Control Meck ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), ok = meck:expect(emqx_access_control, authenticate, diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 987b39d77..d006a1a10 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -36,6 +36,8 @@ init_per_suite(Config) -> ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), + ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end), %% Meck Limiter ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), %% Meck Pd @@ -113,7 +115,7 @@ t_ws_pingreq_before_connected(_) -> t_info(_) -> CPid = spawn(fun() -> - receive + receive {'$gen_call', From, info} -> gen_server:reply(From, emqx_connection:info(st())) after @@ -132,7 +134,7 @@ t_info_limiter(_) -> t_stats(_) -> CPid = spawn(fun() -> - receive + receive {'$gen_call', From, stats} -> gen_server:reply(From, emqx_connection:stats(st())) after @@ -147,10 +149,10 @@ t_stats(_) -> {send_pend,0}| _] , Stats). t_process_msg(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, - fun(_Packet, Channel) -> - {ok, Channel} + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, + fun(_Packet, Channel) -> + {ok, Channel} end), CPid ! {incoming, ?PACKET(?PINGREQ)}, CPid ! {incoming, undefined}, @@ -320,7 +322,7 @@ t_with_channel(_) -> t_handle_outgoing(_) -> ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). - + t_handle_info(_) -> ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())), @@ -347,7 +349,7 @@ t_activate_socket(_) -> State = st(), {ok, NStats} = emqx_connection:activate_socket(State), ?assertEqual(running, emqx_connection:info(sockstate, NStats)), - + State1 = st(#{sockstate => blocked}), ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 116605a96..d554d3c8c 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -48,7 +48,10 @@ init_per_testcase(TestCase, Config) when TestCase =/= t_ws_pingreq_before_connected, TestCase =/= t_ws_non_check_origin -> - emqx_channel_SUITE:set_test_listenser_confs(), + %% Meck Cm + ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), + ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end), %% Mock cowboy_req ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end), @@ -90,7 +93,8 @@ end_per_testcase(TestCase, _Config) when TestCase =/= t_ws_pingreq_before_connected -> lists:foreach(fun meck:unload/1, - [cowboy_req, + [emqx_cm, + cowboy_req, emqx_access_control, emqx_broker, emqx_hooks, @@ -363,14 +367,12 @@ t_handle_info_close(_) -> {[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()). t_handle_info_event(_) -> - ok = meck:new(emqx_cm, [passthrough, no_history]), ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end), {ok, _} = ?ws_conn:handle_info({event, connected}, st()), {ok, _} = ?ws_conn:handle_info({event, disconnected}, st()), - {ok, _} = ?ws_conn:handle_info({event, updated}, st()), - ok = meck:unload(emqx_cm). + {ok, _} = ?ws_conn:handle_info({event, updated}, st()). t_handle_timeout_idle_timeout(_) -> TRef = make_ref(), diff --git a/rebar.config b/rebar.config index 6ae9523ff..ec7020937 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {getopt, "1.0.2"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} From 60d5017eea3e13311c0707484755392e7c48fbc8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 8 Nov 2021 17:50:58 -0300 Subject: [PATCH 2/2] style(elvis): fix elvis style complaints --- apps/emqx/src/emqx_channel.erl | 43 +++++++++++++++++++------------ apps/emqx/src/emqx_cm.erl | 24 +++++++++++++---- apps/emqx/src/emqx_connection.erl | 10 +++---- apps/emqx/src/emqx_stats.erl | 2 +- 4 files changed, 52 insertions(+), 27 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index af3ac3b2c..84281a500 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -103,7 +103,7 @@ -type(reply() :: {outgoing, emqx_types:packet()} | {outgoing, [emqx_types:packet()]} - | {event, conn_state()|updated} + | {event, conn_state() | updated} | {close, Reason :: atom()}). -type(replies() :: emqx_types:packet() | reply() | [reply()]). @@ -132,7 +132,7 @@ info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). --spec(info(list(atom())|atom(), channel()) -> term()). +-spec(info(list(atom()) | atom(), channel()) -> term()). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> @@ -328,17 +328,23 @@ handle_in(Packet = ?AUTH_PACKET(ReasonCode, _Properties), connecting -> process_connect(NProperties, ensure_connected(NChannel)); _ -> - handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel#channel{conn_state = connected}) + handle_out( auth + , {?RC_SUCCESS, NProperties} + , NChannel#channel{conn_state = connected} + ) end; {continue, NProperties, NChannel} -> - handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel#channel{conn_state = reauthenticating}); + handle_out( auth + , {?RC_CONTINUE_AUTHENTICATION, NProperties} + , NChannel#channel{conn_state = reauthenticating} + ); {error, NReasonCode} -> case ConnState of connecting -> handle_out(connack, NReasonCode, Channel); _ -> handle_out(disconnect, NReasonCode, Channel) - end + end end catch _Class:_Reason -> @@ -632,7 +638,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ?SLOG(warning, #{ msg => "dropped_qos2_packet", reason => emqx_reason_codes:name(RC), - packetId => PacketId + packet_id => PacketId }), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) @@ -655,7 +661,7 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> -compile({inline, [puback_reason_code/1]}). puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; -puback_reason_code([_|_]) -> ?RC_SUCCESS. +puback_reason_code([_ | _]) -> ?RC_SUCCESS. -compile({inline, [after_message_acked/3]}). after_message_acked(ClientInfo, Msg, PubAckProps) -> @@ -674,7 +680,7 @@ process_subscribe(TopicFilters, SubProps, Channel) -> process_subscribe([], _SubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> +process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) -> case check_sub_caps(TopicFilter, SubOpts, Channel) of ok -> {ReasonCode, NChannel} = do_subscribe(TopicFilter, @@ -716,9 +722,9 @@ process_unsubscribe(TopicFilters, UnSubProps, Channel) -> process_unsubscribe([], _UnSubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) -> +process_unsubscribe([{TopicFilter, SubOpts} | More], UnSubProps, Channel, Acc) -> {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel), - process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]). + process_unsubscribe(More, UnSubProps, NChannel, [RC | Acc]). do_unsubscribe(TopicFilter, SubOpts, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, @@ -790,7 +796,9 @@ handle_deliver(Delivers, Channel = #channel{takeover = true, pendings = Pendings, session = Session, clientinfo = #{clientid := ClientId}}) -> - NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), + NPendings = lists:append( + Pendings, + emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{session = Session, @@ -1365,17 +1373,20 @@ authenticate(?AUTH_PACKET(_, #{'Authentication-Method' := AuthMethod} = Properti {error, ?RC_BAD_AUTHENTICATION_METHOD} end. -do_authenticate(#{auth_method := AuthMethod} = Credential, #channel{clientinfo = ClientInfo} = Channel) -> +do_authenticate(#{auth_method := AuthMethod} = Credential, + #channel{clientinfo = ClientInfo} = Channel) -> Properties = #{'Authentication-Method' => AuthMethod}, case emqx_access_control:authenticate(Credential) of {ok, Result} -> {ok, Properties, - Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, - auth_cache = #{}}}; + Channel#channel{ + clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, + auth_cache = #{}}}; {ok, Result, AuthData} -> {ok, Properties#{'Authentication-Data' => AuthData}, - Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, - auth_cache = #{}}}; + Channel#channel{ + clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, + auth_cache = #{}}}; {continue, AuthCache} -> {continue, Properties, Channel#channel{auth_cache = AuthCache}}; {continue, AuthData, AuthCache} -> diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index d4704c43a..1abd5c151 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -102,6 +102,11 @@ -define(T_TAKEOVER, 15000). +%% linting overrides +-elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}} + , {elvis_style, god_modules, #{ignore => [emqx_cm]}} + ]). + %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> @@ -245,7 +250,10 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> pendings => Pendings}}; {living, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Session1 = emqx_persistent_session:persist( ClientInfo + , ConnInfo + , Session + ), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, @@ -254,12 +262,18 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {expired, OldSession} -> _ = emqx_persistent_session:discard(ClientId, OldSession), Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Session1 = emqx_persistent_session:persist( ClientInfo + , ConnInfo + , Session + ), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, present => false}}; none -> Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Session1 = emqx_persistent_session:persist( ClientInfo + , ConnInfo + , Session + ), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, present => false}} end @@ -309,7 +323,7 @@ takeover_session(ClientId) -> [ChanPid] -> takeover_session(ClientId, ChanPid); ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), + [ChanPid | StalePids] = lists:reverse(ChanPids), ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) @@ -374,7 +388,7 @@ kick_session(ClientId) -> [ChanPid] -> kick_session(ClientId, ChanPid); ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), + [ChanPid | StalePids] = lists:reverse(ChanPids), ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 52caf121b..e4a617675 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -149,7 +149,7 @@ start_link(Transport, Socket, Options) -> %%-------------------------------------------------------------------- %% @doc Get infos of the connection/channel. --spec(info(pid()|state()) -> emqx_types:infos()). +-spec(info(pid() | state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); info(State = #state{channel = Channel}) -> @@ -176,7 +176,7 @@ info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). %% @doc Get stats of the connection/channel. --spec(stats(pid()|state()) -> emqx_types:stats()). +-spec(stats(pid() | state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); stats(#state{transport = Transport, @@ -373,7 +373,7 @@ cancel_stats_timer(State) -> State. process_msg([], State) -> {ok, State}; -process_msg([Msg|More], State) -> +process_msg([Msg | More], State) -> try case handle_msg(Msg, State) of ok -> @@ -475,7 +475,7 @@ handle_msg({Passive, _Sock}, State) handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{ listener = {Type, Listener}} = State) -> ActiveN = get_active_n(Type, Listener), - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -649,7 +649,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, - parse_incoming(Rest, [Packet|Packets], NState) + parse_incoming(Rest, [Packet | Packets], NState) catch throw : ?FRAME_PARSE_ERROR(Reason) -> ?SLOG(info, #{ reason => Reason diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 87f50f92f..74411ee9c 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -233,7 +233,7 @@ handle_cast({update_interval, Update = #update{name = Name}}, name => Name }), State; - false -> State#state{updates = [Update|Updates]} + false -> State#state{updates = [Update | Updates]} end, {noreply, NState};