diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 6eafe0e4a..8bb460967 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -67,6 +67,91 @@ t_lease_initial(_Config) -> ok = emqtt:disconnect(ConnShared), ok = emqtt:disconnect(ConnPub). +t_two_clients(_Config) -> + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr4/topic4/#">>, 1), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr4/topic4/#">>, 1), + + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + {ok, _} = emqtt:publish(ConnPub, <<"topic4/1">>, <<"hello1">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic4/2">>, <<"hello2">>, 1), + ct:sleep(2_000), + {ok, _} = emqtt:publish(ConnPub, <<"topic4/1">>, <<"hello3">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic4/2">>, <<"hello4">>, 1), + + ?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000), + ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000), + ?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000), + ?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000), + + ok = emqtt:disconnect(ConnShared1), + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnPub). + +t_client_loss(_Config) -> + process_flag(trap_exit, true), + + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr5/topic5/#">>, 1), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr5/topic5/#">>, 1), + + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + {ok, _} = emqtt:publish(ConnPub, <<"topic5/1">>, <<"hello1">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic5/2">>, <<"hello2">>, 1), + + exit(ConnShared1, kill), + + {ok, _} = emqtt:publish(ConnPub, <<"topic5/1">>, <<"hello3">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic5/2">>, <<"hello4">>, 1), + + ?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000), + ?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000), + + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnPub). + +t_stream_revoke(_Config) -> + process_flag(trap_exit, true), + + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr6/topic6/#">>, 1), + + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + {ok, _} = emqtt:publish(ConnPub, <<"topic6/1">>, <<"hello1">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic6/2">>, <<"hello2">>, 1), + + ?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000), + ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + + ?assertWaitEvent( + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr6/topic6/#">>, 1), + #{ + ?snk_kind := shared_sub_group_sm_leader_update_streams, + stream_progresses := [_ | _], + id := <<"client_shared2">> + }, + 5_000 + ), + + {ok, _} = emqtt:publish(ConnPub, <<"topic6/1">>, <<"hello3">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic6/2">>, <<"hello4">>, 1), + + ?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000), + ?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000), + + ok = emqtt:disconnect(ConnShared1), + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnPub). + t_lease_reconnect(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>), @@ -127,7 +212,7 @@ t_renew_lease_timeout(_Config) -> emqtt_connect_sub(ClientId) -> {ok, C} = emqtt:start_link([ - {client_id, ClientId}, + {clientid, ClientId}, {clean_start, true}, {proto_ver, v5}, {properties, #{'Session-Expiry-Interval' => 7_200}} @@ -137,7 +222,7 @@ emqtt_connect_sub(ClientId) -> emqtt_connect_pub(ClientId) -> {ok, C} = emqtt:start_link([ - {client_id, ClientId}, + {clientid, ClientId}, {clean_start, true}, {proto_ver, v5} ]),