diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index d721df5ed..efcd4e946 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -14,6 +14,8 @@ -define(DS_SHARD, <<"local">>). -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). +-import(emqx_common_test_helpers, [on_exit/1]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -56,9 +58,11 @@ end_per_testcase(TestCase, Config) when TestCase =:= t_session_unsubscription_idempotency -> Nodes = ?config(nodes, Config), + emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), ok; end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(60_000), ok. %%------------------------------------------------------------------------------ @@ -124,10 +128,56 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port) ). +start_client(Opts0 = #{}) -> + Defaults = #{ + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 300} + }, + Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), + {ok, Client} = emqtt:start_link(Opts), + on_exit(fun() -> catch emqtt:stop(Client) end), + Client. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ +t_non_persistent_session_subscription(_Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + SubTopicFilter = <<"t/#">>, + ?check_trace( + begin + ct:pal("starting"), + Client = start_client(#{ + clientid => ClientId, + properties => #{'Session-Expiry-Interval' => 0} + }), + {ok, _} = emqtt:connect(Client), + ct:pal("subscribing"), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), + IteratorRefs = get_all_iterator_refs(node()), + IteratorIds = get_all_iterator_ids(node()), + + ok = emqtt:stop(Client), + + #{ + iterator_refs => IteratorRefs, + iterator_ids => IteratorIds + } + end, + fun(Res, Trace) -> + ct:pal("trace:\n ~p", [Trace]), + #{ + iterator_refs := IteratorRefs, + iterator_ids := IteratorIds + } = Res, + ?assertEqual([], IteratorRefs), + ?assertEqual({ok, []}, IteratorIds), + ok + end + ), + ok. + t_session_subscription_idempotency(Config) -> [Node1Spec | _] = ?config(node_specs, Config), [Node1] = ?config(nodes, Config), @@ -177,11 +227,7 @@ t_session_subscription_idempotency(Config) -> end), ct:pal("starting 1"), - {ok, Client0} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + Client0 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client0), ct:pal("subscribing 1"), process_flag(trap_exit, true), @@ -195,11 +241,7 @@ t_session_subscription_idempotency(Config) -> {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), ct:pal("starting 2"), - {ok, Client1} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + Client1 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client1), ct:pal("subscribing 2"), {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), @@ -273,11 +315,7 @@ t_session_unsubscription_idempotency(Config) -> end), ct:pal("starting 1"), - {ok, Client0} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + Client0 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client0), ct:pal("subscribing 1"), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2), @@ -293,11 +331,7 @@ t_session_unsubscription_idempotency(Config) -> {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), ct:pal("starting 2"), - {ok, Client1} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + Client1 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client1), ct:pal("subscribing 2"), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index ebc1a00a3..ce71ade91 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -317,6 +317,8 @@ is_subscriptions_full(#session{ -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> session(). +add_persistent_subscription(_TopicFilterBin, _ClientId, Session = #session{is_persistent = false}) -> + Session; add_persistent_subscription(TopicFilterBin, ClientId, Session) -> _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), Session. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index c4f7ef73b..8dbd34cd9 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -121,12 +121,11 @@ t_session_subscription_iterators(Config) -> lists:seq(1, 4) ), ct:pal("starting"), - {ok, Client} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), - {ok, _} = emqtt:connect(Client), + Client = connect(#{ + clientid => ClientId, + port => Port, + properties => #{'Session-Expiry-Interval' => 300} + }), ct:pal("publishing 1"), Message1 = emqx_message:make(Topic, Payload1), publish(Node1, Message1), @@ -195,15 +194,18 @@ t_session_subscription_iterators(Config) -> %% connect(ClientId, CleanStart, EI) -> - {ok, Client} = emqtt:start_link([ - {clientid, ClientId}, - {proto_ver, v5}, - {clean_start, CleanStart}, - {properties, - maps:from_list( - [{'Session-Expiry-Interval', EI} || is_integer(EI)] - )} - ]), + connect(#{ + clientid => ClientId, + clean_start => CleanStart, + properties => maps:from_list( + [{'Session-Expiry-Interval', EI} || is_integer(EI)] + ) + }). + +connect(Opts0 = #{}) -> + Defaults = #{proto_ver => v5}, + Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), + {ok, Client} = emqtt:start_link(Opts), {ok, _} = emqtt:connect(Client), Client.