From 44ea85305911650761215b325261862e44bb8031 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 12 Jan 2022 18:09:28 +0800 Subject: [PATCH] test(gw): more coverage for emqx_gateway_cm --- apps/emqx_gateway/src/emqx_gateway_cm.erl | 28 +- .../src/emqx_gateway_cm_registry.erl | 18 +- .../test/emqx_gateway_cm_SUITE.erl | 248 ++++++++++++++++++ .../test/emqx_sn_protocol_SUITE.erl | 4 + 4 files changed, 276 insertions(+), 22 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index ee1ce19ef..647d3a6a7 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc The Gateway Connection-Manager +%% @doc The Gateway Channel Manager %% %% For a certain type of protocol, this is a single instance of the manager. %% It means that no matter how many instances of the stomp gateway are created, @@ -26,7 +26,6 @@ -include("include/emqx_gateway.hrl"). -include_lib("emqx/include/logger.hrl"). - %% APIs -export([start_link/1]). @@ -74,6 +73,8 @@ -type option() :: {gwname, gateway_name()}. -type options() :: list(option()). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). -define(T_TAKEOVER, 15000). -define(DEFAULT_BATCH_SIZE, 10000). @@ -94,9 +95,9 @@ procname(GwName) -> ConnTab :: atom(), ChannInfoTab :: atom()}. cmtabs(GwName) -> - { tabname(chan, GwName) %% Client Tabname; Record: {ClientId, Pid} - , tabname(conn, GwName) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod} - , tabname(info, GwName) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats} + { tabname(chan, GwName) %% Record: {ClientId, Pid} + , tabname(conn, GwName) %% Recrod: {{ClientId, Pid}, ConnMod} + , tabname(info, GwName) %% Record: {{ClientId, Pid}, Info, Stats} }. tabname(chan, GwName) -> @@ -134,7 +135,6 @@ unregister_channel(GwName, ClientId) when is_binary(ClientId) -> insert_channel_info(GwName, ClientId, Info, Stats) -> Chan = {ClientId, self()}, true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}), - %%?tp(debug, insert_channel_info, #{client_id => ClientId}), ok. %% @doc Get info of a channel. @@ -207,7 +207,8 @@ set_chan_stats(GwName, ClientId, Stats) -> emqx_types:clientid(), pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(GwName, ClientId, ChanPid, Stats) when node(ChanPid) == node() -> +set_chan_stats(GwName, ClientId, ChanPid, Stats) + when node(ChanPid) == node() -> Chan = {ClientId, self()}, try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) catch @@ -232,7 +233,7 @@ connection_closed(GwName, ClientId) -> -> {ok, #{session := Session, present := boolean(), pendings => list() - }} + }} | {error, any()}. open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> @@ -256,7 +257,7 @@ open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, open_session(_Type, false = _CleanStart, _ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) -> - %% TODO: + %% TODO: The session takeover logic will be implemented on 0.9? {error, not_supported_now}. %% @private @@ -305,17 +306,12 @@ do_discard_session(GwName, ClientId, Pid) -> discard_session(GwName, ClientId, Pid) catch _ : noproc -> % emqx_ws_connection: call - %?tp(debug, "session_already_gone", #{pid => Pid}), ok; _ : {noproc, _} -> % emqx_connection: gen_server:call - %?tp(debug, "session_already_gone", #{pid => Pid}), ok; _ : {{shutdown, _}, _} -> - %?tp(debug, "session_already_shutdown", #{pid => Pid}), ok; _ : _Error : _St -> - %?tp(error, "failed_to_discard_session", - % #{pid => Pid, reason => Error, stacktrace=>St}) ok end. @@ -464,7 +460,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{registry = Registry, locker = Locker}) -> + _ = gen_server:stop(Registry), + _ = ekka_locker:stop(Locker), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 108db3216..845ad7b7e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -64,7 +64,8 @@ tabname(Name) -> register_channel(Name, ClientId) when is_binary(ClientId) -> register_channel(Name, {ClientId, self()}); -register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> +register_channel(Name, {ClientId, ChanPid}) + when is_binary(ClientId), is_pid(ChanPid) -> mria:dirty_write(tabname(Name), record(ClientId, ChanPid)). %% @doc Unregister a global channel. @@ -72,13 +73,15 @@ register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(Cha unregister_channel(Name, ClientId) when is_binary(ClientId) -> unregister_channel(Name, {ClientId, self()}); -unregister_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> +unregister_channel(Name, {ClientId, ChanPid}) + when is_binary(ClientId), is_pid(ChanPid) -> mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)). %% @doc Lookup the global channels. -spec lookup_channels(gateway_name(), binary()) -> list(pid()). lookup_channels(Name, ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)]. + [ChanPid + || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -111,10 +114,11 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) -> Tab = tabname(Name), - global:trans({?LOCK, self()}, - fun() -> - mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) - end), + global:trans( + {?LOCK, self()}, + fun() -> + mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) + end), {noreply, State}; handle_info({membership, _Event}, State) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl new file mode 100644 index 000000000..82d97a166 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl @@ -0,0 +1,248 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cm_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(GWNAME, mqttsn). +-define(CLIENTID, <<"client1">>). + +-define(CONF_DEFAULT, <<"gateway {}">>). + +%%-------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:erase(gateway), + emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_common_test_helpers:start_apps([]), + + ok = meck:new(emqx_gateway_metrics, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_gateway_metrics, inc, fun(_, _) -> ok end), + Conf. + +end_per_suite(_Conf) -> + meck:unload(emqx_gateway_metrics), + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(_TestCase, Conf) -> + process_flag(trap_exit, true), + {ok, CMPid} = emqx_gateway_cm:start_link([{gwname, ?GWNAME}]), + [{cm, CMPid} | Conf]. + +end_per_testcase(_TestCase, Conf) -> + CMPid = proplists:get_value(cm, Conf), + gen_server:stop(CMPid), + process_flag(trap_exit, false), + Conf. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_open_session(_) -> + {error, not_supported_now} = emqx_gateway_cm:open_session( + ?GWNAME, false, clientinfo(), conninfo(), + fun(_, _) -> #{} end), + + {ok, SessionRes} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + ?assertEqual(#{present => false, + session => #{no => 1}}, SessionRes), + + %% assert1. check channel infos in ets table + Chann = {?CLIENTID, self()}, + ?assertEqual( + [Chann], + ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))), + ?assertEqual( + [{Chann, ?MODULE}], + ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))), + + %% assert2. discard the presented session + + {ok, SessionRes2} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 2} end), + ?assertEqual(#{present => false, + session => #{no => 2}}, SessionRes2), + + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + ?assertEqual( + 1, + ets:info(emqx_gateway_cm:tabname(info, ?GWNAME), size)), + + receive + discard -> + emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID), + emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID) + after 100 -> + ?assert(false, "waiting discard msg timeout") + end, + + %% assert3. no channel infos in ets table + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))). + +t_get_set_chan_info_stats(_) -> + {ok, SessionRes} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + ?assertEqual(#{present => false, + session => #{no => 1}}, SessionRes), + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + + %% Info: get/set + NInfo = #{newinfo => true}, + emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo), + ?assertEqual( + NInfo, + emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID)), + ?assertEqual( + NInfo, + emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID, self())), + %% Stats: get/set + NStats = [{newstats, true}], + emqx_gateway_cm:set_chan_stats(?GWNAME, ?CLIENTID, NStats), + ?assertEqual( + NStats, + emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID)), + ?assertEqual( + NStats, + emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID, self())), + + emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID), + emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID). + +t_handle_process_down(Conf) -> + Pid = proplists:get_value(cm, Conf), + + {ok, SessionRes} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + ?assertEqual(#{present => false, + session => #{no => 1}}, SessionRes), + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + + _ = Pid ! {'DOWN', mref, process, self(), normal}, + + timer:sleep(200), %% wait the asycn clear task + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))). + +t_kick_session(_) -> + %% session1 + {ok, _} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + + %% meck `lookup_channels` + Self = self(), + ok = meck:new(emqx_gateway_cm_registry, + [passthrough, no_history, no_link]), + ok = meck:expect(emqx_gateway_cm_registry, lookup_channels, + fun(_, ?CLIENTID) -> [Self, Self] end), + + ok = emqx_gateway_cm:kick_session(?GWNAME, ?CLIENTID), + + receive discard -> ok + after 100 -> ?assert(false, "waiting discard msg timeout") + end, + receive + kick -> + emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID), + emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID) + after + 100 -> + ?assert(false, "waiting kick msg timeout") + end, + meck:unload(emqx_gateway_cm_registry). + +t_unexpected_handle(Conf) -> + Pid = proplists:get_value(cm, Conf), + _ = Pid ! unexpected_info, + ok = gen_server:call(Pid, unexpected_call), + ok = gen_server:cast(Pid, unexpected_cast). + +%%-------------------------------------------------------------------- +%% helpers + +clientinfo() -> + #{ clientid => ?CLIENTID + , is_bridge => false + , is_superuser => false + , listener => 'mqttsn:udp:default' + , mountpoint => <<"mqttsn/">> + , peerhost => {127, 0, 0, 1} + , protocol => 'mqtt-sn' + , sockport => 1884 + , username => undefined + , zone => default + }. + +conninfo() -> + #{ clean_start => true + , clientid => ?CLIENTID + , conn_mod => ?MODULE + , connected_at => 1641805544652 + , expiry_interval => 0 + , keepalive => 10 + , peercert => nossl + , peername => {{127, 0, 0, 1}, 64810} + , proto_name => <<"MQTT-SN">> + , proto_ver => <<"1.2">> + , sockname => {{0, 0, 0, 0}, 1884} + , socktype => udp + }. + +%%-------------------------------------------------------------------- +%% connection module mock + +call(ConnPid, discard, _) -> + ConnPid ! discard, ok; +call(ConnPid, kick, _) -> + ConnPid ! kick, ok. diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index d3cae8596..114a5ceed 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1766,6 +1766,10 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_socket_passvice(_) -> + %% TODO: test this gateway enter the passvie event + ok. + t_clients_api(_) -> TsNow = emqx_gateway_utils:unix_ts_to_rfc3339( erlang:system_time(millisecond)),