emqx/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl

334 lines
8.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_cm_SUITE).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-define(GWNAME, mqttsn).
-define(CLIENTID, <<"client1">>).
-define(CONF_DEFAULT, <<"gateway {}">>).
%%--------------------------------------------------------------------
%% setups
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([]),
ok = meck:new(emqx_gateway_metrics, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_gateway_metrics, inc, fun(_, _) -> ok end),
Conf.
end_per_suite(_Conf) ->
meck:unload(emqx_gateway_metrics),
emqx_common_test_helpers:stop_apps([]),
emqx_config:delete_override_conf_files().
init_per_testcase(_TestCase, Conf) ->
process_flag(trap_exit, true),
{ok, CMPid} = emqx_gateway_cm:start_link([{gwname, ?GWNAME}]),
[{cm, CMPid} | Conf].
end_per_testcase(_TestCase, Conf) ->
CMPid = proplists:get_value(cm, Conf),
gen_server:stop(CMPid),
process_flag(trap_exit, false),
Conf.
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_open_session(_) ->
{ok, #{
present := false,
session := #{}
}} = emqx_gateway_cm:open_session(
?GWNAME,
false,
clientinfo(),
conninfo(),
fun(_, _) -> #{} end
),
{ok, SessionRes} = emqx_gateway_cm:open_session(
?GWNAME,
true,
clientinfo(),
conninfo(),
fun(_, _) -> #{no => 1} end
),
?assertEqual(
#{
present => false,
session => #{no => 1}
},
SessionRes
),
%% assert1. check channel infos in ets table
Chann = {?CLIENTID, self()},
?assertEqual(
[Chann],
ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))
),
?assertEqual(
[{Chann, ?MODULE}],
ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))
),
%% assert2. discard the presented session
{ok, SessionRes2} = emqx_gateway_cm:open_session(
?GWNAME,
true,
clientinfo(),
conninfo(),
fun(_, _) -> #{no => 2} end
),
?assertEqual(
#{
present => false,
session => #{no => 2}
},
SessionRes2
),
emqx_gateway_cm:insert_channel_info(
?GWNAME,
?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()},
[]
),
?assertEqual(
1,
ets:info(emqx_gateway_cm:tabname(info, ?GWNAME), size)
),
receive
discard ->
emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
after 100 ->
?assert(false, "waiting discard msg timeout")
end,
%% assert3. no channel infos in ets table
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))
),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))
),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))
).
t_get_set_chan_info_stats(_) ->
{ok, SessionRes} = emqx_gateway_cm:open_session(
?GWNAME,
true,
clientinfo(),
conninfo(),
fun(_, _) -> #{no => 1} end
),
?assertEqual(
#{
present => false,
session => #{no => 1}
},
SessionRes
),
emqx_gateway_cm:insert_channel_info(
?GWNAME,
?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()},
[]
),
%% Info: get/set
NInfo = #{newinfo => true, node => node()},
emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
?assertEqual(
NInfo,
emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID)
),
?assertEqual(
NInfo,
emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID, self())
),
%% Stats: get/set
NStats = [{newstats, true}],
emqx_gateway_cm:set_chan_stats(?GWNAME, ?CLIENTID, NStats),
?assertEqual(
NStats,
emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID)
),
?assertEqual(
NStats,
emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID, self())
),
emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID).
t_handle_process_down(Conf) ->
Pid = proplists:get_value(cm, Conf),
{ok, SessionRes} = emqx_gateway_cm:open_session(
?GWNAME,
true,
clientinfo(),
conninfo(),
fun(_, _) -> #{no => 1} end
),
?assertEqual(
#{
present => false,
session => #{no => 1}
},
SessionRes
),
emqx_gateway_cm:insert_channel_info(
?GWNAME,
?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()},
[]
),
_ = Pid ! {'DOWN', mref, process, self(), normal},
%% wait the async clear task
timer:sleep(200),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))
),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))
),
?assertEqual(
[],
ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))
).
t_kick_session(_) ->
%% session1
{ok, _} = emqx_gateway_cm:open_session(
?GWNAME,
true,
clientinfo(),
conninfo(),
fun(_, _) -> #{no => 1} end
),
emqx_gateway_cm:insert_channel_info(
?GWNAME,
?CLIENTID,
#{clientinfo => clientinfo(), conninfo => conninfo()},
[]
),
%% meck `lookup_channels`
Self = self(),
ok = meck:new(
emqx_gateway_cm_registry,
[passthrough, no_history, no_link]
),
ok = meck:expect(
emqx_gateway_cm_registry,
lookup_channels,
fun(_, ?CLIENTID) -> [Self, Self] end
),
ok = emqx_gateway_cm:kick_session(?GWNAME, ?CLIENTID),
receive
kick -> ok
after 100 -> ?assert(false, "waiting discard msg timeout")
end,
receive
kick ->
emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
after 100 ->
?assert(false, "waiting kick msg timeout")
end,
?assertMatch({error, not_found}, emqx_gateway_http:kickout_client(?GWNAME, <<"i-dont-exist">>)),
meck:unload(emqx_gateway_cm_registry).
t_unexpected_handle(Conf) ->
Pid = proplists:get_value(cm, Conf),
_ = Pid ! unexpected_info,
ok = gen_server:call(Pid, unexpected_call),
ok = gen_server:cast(Pid, unexpected_cast).
%%--------------------------------------------------------------------
%% helpers
clientinfo() ->
#{
clientid => ?CLIENTID,
is_bridge => false,
is_superuser => false,
listener => 'mqttsn:udp:default',
mountpoint => <<"mqttsn/">>,
peerhost => {127, 0, 0, 1},
protocol => 'mqtt-sn',
sockport => 1884,
username => undefined,
zone => default
}.
conninfo() ->
#{
clean_start => true,
clientid => ?CLIENTID,
conn_mod => ?MODULE,
connected_at => 1641805544652,
expiry_interval => 0,
keepalive => 10,
peercert => nossl,
peername => {{127, 0, 0, 1}, 64810},
proto_name => <<"MQTT-SN">>,
proto_ver => <<"1.2">>,
sockname => {{0, 0, 0, 0}, 1884},
socktype => udp
}.
%%--------------------------------------------------------------------
%% connection module mock
call(ConnPid, discard, _) ->
ConnPid ! discard,
ok;
call(ConnPid, kick, _) ->
ConnPid ! kick,
ok.