diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index eb2c3564f..3f6950b3b 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -109,9 +109,13 @@ t_open_session(_) -> emqx_cm:unregister_channel(<<"clientid">>), ok = meck:unload(emqx_connection). +rand_client_id() -> + list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())). + t_open_session_race_condition(_) -> + ClientId = rand_client_id(), ClientInfo = #{zone => external, - clientid => <<"clientid">>, + clientid => ClientId, username => <<"username">>, peerhost => {127,0,0,1}}, ConnInfo = #{socktype => tcp, @@ -136,11 +140,12 @@ t_open_session_race_condition(_) -> exit(Reason) end end, + N = 1000, [spawn( fun() -> spawn(OpenASession), spawn(OpenASession) - end) || _ <- lists:seq(1, 1000)], + end) || _ <- lists:seq(1, N)], WaitingRecv = fun _Wr(N1, N2, 0) -> {N1, N2}; @@ -151,45 +156,49 @@ t_open_session_race_condition(_) -> end end, - ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]), + {Succeeded, Failed} = WaitingRecv(0, 0, 2 * N), + ct:pal("Race condition status: succeeded=~p failed=~p~n", [Succeeded, Failed]), - ?assertEqual(1, ets:info(emqx_channel, size)), - ?assertEqual(1, ets:info(emqx_channel_conn, size)), - ?assertEqual(1, ets:info(emqx_channel_registry, size)), + ?assertMatch([_], ets:lookup(emqx_channel, ClientId)), + [Pid] = emqx_cm:lookup_channels(ClientId), + ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Pid})), + ?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)), - [Pid] = emqx_cm:lookup_channels(<<"clientid">>), - exit(Pid, kill), timer:sleep(100), - ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)). + exit(Pid, kill), + timer:sleep(100), %% TODO deterministic + ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_discard_session(_) -> + ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), + ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), ok = meck:new(emqx_connection, [passthrough, no_history]), ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:discard_session(<<"clientid">>), + ok = emqx_cm:discard_session(ClientId), + ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), + ok = emqx_cm:discard_session(ClientId), + ok = emqx_cm:unregister_channel(ClientId), + ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), + ok = emqx_cm:discard_session(ClientId), ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = emqx_cm:discard_session(ClientId), + ok = emqx_cm:unregister_channel(ClientId), ok = meck:unload(emqx_connection). t_discard_session_race(_) -> + ClientId = rand_client_id(), ?check_trace( begin #{conninfo := ConnInfo0} = ?ChanInfo, ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end), - ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo), Pid ! stop, receive {'DOWN', Ref, process, Pid, normal} -> ok end, - ok = emqx_cm:discard_session(<<"clientid">>), + ok = emqx_cm:discard_session(ClientId), {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000) end, fun(_, _) -> @@ -235,10 +244,10 @@ t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())). t_lock_clientid(_) -> - {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), - {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), - {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>), - {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>). + {true, Nodes} = emqx_cm_locker:lock(<<"clientid">>), + ?assertEqual({true, Nodes}, emqx_cm_locker:lock(<<"clientid">>)), + ?assertEqual({true, Nodes}, emqx_cm_locker:unlock(<<"clientid">>)), + ?assertEqual({true, Nodes}, emqx_cm_locker:unlock(<<"clientid">>)). t_message(_) -> ?CM ! testing,