feat(queue): add new test scenarios

This commit is contained in:
Ilya Averyanov 2024-06-27 17:18:59 +03:00
parent bceb5d43ed
commit 8f0d807c00
1 changed files with 87 additions and 2 deletions

View File

@ -67,6 +67,91 @@ t_lease_initial(_Config) ->
ok = emqtt:disconnect(ConnShared),
ok = emqtt:disconnect(ConnPub).
t_two_clients(_Config) ->
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr4/topic4/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr4/topic4/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/2">>, <<"hello2">>, 1),
ct:sleep(2_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/2">>, <<"hello4">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_client_loss(_Config) ->
process_flag(trap_exit, true),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr5/topic5/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr5/topic5/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/2">>, <<"hello2">>, 1),
exit(ConnShared1, kill),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/2">>, <<"hello4">>, 1),
?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_stream_revoke(_Config) ->
process_flag(trap_exit, true),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr6/topic6/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
?assertWaitEvent(
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr6/topic6/#">>, 1),
#{
?snk_kind := shared_sub_group_sm_leader_update_streams,
stream_progresses := [_ | _],
id := <<"client_shared2">>
},
5_000
),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/2">>, <<"hello4">>, 1),
?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
@ -127,7 +212,7 @@ t_renew_lease_timeout(_Config) ->
emqtt_connect_sub(ClientId) ->
{ok, C} = emqtt:start_link([
{client_id, ClientId},
{clientid, ClientId},
{clean_start, true},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7_200}}
@ -137,7 +222,7 @@ emqtt_connect_sub(ClientId) ->
emqtt_connect_pub(ClientId) ->
{ok, C} = emqtt:start_link([
{client_id, ClientId},
{clientid, ClientId},
{clean_start, true},
{proto_ver, v5}
]),