fix(session): check if session is persistent
This commit is contained in:
parent
a054049d12
commit
7805cc8c9b
|
@ -14,6 +14,8 @@
|
||||||
-define(DS_SHARD, <<"local">>).
|
-define(DS_SHARD, <<"local">>).
|
||||||
-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
|
-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
|
||||||
|
|
||||||
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% CT boilerplate
|
%% CT boilerplate
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -56,9 +58,11 @@ end_per_testcase(TestCase, Config) when
|
||||||
TestCase =:= t_session_unsubscription_idempotency
|
TestCase =:= t_session_unsubscription_idempotency
|
||||||
->
|
->
|
||||||
Nodes = ?config(nodes, Config),
|
Nodes = ?config(nodes, Config),
|
||||||
|
emqx_common_test_helpers:call_janitor(60_000),
|
||||||
ok = emqx_cth_cluster:stop(Nodes),
|
ok = emqx_cth_cluster:stop(Nodes),
|
||||||
ok;
|
ok;
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
emqx_common_test_helpers:call_janitor(60_000),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -124,10 +128,56 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
|
||||||
false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port)
|
false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
start_client(Opts0 = #{}) ->
|
||||||
|
Defaults = #{
|
||||||
|
proto_ver => v5,
|
||||||
|
properties => #{'Session-Expiry-Interval' => 300}
|
||||||
|
},
|
||||||
|
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
|
||||||
|
{ok, Client} = emqtt:start_link(Opts),
|
||||||
|
on_exit(fun() -> catch emqtt:stop(Client) end),
|
||||||
|
Client.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_non_persistent_session_subscription(_Config) ->
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
SubTopicFilter = <<"t/#">>,
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
ct:pal("starting"),
|
||||||
|
Client = start_client(#{
|
||||||
|
clientid => ClientId,
|
||||||
|
properties => #{'Session-Expiry-Interval' => 0}
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
ct:pal("subscribing"),
|
||||||
|
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
|
||||||
|
IteratorRefs = get_all_iterator_refs(node()),
|
||||||
|
IteratorIds = get_all_iterator_ids(node()),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client),
|
||||||
|
|
||||||
|
#{
|
||||||
|
iterator_refs => IteratorRefs,
|
||||||
|
iterator_ids => IteratorIds
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
fun(Res, Trace) ->
|
||||||
|
ct:pal("trace:\n ~p", [Trace]),
|
||||||
|
#{
|
||||||
|
iterator_refs := IteratorRefs,
|
||||||
|
iterator_ids := IteratorIds
|
||||||
|
} = Res,
|
||||||
|
?assertEqual([], IteratorRefs),
|
||||||
|
?assertEqual({ok, []}, IteratorIds),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_session_subscription_idempotency(Config) ->
|
t_session_subscription_idempotency(Config) ->
|
||||||
[Node1Spec | _] = ?config(node_specs, Config),
|
[Node1Spec | _] = ?config(node_specs, Config),
|
||||||
[Node1] = ?config(nodes, Config),
|
[Node1] = ?config(nodes, Config),
|
||||||
|
@ -177,11 +227,7 @@ t_session_subscription_idempotency(Config) ->
|
||||||
end),
|
end),
|
||||||
|
|
||||||
ct:pal("starting 1"),
|
ct:pal("starting 1"),
|
||||||
{ok, Client0} = emqtt:start_link([
|
Client0 = start_client(#{port => Port, clientid => ClientId}),
|
||||||
{port, Port},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{proto_ver, v5}
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:connect(Client0),
|
{ok, _} = emqtt:connect(Client0),
|
||||||
ct:pal("subscribing 1"),
|
ct:pal("subscribing 1"),
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
|
@ -195,11 +241,7 @@ t_session_subscription_idempotency(Config) ->
|
||||||
|
|
||||||
{ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
|
{ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
|
||||||
ct:pal("starting 2"),
|
ct:pal("starting 2"),
|
||||||
{ok, Client1} = emqtt:start_link([
|
Client1 = start_client(#{port => Port, clientid => ClientId}),
|
||||||
{port, Port},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{proto_ver, v5}
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
ct:pal("subscribing 2"),
|
ct:pal("subscribing 2"),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
|
||||||
|
@ -273,11 +315,7 @@ t_session_unsubscription_idempotency(Config) ->
|
||||||
end),
|
end),
|
||||||
|
|
||||||
ct:pal("starting 1"),
|
ct:pal("starting 1"),
|
||||||
{ok, Client0} = emqtt:start_link([
|
Client0 = start_client(#{port => Port, clientid => ClientId}),
|
||||||
{port, Port},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{proto_ver, v5}
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:connect(Client0),
|
{ok, _} = emqtt:connect(Client0),
|
||||||
ct:pal("subscribing 1"),
|
ct:pal("subscribing 1"),
|
||||||
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
|
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
|
||||||
|
@ -293,11 +331,7 @@ t_session_unsubscription_idempotency(Config) ->
|
||||||
|
|
||||||
{ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
|
{ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
|
||||||
ct:pal("starting 2"),
|
ct:pal("starting 2"),
|
||||||
{ok, Client1} = emqtt:start_link([
|
Client1 = start_client(#{port => Port, clientid => ClientId}),
|
||||||
{port, Port},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{proto_ver, v5}
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
ct:pal("subscribing 2"),
|
ct:pal("subscribing 2"),
|
||||||
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
|
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
|
||||||
|
|
|
@ -317,6 +317,8 @@ is_subscriptions_full(#session{
|
||||||
|
|
||||||
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
|
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
|
||||||
session().
|
session().
|
||||||
|
add_persistent_subscription(_TopicFilterBin, _ClientId, Session = #session{is_persistent = false}) ->
|
||||||
|
Session;
|
||||||
add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
|
add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
|
||||||
_ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
|
_ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
|
||||||
Session.
|
Session.
|
||||||
|
|
|
@ -121,12 +121,11 @@ t_session_subscription_iterators(Config) ->
|
||||||
lists:seq(1, 4)
|
lists:seq(1, 4)
|
||||||
),
|
),
|
||||||
ct:pal("starting"),
|
ct:pal("starting"),
|
||||||
{ok, Client} = emqtt:start_link([
|
Client = connect(#{
|
||||||
{port, Port},
|
clientid => ClientId,
|
||||||
{clientid, ClientId},
|
port => Port,
|
||||||
{proto_ver, v5}
|
properties => #{'Session-Expiry-Interval' => 300}
|
||||||
]),
|
}),
|
||||||
{ok, _} = emqtt:connect(Client),
|
|
||||||
ct:pal("publishing 1"),
|
ct:pal("publishing 1"),
|
||||||
Message1 = emqx_message:make(Topic, Payload1),
|
Message1 = emqx_message:make(Topic, Payload1),
|
||||||
publish(Node1, Message1),
|
publish(Node1, Message1),
|
||||||
|
@ -195,15 +194,18 @@ t_session_subscription_iterators(Config) ->
|
||||||
%%
|
%%
|
||||||
|
|
||||||
connect(ClientId, CleanStart, EI) ->
|
connect(ClientId, CleanStart, EI) ->
|
||||||
{ok, Client} = emqtt:start_link([
|
connect(#{
|
||||||
{clientid, ClientId},
|
clientid => ClientId,
|
||||||
{proto_ver, v5},
|
clean_start => CleanStart,
|
||||||
{clean_start, CleanStart},
|
properties => maps:from_list(
|
||||||
{properties,
|
[{'Session-Expiry-Interval', EI} || is_integer(EI)]
|
||||||
maps:from_list(
|
)
|
||||||
[{'Session-Expiry-Interval', EI} || is_integer(EI)]
|
}).
|
||||||
)}
|
|
||||||
]),
|
connect(Opts0 = #{}) ->
|
||||||
|
Defaults = #{proto_ver => v5},
|
||||||
|
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
|
||||||
|
{ok, Client} = emqtt:start_link(Opts),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
Client.
|
Client.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue