test(gw): more coverage for emqx_gateway_cm

This commit is contained in:
JianBo He 2022-01-12 18:09:28 +08:00
parent 4ce11fec6e
commit 44ea853059
4 changed files with 276 additions and 22 deletions

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The Gateway Connection-Manager
%% @doc The Gateway Channel Manager
%%
%% For a certain type of protocol, this is a single instance of the manager.
%% It means that no matter how many instances of the stomp gateway are created,
@ -26,7 +26,6 @@
-include("include/emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
%% APIs
-export([start_link/1]).
@ -74,6 +73,8 @@
-type option() :: {gwname, gateway_name()}.
-type options() :: list(option()).
-define(T_KICK, 5000).
-define(T_GET_INFO, 5000).
-define(T_TAKEOVER, 15000).
-define(DEFAULT_BATCH_SIZE, 10000).
@ -94,9 +95,9 @@ procname(GwName) ->
ConnTab :: atom(),
ChannInfoTab :: atom()}.
cmtabs(GwName) ->
{ tabname(chan, GwName) %% Client Tabname; Record: {ClientId, Pid}
, tabname(conn, GwName) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod}
, tabname(info, GwName) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats}
{ tabname(chan, GwName) %% Record: {ClientId, Pid}
, tabname(conn, GwName) %% Recrod: {{ClientId, Pid}, ConnMod}
, tabname(info, GwName) %% Record: {{ClientId, Pid}, Info, Stats}
}.
tabname(chan, GwName) ->
@ -134,7 +135,6 @@ unregister_channel(GwName, ClientId) when is_binary(ClientId) ->
insert_channel_info(GwName, ClientId, Info, Stats) ->
Chan = {ClientId, self()},
true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}),
%%?tp(debug, insert_channel_info, #{client_id => ClientId}),
ok.
%% @doc Get info of a channel.
@ -207,7 +207,8 @@ set_chan_stats(GwName, ClientId, Stats) ->
emqx_types:clientid(),
pid(),
emqx_types:stats()) -> boolean().
set_chan_stats(GwName, ClientId, ChanPid, Stats) when node(ChanPid) == node() ->
set_chan_stats(GwName, ClientId, ChanPid, Stats)
when node(ChanPid) == node() ->
Chan = {ClientId, self()},
try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
catch
@ -256,7 +257,7 @@ open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun,
open_session(_Type, false = _CleanStart,
_ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) ->
%% TODO:
%% TODO: The session takeover logic will be implemented on 0.9?
{error, not_supported_now}.
%% @private
@ -305,17 +306,12 @@ do_discard_session(GwName, ClientId, Pid) ->
discard_session(GwName, ClientId, Pid)
catch
_ : noproc -> % emqx_ws_connection: call
%?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
_ : {noproc, _} -> % emqx_connection: gen_server:call
%?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
_ : {{shutdown, _}, _} ->
%?tp(debug, "session_already_shutdown", #{pid => Pid}),
ok;
_ : _Error : _St ->
%?tp(error, "failed_to_discard_session",
% #{pid => Pid, reason => Error, stacktrace=>St})
ok
end.
@ -464,7 +460,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
terminate(_Reason, #state{registry = Registry, locker = Locker}) ->
_ = gen_server:stop(Registry),
_ = ekka_locker:stop(Locker),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -64,7 +64,8 @@ tabname(Name) ->
register_channel(Name, ClientId) when is_binary(ClientId) ->
register_channel(Name, {ClientId, self()});
register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
register_channel(Name, {ClientId, ChanPid})
when is_binary(ClientId), is_pid(ChanPid) ->
mria:dirty_write(tabname(Name), record(ClientId, ChanPid)).
%% @doc Unregister a global channel.
@ -72,13 +73,15 @@ register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(Cha
unregister_channel(Name, ClientId) when is_binary(ClientId) ->
unregister_channel(Name, {ClientId, self()});
unregister_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
unregister_channel(Name, {ClientId, ChanPid})
when is_binary(ClientId), is_pid(ChanPid) ->
mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)).
%% @doc Lookup the global channels.
-spec lookup_channels(gateway_name(), binary()) -> list(pid()).
lookup_channels(Name, ClientId) ->
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)].
[ChanPid
|| #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)].
record(ClientId, ChanPid) ->
#channel{chid = ClientId, pid = ChanPid}.
@ -111,7 +114,8 @@ handle_cast(Msg, State) ->
handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) ->
Tab = tabname(Name),
global:trans({?LOCK, self()},
global:trans(
{?LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab])
end),

View File

@ -0,0 +1,248 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_cm_SUITE).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-define(GWNAME, mqttsn).
-define(CLIENTID, <<"client1">>).
-define(CONF_DEFAULT, <<"gateway {}">>).
%%--------------------------------------------------------------------
%% setups
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([]),
ok = meck:new(emqx_gateway_metrics, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_gateway_metrics, inc, fun(_, _) -> ok end),
Conf.
end_per_suite(_Conf) ->
meck:unload(emqx_gateway_metrics),
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(_TestCase, Conf) ->
process_flag(trap_exit, true),
{ok, CMPid} = emqx_gateway_cm:start_link([{gwname, ?GWNAME}]),
[{cm, CMPid} | Conf].
end_per_testcase(_TestCase, Conf) ->
CMPid = proplists:get_value(cm, Conf),
gen_server:stop(CMPid),
process_flag(trap_exit, false),
Conf.
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_open_session(_) ->
{error, not_supported_now} = emqx_gateway_cm:open_session(
?GWNAME, false, clientinfo(), conninfo(),
fun(_, _) -> #{} end),
{ok, SessionRes} = emqx_gateway_cm:open_session(
?GWNAME, true, clientinfo(), conninfo(),
fun(_, _) -> #{no => 1} end),
?assertEqual(#{present => false,
session => #{no => 1}}, SessionRes),
%% assert1. check channel infos in ets table
Chann = {?CLIENTID, self()},
?assertEqual(
[Chann],
ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
?assertEqual(
[{Chann, ?MODULE}],
ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
%% assert2. discard the presented session
{ok, SessionRes2} = emqx_gateway_cm:open_session(
?GWNAME, true, clientinfo(), conninfo(),
fun(_, _) -> #{no => 2} end),
?assertEqual(#{present => false,
session => #{no => 2}}, SessionRes2),
emqx_gateway_cm:insert_channel_info(
?GWNAME, ?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()}, []),
?assertEqual(
1,
ets:info(emqx_gateway_cm:tabname(info, ?GWNAME), size)),
receive
discard ->
emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
after 100 ->
?assert(false, "waiting discard msg timeout")
end,
%% assert3. no channel infos in ets table
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))).
t_get_set_chan_info_stats(_) ->
{ok, SessionRes} = emqx_gateway_cm:open_session(
?GWNAME, true, clientinfo(), conninfo(),
fun(_, _) -> #{no => 1} end),
?assertEqual(#{present => false,
session => #{no => 1}}, SessionRes),
emqx_gateway_cm:insert_channel_info(
?GWNAME, ?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()}, []),
%% Info: get/set
NInfo = #{newinfo => true},
emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
?assertEqual(
NInfo,
emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID)),
?assertEqual(
NInfo,
emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID, self())),
%% Stats: get/set
NStats = [{newstats, true}],
emqx_gateway_cm:set_chan_stats(?GWNAME, ?CLIENTID, NStats),
?assertEqual(
NStats,
emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID)),
?assertEqual(
NStats,
emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID, self())),
emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID).
t_handle_process_down(Conf) ->
Pid = proplists:get_value(cm, Conf),
{ok, SessionRes} = emqx_gateway_cm:open_session(
?GWNAME, true, clientinfo(), conninfo(),
fun(_, _) -> #{no => 1} end),
?assertEqual(#{present => false,
session => #{no => 1}}, SessionRes),
emqx_gateway_cm:insert_channel_info(
?GWNAME, ?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()}, []),
_ = Pid ! {'DOWN', mref, process, self(), normal},
timer:sleep(200), %% wait the asycn clear task
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))).
t_kick_session(_) ->
%% session1
{ok, _} = emqx_gateway_cm:open_session(
?GWNAME, true, clientinfo(), conninfo(),
fun(_, _) -> #{no => 1} end),
emqx_gateway_cm:insert_channel_info(
?GWNAME, ?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()}, []),
%% meck `lookup_channels`
Self = self(),
ok = meck:new(emqx_gateway_cm_registry,
[passthrough, no_history, no_link]),
ok = meck:expect(emqx_gateway_cm_registry, lookup_channels,
fun(_, ?CLIENTID) -> [Self, Self] end),
ok = emqx_gateway_cm:kick_session(?GWNAME, ?CLIENTID),
receive discard -> ok
after 100 -> ?assert(false, "waiting discard msg timeout")
end,
receive
kick ->
emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
after
100 ->
?assert(false, "waiting kick msg timeout")
end,
meck:unload(emqx_gateway_cm_registry).
t_unexpected_handle(Conf) ->
Pid = proplists:get_value(cm, Conf),
_ = Pid ! unexpected_info,
ok = gen_server:call(Pid, unexpected_call),
ok = gen_server:cast(Pid, unexpected_cast).
%%--------------------------------------------------------------------
%% helpers
clientinfo() ->
#{ clientid => ?CLIENTID
, is_bridge => false
, is_superuser => false
, listener => 'mqttsn:udp:default'
, mountpoint => <<"mqttsn/">>
, peerhost => {127, 0, 0, 1}
, protocol => 'mqtt-sn'
, sockport => 1884
, username => undefined
, zone => default
}.
conninfo() ->
#{ clean_start => true
, clientid => ?CLIENTID
, conn_mod => ?MODULE
, connected_at => 1641805544652
, expiry_interval => 0
, keepalive => 10
, peercert => nossl
, peername => {{127, 0, 0, 1}, 64810}
, proto_name => <<"MQTT-SN">>
, proto_ver => <<"1.2">>
, sockname => {{0, 0, 0, 0}, 1884}
, socktype => udp
}.
%%--------------------------------------------------------------------
%% connection module mock
call(ConnPid, discard, _) ->
ConnPid ! discard, ok;
call(ConnPid, kick, _) ->
ConnPid ! kick, ok.

View File

@ -1766,6 +1766,10 @@ t_broadcast_test1(_) ->
timer:sleep(600),
gen_udp:close(Socket).
t_socket_passvice(_) ->
%% TODO: test this gateway enter the passvie event
ok.
t_clients_api(_) ->
TsNow = emqx_gateway_utils:unix_ts_to_rfc3339(
erlang:system_time(millisecond)),