%% Source listing metadata: 209 lines, 8.2 KiB, Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_cm_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
%% Name used by t_message/1 to address the emqx_cm process.
-define(CM, emqx_cm).

%% Channel info fixture handed to emqx_cm:register_channel/3 by the test
%% cases: a single `conninfo' entry describing a local TCP connection.
-define(ChanInfo,
        #{conninfo =>
              #{socktype => tcp,
                peername => {{127,0,0,1}, 5000},
                sockname => {{127,0,0,1}, 1883},
                peercert => nossl,
                conn_mod => emqx_connection,
                receive_maximum => 100}}).
|
|
|
|
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
|
|
|
|
%% Common Test callback: the test case list is whatever emqx_ct:all/1
%% returns for this module.
all() ->
    Suite = ?MODULE,
    emqx_ct:all(Suite).
|
|
|
|
%% Suite setup: calls emqx_ct_helpers:boot_modules(all) and
%% emqx_ct_helpers:start_apps([]), then returns the CT config untouched.
init_per_suite(Cfg) ->
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([]),
    Cfg.
|
|
|
|
%% Suite teardown: counterpart of init_per_suite/1; the CT config is ignored
%% and the result of emqx_ct_helpers:stop_apps([]) is returned.
end_per_suite(_) ->
    emqx_ct_helpers:stop_apps([]).
|
|
|
|
%%--------------------------------------------------------------------
%% TODO: Add more test cases
%%--------------------------------------------------------------------
|
|
|
|
%% Registers the calling process as the channel for a client id, checks that
%% lookup returns exactly that pid, then unregisters and checks the lookup
%% is empty again.
t_reg_unreg_channel(_Config) ->
    ClientId = <<"clientid">>,
    ok = emqx_cm:register_channel(ClientId, ?ChanInfo, []),
    ?assertEqual([self()], emqx_cm:lookup_channels(ClientId)),
    ok = emqx_cm:unregister_channel(ClientId),
    ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
|
|
|
%% Round-trips channel info through emqx_cm: the info given at registration
%% is read back, replaced with a copy carrying `proto_ver => 5', read back
%% again, and reported as `undefined' once the channel is unregistered.
t_get_set_chan_info(_Config) ->
    ClientId = <<"clientid">>,
    Initial = ?ChanInfo,
    ok = emqx_cm:register_channel(ClientId, Initial, []),
    ?assertEqual(Initial, emqx_cm:get_chan_info(ClientId)),
    Updated = maps:put(proto_ver, 5, Initial),
    true = emqx_cm:set_chan_info(ClientId, Updated),
    ?assertEqual(Updated, emqx_cm:get_chan_info(ClientId)),
    ok = emqx_cm:unregister_channel(ClientId),
    ?assertEqual(undefined, emqx_cm:get_chan_info(ClientId)).
|
|
|
|
%% Round-trips channel stats through emqx_cm: the stats given at
%% registration are read back, replaced with a longer list, read back again,
%% and reported as `undefined' once the channel is unregistered.
t_get_set_chan_stats(_Config) ->
    ClientId = <<"clientid">>,
    Initial = [{recv_oct, 10}, {send_oct, 8}],
    ok = emqx_cm:register_channel(ClientId, ?ChanInfo, Initial),
    ?assertEqual(Initial, emqx_cm:get_chan_stats(ClientId)),
    %% The updated list only needs to differ from the initial one.
    Updated = [{recv_oct, 10} | Initial],
    true = emqx_cm:set_chan_stats(ClientId, Updated),
    ?assertEqual(Updated, emqx_cm:get_chan_stats(ClientId)),
    ok = emqx_cm:unregister_channel(ClientId),
    ?assertEqual(undefined, emqx_cm:get_chan_stats(ClientId)).
|
|
|
|
%% Opens a session twice for the same client with `true' as the first
%% argument of emqx_cm:open_session/3 (presumably the clean-start flag --
%% confirm against emqx_cm) and checks that both results report
%% `present := false' and an `inflight_max' of 100.
%%
%% emqx_connection:call/2 is mocked to return `ok'. The mock is removed in
%% an `after' section so that a failing assertion cannot leave
%% emqx_connection mocked for the test cases that run afterwards.
t_open_session(_Config) ->
    ok = meck:new(emqx_connection, [passthrough, no_history]),
    try
        ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),

        ClientInfo = #{zone => external,
                       clientid => <<"clientid">>,
                       username => <<"username">>,
                       peerhost => {127,0,0,1}},
        ConnInfo = #{socktype => tcp,
                     peername => {{127,0,0,1}, 5000},
                     sockname => {{127,0,0,1}, 1883},
                     peercert => nossl,
                     conn_mod => emqx_connection,
                     receive_maximum => 100},

        {ok, #{session := Session1, present := false}}
            = emqx_cm:open_session(true, ClientInfo, ConnInfo),
        ?assertEqual(100, emqx_session:info(inflight_max, Session1)),

        {ok, #{session := Session2, present := false}}
            = emqx_cm:open_session(true, ClientInfo, ConnInfo),
        ?assertEqual(100, emqx_session:info(inflight_max, Session2)),

        %% As in the original test, the result of the unregister call is
        %% deliberately not matched.
        emqx_cm:unregister_channel(<<"clientid">>),
        ok
    after
        %% Not matched against `ok': a badmatch here would mask the
        %% exception raised by the test body.
        meck:unload(emqx_connection)
    end.
|
|
|
|
%% Spawns 2000 processes that all call emqx_cm:open_session/3 for the same
%% client id after a random delay of up to 100 ms, collects their results,
%% and then checks that exactly one entry is left in each of the
%% emqx_channel, emqx_channel_conn and emqx_channel_registry tables.
%% Finally the surviving channel process is killed and the lookup is
%% expected to become empty.
%%
%% Result collection is bounded by a per-message timeout: a worker that
%% dies without reporting would otherwise block this test until the CT
%% timetrap fires, with no hint about what was missing.
t_open_session_race_condition(_Config) ->
    ClientInfo = #{zone => external,
                   clientid => <<"clientid">>,
                   username => <<"username">>,
                   peerhost => {127,0,0,1}},
    ConnInfo = #{socktype => tcp,
                 peername => {{127,0,0,1}, 5000},
                 sockname => {{127,0,0,1}, 1883},
                 peercert => nossl,
                 conn_mod => emqx_connection,
                 receive_maximum => 100},

    Parent = self(),
    OpenASession =
        fun() ->
            timer:sleep(rand:uniform(100)),
            OpenR = emqx_cm:open_session(true, ClientInfo, ConnInfo),
            Parent ! OpenR,
            case OpenR of
                {ok, _} ->
                    %% Stay alive as the channel process until a `discard'
                    %% call arrives, then acknowledge it and terminate.
                    receive
                        {'$gen_call', From, discard} ->
                            gen_server:reply(From, ok),
                            ok
                    end;
                {error, Reason} ->
                    exit(Reason)
            end
        end,

    %% Plain spawn on purpose: workers may exit with an error reason and
    %% must not take the test process down with them.
    lists:foreach(
      fun(_) ->
          spawn(fun() ->
                    spawn(OpenASession),
                    spawn(OpenASession)
                end)
      end,
      lists:seq(1, 1000)),

    %% Counts `{ok, _}' and `{error, _}' results until `Pending' reaches 0.
    WaitingRecv =
        fun Collect(Oks, Errors, 0) ->
                {Oks, Errors};
            Collect(Oks, Errors, Pending) ->
                receive
                    {ok, _} -> Collect(Oks + 1, Errors, Pending - 1);
                    {error, _} -> Collect(Oks, Errors + 1, Pending - 1)
                after 30000 ->
                    ct:fail({open_session_results_timeout,
                             #{ok => Oks,
                               error => Errors,
                               pending => Pending}})
                end
        end,

    ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]),

    ?assertEqual(1, ets:info(emqx_channel, size)),
    ?assertEqual(1, ets:info(emqx_channel_conn, size)),
    ?assertEqual(1, ets:info(emqx_channel_registry, size)),

    [Pid] = emqx_cm:lookup_channels(<<"clientid">>),
    exit(Pid, kill),
    %% Gives emqx_cm time to notice the exit before the final lookup.
    timer:sleep(100),
    ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
|
|
|
|
%% Exercises emqx_cm:discard_session/1 with no channel registered, with a
%% registered channel whose mocked emqx_connection:call/2 returns `ok', and
%% with a registered channel whose mocked call raises `error(testing)'.
%% Every discard is expected to return `ok'.
%%
%% The mock is removed in an `after' section so that a failing match cannot
%% leave emqx_connection mocked for the test cases that run afterwards.
t_discard_session(_Config) ->
    ClientId = <<"clientid">>,
    ok = meck:new(emqx_connection, [passthrough, no_history]),
    try
        ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),

        %% Nothing registered yet.
        ok = emqx_cm:discard_session(ClientId),

        ok = emqx_cm:register_channel(ClientId, ?ChanInfo, []),
        ok = emqx_cm:discard_session(ClientId),
        ok = emqx_cm:unregister_channel(ClientId),

        ok = emqx_cm:register_channel(ClientId, ?ChanInfo, []),
        ok = emqx_cm:discard_session(ClientId),

        %% A crashing connection call must still yield `ok'.
        ok = meck:expect(emqx_connection, call,
                         fun(_, _) -> error(testing) end),
        ok = emqx_cm:discard_session(ClientId),
        ok = emqx_cm:unregister_channel(ClientId)
    after
        %% Not matched against `ok': a badmatch here would mask the
        %% exception raised by the test body.
        meck:unload(emqx_connection)
    end.
|
|
|
|
%% Checks emqx_cm:takeover_session/1: `{error, not_found}' while no channel
%% is registered, and `{ok, emqx_connection, _, test}' once a helper process
%% has registered itself and answers the `{takeover, 'begin'}' call with
%% `test'.
%%
%% The helper tells the test process when its registration has returned, so
%% the test does not rely on a fixed sleep to order the two steps. The wait
%% is bounded so that a crash in the helper fails the test instead of
%% hanging it.
t_takeover_session(_Config) ->
    ClientId = <<"clientid">>,
    {error, not_found} = emqx_cm:takeover_session(ClientId),

    Parent = self(),
    Ref = make_ref(),
    erlang:spawn(
      fun() ->
          ok = emqx_cm:register_channel(ClientId, ?ChanInfo, []),
          Parent ! {registered, Ref},
          receive
              {'$gen_call', From, {takeover, 'begin'}} ->
                  gen_server:reply(From, test),
                  ok
          end
      end),
    receive
        {registered, Ref} -> ok
    after 5000 ->
        ct:fail(channel_registration_timeout)
    end,

    {ok, emqx_connection, _, test} = emqx_cm:takeover_session(ClientId),
    emqx_cm:unregister_channel(ClientId).
|
|
|
|
%% Checks emqx_cm:kick_session/1 with emqx_connection:call/2 mocked to
%% return `test': `{error, not_found}' while nothing is registered, `test'
%% with the test process registered, and `test' again after a second
%% process has registered the same client id.
%%
%% The mock is removed in an `after' section so that a failing match cannot
%% leave emqx_connection mocked for the test cases that run afterwards.
t_kick_session(_Config) ->
    ClientId = <<"clientid">>,
    ok = meck:new(emqx_connection, [passthrough, no_history]),
    try
        ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),

        {error, not_found} = emqx_cm:kick_session(ClientId),

        ok = emqx_cm:register_channel(ClientId, ?ChanInfo, []),
        test = emqx_cm:kick_session(ClientId),

        %% Second registration for the same client id from another process,
        %% kept alive for a second so it is still there when kicked.
        erlang:spawn(
          fun() ->
              ok = emqx_cm:register_channel(ClientId, ?ChanInfo, []),
              timer:sleep(1000)
          end),
        ct:sleep(100),
        test = emqx_cm:kick_session(ClientId),

        ok = emqx_cm:unregister_channel(ClientId)
    after
        %% Not matched against `ok': a badmatch here would mask the
        %% exception raised by the test body.
        meck:unload(emqx_connection)
    end.
|
|
|
|
%% emqx_cm:all_channels/0 must return a list; its contents are not checked.
t_all_channels(_Config) ->
    Channels = emqx_cm:all_channels(),
    ?assertEqual(true, is_list(Channels)).
|
|
|
|
%% Locks the same client id twice and unlocks it twice through
%% emqx_cm_locker, expecting `{true, Nodes}' each time. `Nodes' is bound by
%% the first call, so the three later matches also assert that the second
%% element stays the same.
t_lock_clientid(_Config) ->
    ClientId = <<"clientid">>,
    {true, Nodes} = emqx_cm_locker:lock(ClientId),
    {true, Nodes} = emqx_cm_locker:lock(ClientId),
    {true, Nodes} = emqx_cm_locker:unlock(ClientId),
    {true, Nodes} = emqx_cm_locker:unlock(ClientId).
|
|
|
|
%% Sends the atom `testing' to ?CM as a plain message, as a gen_server cast
%% and as a gen_server call, in that order. The call's reply is the test's
%% return value and is not asserted on.
t_message(_Config) ->
    Msg = testing,
    erlang:send(?CM, Msg),
    gen_server:cast(?CM, Msg),
    gen_server:call(?CM, Msg).
|