1062 lines
44 KiB
Erlang
1062 lines
44 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_persistent_session_SUITE).
|
|
|
|
-include_lib("stdlib/include/assert.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
-include_lib("../include/emqx.hrl").
|
|
-include("../src/emqx_persistent_session.hrl").
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% SUITE boilerplate
|
|
%%--------------------------------------------------------------------
|
|
|
|
all() ->
|
|
[ {group, persistent_store_enabled}
|
|
, {group, persistent_store_disabled}
|
|
].
|
|
|
|
%% A persistent session can be resumed in two ways:
|
|
%% 1. The old connection process is still alive, and the session is taken
|
|
%% over by the new connection.
|
|
%% 2. The old session process has died (e.g., because of node down).
|
|
%% The new process resumes the session from the stored state, and finds
|
|
%% any subscribed messages from the persistent message store.
|
|
%%
|
|
%% We want to test both ways, both with the db backend enabled and disabled.
|
|
%%
|
|
%% In addition, we test both tcp and quic connections.
|
|
|
|
groups() ->
|
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
|
SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
|
|
GCTests = [TC || TC <- TCs, is_gc_tc(TC)],
|
|
OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
|
|
[ {persistent_store_enabled, [ {group, no_kill_connection_process}
|
|
, {group, kill_connection_process}
|
|
, {group, snabbkaffe}
|
|
, {group, gc_tests}
|
|
]}
|
|
, {persistent_store_disabled, [ {group, no_kill_connection_process}
|
|
]}
|
|
, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
|
|
, { kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
|
|
, {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}]}
|
|
, {tcp, [], OtherTCs}
|
|
, {quic, [], OtherTCs}
|
|
, {ws, [], OtherTCs}
|
|
, {tcp_snabbkaffe, [], SnabbkaffeTCs}
|
|
, {quic_snabbkaffe, [], SnabbkaffeTCs}
|
|
, {ws_snabbkaffe, [], SnabbkaffeTCs}
|
|
, {gc_tests, [], GCTests}
|
|
].
|
|
|
|
is_snabbkaffe_tc(TC) ->
|
|
re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch.
|
|
|
|
is_gc_tc(TC) ->
|
|
re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
|
|
|
|
init_per_group(persistent_store_enabled, Config) ->
|
|
%% Start Apps
|
|
emqx_common_test_helpers:boot_modules(all),
|
|
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
|
|
meck:expect(emqx_config, get, fun(?is_enabled_key) -> true;
|
|
(Other) -> meck:passthrough([Other])
|
|
end),
|
|
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
|
|
?assertEqual(true, emqx_persistent_session:is_store_enabled()),
|
|
[{persistent_store_enabled, true}|Config];
|
|
init_per_group(persistent_store_disabled, Config) ->
|
|
%% Start Apps
|
|
emqx_common_test_helpers:boot_modules(all),
|
|
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
|
|
meck:expect(emqx_config, get, fun(?is_enabled_key) -> false;
|
|
(Other) -> meck:passthrough([Other])
|
|
end),
|
|
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
|
|
?assertEqual(false, emqx_persistent_session:is_store_enabled()),
|
|
[{persistent_store_enabled, false}|Config];
|
|
init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe ->
|
|
[{ssl,false},
|
|
{host,"localhost"},
|
|
{enable_websocket,true},
|
|
{port, 8083},
|
|
{conn_fun, ws_connect}| Config];
|
|
init_per_group(Group, Config) when Group == tcp; Group == tcp_snabbkaffe ->
|
|
[ {port, 1883}, {conn_fun, connect}| Config];
|
|
init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe ->
|
|
[ {port, 14567}, {conn_fun, quic_connect} | Config];
|
|
init_per_group(no_kill_connection_process, Config) ->
|
|
[ {kill_connection_process, false} | Config];
|
|
init_per_group(kill_connection_process, Config) ->
|
|
[ {kill_connection_process, true} | Config];
|
|
init_per_group(snabbkaffe, Config) ->
|
|
[ {kill_connection_process, true} | Config];
|
|
init_per_group(gc_tests, Config) ->
|
|
%% We need to make sure the system does not interfere with this test group.
|
|
emqx_common_test_helpers:stop_apps([]),
|
|
SessionMsgEts = gc_tests_session_store,
|
|
MsgEts = gc_tests_msg_store,
|
|
Pid = spawn(fun() ->
|
|
ets:new(SessionMsgEts, [named_table, public, ordered_set]),
|
|
ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]),
|
|
receive stop -> ok end
|
|
end),
|
|
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
|
|
meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
|
|
(?MSG_TAB) -> ets:first(MsgEts);
|
|
(X) -> meck:passthrough(X)
|
|
end),
|
|
meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
|
|
(?MSG_TAB, X) -> ets:next(MsgEts, X);
|
|
(Tab, X) -> meck:passthrough([Tab, X])
|
|
end),
|
|
meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X);
|
|
(Tab, X) -> meck:passthrough([Tab, X])
|
|
end),
|
|
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
|
|
|
|
init_per_suite(Config) ->
|
|
Config.
|
|
|
|
set_special_confs(emqx) ->
|
|
Path = emqx_common_test_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"),
|
|
application:set_env(emqx, plugins_loaded_file, Path);
|
|
set_special_confs(_) ->
|
|
ok.
|
|
|
|
end_per_suite(_Config) ->
|
|
ok.
|
|
|
|
end_per_group(gc_tests, Config) ->
|
|
meck:unload(mnesia),
|
|
?config(store_owner, Config) ! stop,
|
|
ok;
|
|
end_per_group(persistent_store_enabled, _Config) ->
|
|
meck:unload(emqx_config),
|
|
emqx_common_test_helpers:stop_apps([]);
|
|
end_per_group(persistent_store_disabled, _Config) ->
|
|
meck:unload(emqx_config),
|
|
emqx_common_test_helpers:stop_apps([]);
|
|
end_per_group(_Group, _Config) ->
|
|
ok.
|
|
|
|
init_per_testcase(TestCase, Config) ->
|
|
Config1 = preconfig_per_testcase(TestCase, Config),
|
|
case is_gc_tc(TestCase) of
|
|
true ->
|
|
ets:delete_all_objects(?config(msg_store, Config)),
|
|
ets:delete_all_objects(?config(session_msg_store, Config));
|
|
false ->
|
|
skip
|
|
end,
|
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
|
true -> ?MODULE:TestCase(init, Config1);
|
|
_ -> Config1
|
|
end.
|
|
|
|
end_per_testcase(TestCase, Config) ->
|
|
case is_snabbkaffe_tc(TestCase) of
|
|
true -> snabbkaffe:stop();
|
|
false -> skip
|
|
end,
|
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
|
true -> ?MODULE:TestCase('end', Config);
|
|
false -> ok
|
|
end,
|
|
Config.
|
|
|
|
preconfig_per_testcase(TestCase, Config) ->
|
|
{BaseName, Config1} =
|
|
case ?config(tc_group_properties, Config) of
|
|
[] ->
|
|
%% We are running a single testcase
|
|
{atom_to_binary(TestCase),
|
|
init_per_group(tcp, init_per_group(kill_connection_process, Config))};
|
|
[_|_] = Props->
|
|
Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
|
|
Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
|
|
Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
|
|
{iolist_to_binary(Pre1),
|
|
Config}
|
|
end,
|
|
[ {topic, iolist_to_binary([BaseName, "/foo"])}
|
|
, {stopic, iolist_to_binary([BaseName, "/+"])}
|
|
, {stopic_alt, iolist_to_binary([BaseName, "/foo"])}
|
|
, {client_id, BaseName}
|
|
| Config1].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helpers
|
|
%%--------------------------------------------------------------------
|
|
|
|
client_info(Key, Client) ->
|
|
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
|
|
|
|
receive_messages(Count) ->
|
|
receive_messages(Count, []).
|
|
|
|
receive_messages(0, Msgs) ->
|
|
Msgs;
|
|
receive_messages(Count, Msgs) ->
|
|
receive
|
|
{publish, Msg} ->
|
|
receive_messages(Count-1, [Msg|Msgs]);
|
|
_Other ->
|
|
receive_messages(Count, Msgs)
|
|
after 1000 ->
|
|
Msgs
|
|
end.
|
|
|
|
maybe_kill_connection_process(ClientId, Config) ->
|
|
case ?config(kill_connection_process, Config) of
|
|
true ->
|
|
[ConnectionPid] = emqx_cm:lookup_channels(ClientId),
|
|
?assert(is_pid(ConnectionPid)),
|
|
Ref = monitor(process, ConnectionPid),
|
|
ConnectionPid ! die_if_test,
|
|
receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
|
|
after 3000 -> error(process_did_not_die)
|
|
end;
|
|
false ->
|
|
ok
|
|
end.
|
|
|
|
snabbkaffe_sync_publish(Topic, Payloads, Config) ->
|
|
Fun = fun(Client, Payload) ->
|
|
?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
|
|
, #{?snk_kind := ps_persist_msg, payload := Payload}
|
|
)
|
|
end,
|
|
do_publish(Payloads, Fun, Config).
|
|
|
|
publish(Topic, Payloads, Config) ->
|
|
Fun = fun(Client, Payload) ->
|
|
{ok, _} = emqtt:publish(Client, Topic, Payload, 2)
|
|
end,
|
|
do_publish(Payloads, Fun, Config).
|
|
|
|
do_publish(Payloads = [_|_], PublishFun, Config) ->
|
|
%% Publish from another process to avoid connection confusion.
|
|
{Pid, Ref} =
|
|
spawn_monitor(
|
|
fun() ->
|
|
%% For convenience, always publish using tcp.
|
|
%% The publish path is not what we are testing.
|
|
{ok, Client} = emqtt:start_link([ {proto_ver, v5}
|
|
, {port, 1883} ]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
|
|
ok = emqtt:disconnect(Client)
|
|
end),
|
|
receive
|
|
{'DOWN', Ref, process, Pid, normal} -> ok;
|
|
{'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
|
|
end;
|
|
do_publish(Payload, PublishFun, Config) ->
|
|
do_publish([Payload], PublishFun, Config).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% [MQTT-3.1.2-23]
|
|
t_connect_session_expiry_interval(Config) ->
|
|
ConnFun = ?config(conn_fun, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
Payload = <<"test message">>,
|
|
ClientId = ?config(client_id, Config),
|
|
|
|
{ok, Client1} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
ok = emqtt:disconnect(Client1),
|
|
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
publish(Topic, Payload, Config),
|
|
|
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
[Msg | _ ] = receive_messages(1),
|
|
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
|
|
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
|
|
?assertEqual({ok, 2}, maps:find(qos, Msg)),
|
|
ok = emqtt:disconnect(Client2).
|
|
|
|
t_without_client_id(Config) ->
|
|
process_flag(trap_exit, true), %% Emqtt client dies
|
|
ConnFun = ?config(conn_fun, Config),
|
|
{ok, Client0} = emqtt:start_link([ {proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
|
|
ok.
|
|
|
|
t_assigned_clientid_persistent_session(Config) ->
|
|
ConnFun = ?config(conn_fun, Config),
|
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
|
|
AssignedClientId = client_info(clientid, Client1),
|
|
ok = emqtt:disconnect(Client1),
|
|
|
|
maybe_kill_connection_process(AssignedClientId, Config),
|
|
|
|
{ok, Client2} = emqtt:start_link([ {clientid, AssignedClientId},
|
|
{proto_ver, v5},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
?assertEqual(1, client_info(session_present, Client2)),
|
|
ok = emqtt:disconnect(Client2).
|
|
|
|
t_cancel_on_disconnect(Config) ->
|
|
%% Open a persistent session, but cancel the persistence when
|
|
%% shutting down the connection.
|
|
ConnFun = ?config(conn_fun, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
|
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
|
|
|
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{clean_start, false},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
?assertEqual(0, client_info(session_present, Client2)),
|
|
ok = emqtt:disconnect(Client2).
|
|
|
|
t_persist_on_disconnect(Config) ->
|
|
%% Open a non-persistent session, but add the persistence when
|
|
%% shutting down the connection. This is a protocol error, and
|
|
%% should not convert the session into a persistent session.
|
|
ConnFun = ?config(conn_fun, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
|
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 0}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
|
|
%% Strangely enough, the disconnect is reported as successful by emqtt.
|
|
ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
|
|
|
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{clean_start, false},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
%% The session should not be known, since it wasn't persisted because of the
|
|
%% changed expiry interval in the disconnect call.
|
|
?assertEqual(0, client_info(session_present, Client2)),
|
|
ok = emqtt:disconnect(Client2).
|
|
|
|
wait_for_pending(SId) ->
|
|
wait_for_pending(SId, 100).
|
|
|
|
wait_for_pending(_SId, 0) ->
|
|
error(exhausted_wait_for_pending);
|
|
wait_for_pending(SId, N) ->
|
|
case emqx_persistent_session:pending(SId) of
|
|
[] -> timer:sleep(1), wait_for_pending(SId, N - 1);
|
|
[_|_] = Pending -> Pending
|
|
end.
|
|
|
|
t_process_dies_session_expires(Config) ->
|
|
%% Emulate an error in the connect process,
|
|
%% or that the node of the process goes down.
|
|
%% A persistent session should eventually expire.
|
|
ConnFun = ?config(conn_fun, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
Payload = <<"test">>,
|
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 1}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
ok = emqtt:disconnect(Client1),
|
|
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
ok = publish(Topic, [Payload], Config),
|
|
|
|
SessionId =
|
|
case ?config(persistent_store_enabled, Config) of
|
|
false -> undefined;
|
|
true ->
|
|
%% The session should not be marked as expired.
|
|
{Tag, Session} = emqx_persistent_session:lookup(ClientId),
|
|
?assertEqual(persistent, Tag),
|
|
SId = emqx_session:info(id, Session),
|
|
case ?config(kill_connection_process, Config) of
|
|
true ->
|
|
%% The session should have a pending message
|
|
?assertMatch([_], wait_for_pending(SId));
|
|
false ->
|
|
skip
|
|
end,
|
|
SId
|
|
end,
|
|
|
|
timer:sleep(1100),
|
|
|
|
%% The session should now be marked as expired.
|
|
case (?config(kill_connection_process, Config) andalso
|
|
?config(persistent_store_enabled, Config)) of
|
|
true -> ?assertMatch({expired, _}, emqx_persistent_session:lookup(ClientId));
|
|
false -> skip
|
|
end,
|
|
|
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
?assertEqual(0, client_info(session_present, Client2)),
|
|
|
|
case (?config(kill_connection_process, Config) andalso
|
|
?config(persistent_store_enabled, Config)) of
|
|
true ->
|
|
%% The session should be a fresh one
|
|
{persistent, NewSession} = emqx_persistent_session:lookup(ClientId),
|
|
?assertNotEqual(SessionId, emqx_session:info(id, NewSession)),
|
|
%% The old session should now either be marked as abandoned or already be garbage collected.
|
|
?assertMatch([], emqx_persistent_session:pending(SessionId));
|
|
false ->
|
|
skip
|
|
end,
|
|
|
|
%% We should not receive the pending message
|
|
?assertEqual([], receive_messages(1)),
|
|
|
|
emqtt:disconnect(Client2).
|
|
|
|
t_publish_while_client_is_gone(Config) ->
|
|
%% A persistent session should receive messages in its
|
|
%% subscription even if the process owning the session dies.
|
|
ConnFun = ?config(conn_fun, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
Payload1 = <<"hello1">>,
|
|
Payload2 = <<"hello2">>,
|
|
ClientId = ?config(client_id, Config),
|
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
|
|
ok = emqtt:disconnect(Client1),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
ok = publish(Topic, [Payload1, Payload2], Config),
|
|
|
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
[Msg1] = receive_messages(1),
|
|
[Msg2] = receive_messages(1),
|
|
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
|
|
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
|
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
|
|
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
|
|
|
|
ok = emqtt:disconnect(Client2).
|
|
|
|
t_clean_start_drops_subscriptions(Config) ->
|
|
%% 1. A persistent session is started and disconnected.
|
|
%% 2. While disconnected, a message is published and persisted.
|
|
%% 3. When connecting again, the clean start flag is set, the subscription is renewed,
|
|
%% then we disconnect again.
|
|
%% 4. Finally, a new connection is made with clean start set to false.
|
|
%% The original message should not be delivered.
|
|
|
|
ConnFun = ?config(conn_fun, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
Payload1 = <<"hello1">>,
|
|
Payload2 = <<"hello2">>,
|
|
Payload3 = <<"hello3">>,
|
|
ClientId = ?config(client_id, Config),
|
|
|
|
%% 1.
|
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
|
|
ok = emqtt:disconnect(Client1),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
%% 2.
|
|
ok = publish(Topic, Payload1, Config),
|
|
|
|
%% 3.
|
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, true}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
?assertEqual(0, client_info(session_present, Client2)),
|
|
{ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
|
|
|
|
ok = publish(Topic, Payload2, Config),
|
|
[Msg1] = receive_messages(1),
|
|
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
|
|
|
|
ok = emqtt:disconnect(Client2),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
%% 4.
|
|
{ok, Client3} = emqtt:start_link([ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client3),
|
|
|
|
ok = publish(Topic, Payload3, Config),
|
|
[Msg2] = receive_messages(1),
|
|
?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
|
|
|
|
ok = emqtt:disconnect(Client3).
|
|
|
|
t_unsubscribe(Config) ->
|
|
ConnFun = ?config(conn_fun, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
{ok, Client} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client),
|
|
{ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
|
|
case emqx_persistent_session:is_store_enabled() of
|
|
true ->
|
|
{persistent, Session} = emqx_persistent_session:lookup(ClientId),
|
|
SessionID = emqx_session:info(id, Session),
|
|
SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
|
|
?assert(lists:member(SessionID, SessionIDs)),
|
|
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
|
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
|
|
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
|
SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
|
|
?assert(not lists:member(SessionID, SessionIDs2));
|
|
false ->
|
|
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
|
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
|
|
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic])
|
|
end,
|
|
ok = emqtt:disconnect(Client).
|
|
|
|
t_multiple_subscription_matches(Config) ->
|
|
ConnFun = ?config(conn_fun, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic1 = ?config(stopic, Config),
|
|
STopic2 = ?config(stopic_alt, Config),
|
|
Payload = <<"test message">>,
|
|
ClientId = ?config(client_id, Config),
|
|
|
|
{ok, Client1} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
|
|
ok = emqtt:disconnect(Client1),
|
|
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
publish(Topic, Payload, Config),
|
|
|
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
|
{clean_start, false}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
|
|
%% We will receive the same message twice because it matches two subscriptions.
|
|
[Msg1, Msg2] = receive_messages(2),
|
|
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
|
|
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
|
|
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
|
|
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
|
|
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
|
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
|
|
ok = emqtt:disconnect(Client2).
|
|
|
|
t_lost_messages_because_of_gc(init, Config) ->
|
|
case (emqx_persistent_session:is_store_enabled()
|
|
andalso ?config(kill_connection_process, Config)) of
|
|
true ->
|
|
Retain = 1000,
|
|
OldRetain = emqx_config:get(?msg_retain, Retain),
|
|
emqx_config:put(?msg_retain, Retain),
|
|
[{retain, Retain}, {old_retain, OldRetain}|Config];
|
|
false -> {skip, only_relevant_with_store_and_kill_process}
|
|
end;
|
|
t_lost_messages_because_of_gc('end', Config) ->
|
|
OldRetain = ?config(old_retain, Config),
|
|
emqx_config:put(?msg_retain, OldRetain),
|
|
ok.
|
|
|
|
t_lost_messages_because_of_gc(Config) ->
|
|
ConnFun = ?config(conn_fun, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
Retain = ?config(retain, Config),
|
|
Payload1 = <<"hello1">>,
|
|
Payload2 = <<"hello2">>,
|
|
{ok, Client1} = emqtt:start_link([ {clientid, ClientId},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
emqtt:disconnect(Client1),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
publish(Topic, Payload1, Config),
|
|
timer:sleep(2 * Retain),
|
|
publish(Topic, Payload2, Config),
|
|
emqx_persistent_session_gc:message_gc_worker(),
|
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
|
{clean_start, false},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
Msgs = receive_messages(2),
|
|
?assertMatch([_], Msgs),
|
|
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))),
|
|
emqtt:disconnect(Client2),
|
|
ok.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Snabbkaffe helpers
|
|
%%--------------------------------------------------------------------
|
|
|
|
check_snabbkaffe_vanilla(Trace) ->
|
|
ResumeTrace = [T || #{?snk_kind := K} = T <- Trace,
|
|
re:run(atom_to_list(K), "^ps_") /= nomatch],
|
|
?assertMatch([_|_], ResumeTrace),
|
|
[_Sid] = lists:usort(?projection(sid, ResumeTrace)),
|
|
%% Check internal flow of the emqx_cm resuming
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_resuming },
|
|
#{ ?snk_kind := ps_initial_pendings },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_initial_pendings },
|
|
#{ ?snk_kind := ps_persist_pendings },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_persist_pendings },
|
|
#{ ?snk_kind := ps_notify_writers },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers },
|
|
#{ ?snk_kind := ps_node_markers },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_node_markers },
|
|
#{ ?snk_kind := ps_resume_session },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_resume_session },
|
|
#{ ?snk_kind := ps_marker_pendings },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings },
|
|
#{ ?snk_kind := ps_marker_pendings_msgs },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings_msgs },
|
|
#{ ?snk_kind := ps_resume_end },
|
|
ResumeTrace)),
|
|
|
|
%% Check flow between worker and emqx_cm
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers },
|
|
#{ ?snk_kind := ps_worker_started },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings },
|
|
#{ ?snk_kind := ps_worker_resume_end },
|
|
ResumeTrace)),
|
|
?assert(?strict_causality(#{ ?snk_kind := ps_worker_resume_end },
|
|
#{ ?snk_kind := ps_worker_shutdown },
|
|
ResumeTrace)),
|
|
|
|
[Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)),
|
|
?assertMatch([_], Markers).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Snabbkaffe tests
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_snabbkaffe_vanilla_stages(Config) ->
|
|
%% Test that all stages of session resume works ok in the simplest case
|
|
process_flag(trap_exit, true),
|
|
ConnFun = ?config(conn_fun, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
EmqttOpts = [ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config],
|
|
{ok, Client1} = emqtt:start_link([{clean_start, true}|EmqttOpts]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
ok = emqtt:disconnect(Client1),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
?check_trace(
|
|
begin
|
|
{ok, Client2} = emqtt:start_link([{clean_start, false}|EmqttOpts]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
ok = emqtt:disconnect(Client2)
|
|
end,
|
|
fun(ok, Trace) ->
|
|
check_snabbkaffe_vanilla(Trace)
|
|
end),
|
|
ok.
|
|
|
|
t_snabbkaffe_pending_messages(Config) ->
|
|
%% Make sure there are pending messages are fetched during the init stage.
|
|
process_flag(trap_exit, true),
|
|
ConnFun = ?config(conn_fun, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1,2,3,4,5]],
|
|
EmqttOpts = [ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config],
|
|
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
ok = emqtt:disconnect(Client1),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
?check_trace(
|
|
begin
|
|
snabbkaffe_sync_publish(Topic, Payloads, Config),
|
|
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
Msgs = receive_messages(length(Payloads)),
|
|
ReceivedPayloads = [P || #{ payload := P } <- Msgs],
|
|
?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)),
|
|
ok = emqtt:disconnect(Client2)
|
|
end,
|
|
fun(ok, Trace) ->
|
|
check_snabbkaffe_vanilla(Trace),
|
|
%% Check that all messages was delivered from the DB
|
|
[Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)),
|
|
[Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)),
|
|
Delivers = Delivers1 ++ Delivers2,
|
|
?assertEqual(length(Payloads), length(Delivers)),
|
|
%% Check for no duplicates
|
|
?assertEqual(lists:usort(Delivers), lists:sort(Delivers))
|
|
end),
|
|
ok.
|
|
|
|
t_snabbkaffe_buffered_messages(Config) ->
|
|
%% Make sure to buffer messages during startup.
|
|
process_flag(trap_exit, true),
|
|
ConnFun = ?config(conn_fun, Config),
|
|
ClientId = ?config(client_id, Config),
|
|
Topic = ?config(topic, Config),
|
|
STopic = ?config(stopic, Config),
|
|
Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]],
|
|
Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]],
|
|
EmqttOpts = [ {proto_ver, v5},
|
|
{clientid, ClientId},
|
|
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
| Config],
|
|
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
|
|
{ok, _} = emqtt:ConnFun(Client1),
|
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
ok = emqtt:disconnect(Client1),
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
publish(Topic, Payloads1, Config),
|
|
|
|
?check_trace(
|
|
begin
|
|
%% Make the resume init phase wait until the first message is delivered.
|
|
?force_ordering( #{ ?snk_kind := ps_worker_deliver },
|
|
#{ ?snk_kind := ps_resume_end }),
|
|
spawn_link(fun() ->
|
|
?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000),
|
|
publish(Topic, Payloads2, Config)
|
|
end),
|
|
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1),
|
|
ReceivedPayloads = [P || #{ payload := P } <- Msgs],
|
|
?assertEqual(lists:sort(Payloads1 ++ Payloads2),
|
|
lists:sort(ReceivedPayloads)),
|
|
ok = emqtt:disconnect(Client2)
|
|
end,
|
|
fun(ok, Trace) ->
|
|
check_snabbkaffe_vanilla(Trace),
|
|
%% Check that some messages was buffered in the writer process
|
|
[Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)),
|
|
?assertMatch(X when 0 < X andalso X =< length(Payloads2),
|
|
length(Msgs))
|
|
end),
|
|
ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% GC tests
|
|
%%--------------------------------------------------------------------
|
|
|
|
-define(MARKER, 3).
|
|
-define(DELIVERED, 2).
|
|
-define(UNDELIVERED, 1).
|
|
-define(ABANDONED, 0).
|
|
|
|
msg_id() ->
|
|
emqx_guid:gen().
|
|
|
|
delivered_msg(MsgId, SessionID, STopic) ->
|
|
{SessionID, MsgId, STopic, ?DELIVERED}.
|
|
|
|
undelivered_msg(MsgId, SessionID, STopic) ->
|
|
{SessionID, MsgId, STopic, ?UNDELIVERED}.
|
|
|
|
marker_msg(MarkerID, SessionID) ->
|
|
{SessionID, MarkerID, <<>>, ?MARKER}.
|
|
|
|
guid(MicrosecondsAgo) ->
|
|
%% Make a fake GUID and set a timestamp.
|
|
<< TS:64, Tail:64 >> = emqx_guid:gen(),
|
|
<< (TS - MicrosecondsAgo) : 64, Tail:64 >>.
|
|
|
|
abandoned_session_msg(SessionID) ->
|
|
abandoned_session_msg(SessionID, 0).
|
|
|
|
abandoned_session_msg(SessionID, MicrosecondsAgo) ->
|
|
TS = erlang:system_time(microsecond),
|
|
{SessionID, <<>>, <<(TS - MicrosecondsAgo) : 64>>, ?ABANDONED}.
|
|
|
|
fresh_gc_delete_fun() ->
|
|
Ets = ets:new(gc_collect, [ordered_set]),
|
|
fun(delete, Key) -> ets:insert(Ets, {Key}), ok;
|
|
(collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List);
|
|
(_, _Key) -> ok
|
|
end.
|
|
|
|
fresh_gc_callbacks_fun() ->
|
|
Ets = ets:new(gc_collect, [ordered_set]),
|
|
fun(collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List);
|
|
(Tag, Key) -> ets:insert(Ets, {{Key, Tag}}), ok
|
|
end.
|
|
|
|
get_gc_delete_messages() ->
|
|
Fun = fresh_gc_delete_fun(),
|
|
emqx_persistent_session:gc_session_messages(Fun),
|
|
Fun(collect, <<>>).
|
|
|
|
get_gc_callbacks() ->
|
|
Fun = fresh_gc_callbacks_fun(),
|
|
emqx_persistent_session:gc_session_messages(Fun),
|
|
Fun(collect, <<>>).
|
|
|
|
|
|
t_gc_all_delivered(Config) ->
|
|
Store = ?config(session_msg_store, Config),
|
|
STopic = ?config(stopic, Config),
|
|
SessionId = emqx_guid:gen(),
|
|
MsgIds = [msg_id() || _ <- lists:seq(1, 5)],
|
|
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
SortedContent = lists:usort(Delivered ++ Undelivered),
|
|
ets:insert(Store, [{X, <<>>} || X <- SortedContent]),
|
|
GCMessages = get_gc_delete_messages(),
|
|
?assertEqual(SortedContent, GCMessages),
|
|
ok.
|
|
|
|
t_gc_some_undelivered(Config) ->
|
|
Store = ?config(session_msg_store, Config),
|
|
STopic = ?config(stopic, Config),
|
|
SessionId = emqx_guid:gen(),
|
|
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
|
|
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
{Delivered1,_Delivered2} = split(Delivered),
|
|
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
{Undelivered1, Undelivered2} = split(Undelivered),
|
|
Content = Delivered1 ++ Undelivered1 ++ Undelivered2,
|
|
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
Expected = lists:usort(Delivered1 ++ Undelivered1),
|
|
GCMessages = get_gc_delete_messages(),
|
|
?assertEqual(Expected, GCMessages),
|
|
ok.
|
|
|
|
t_gc_with_markers(Config) ->
|
|
Store = ?config(session_msg_store, Config),
|
|
STopic = ?config(stopic, Config),
|
|
SessionId = emqx_guid:gen(),
|
|
MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)],
|
|
MarkerId = msg_id(),
|
|
MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1,
|
|
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
{Delivered1,_Delivered2} = split(Delivered),
|
|
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
{Undelivered1, Undelivered2} = split(Undelivered),
|
|
Markers = [marker_msg(MarkerId, SessionId)],
|
|
Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers,
|
|
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
Expected = lists:usort(Delivered1 ++ Undelivered1),
|
|
GCMessages = get_gc_delete_messages(),
|
|
?assertEqual(Expected, GCMessages),
|
|
ok.
|
|
|
|
t_gc_abandoned_some_undelivered(Config) ->
|
|
Store = ?config(session_msg_store, Config),
|
|
STopic = ?config(stopic, Config),
|
|
SessionId = emqx_guid:gen(),
|
|
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
|
|
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
{Delivered1,_Delivered2} = split(Delivered),
|
|
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
{Undelivered1, Undelivered2} = split(Undelivered),
|
|
Abandoned = abandoned_session_msg(SessionId),
|
|
Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned],
|
|
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2),
|
|
GCMessages = get_gc_delete_messages(),
|
|
?assertEqual(Expected, GCMessages),
|
|
ok.
|
|
|
|
t_gc_abandoned_only_called_on_empty_session(Config) ->
|
|
Store = ?config(session_msg_store, Config),
|
|
STopic = ?config(stopic, Config),
|
|
SessionId = emqx_guid:gen(),
|
|
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
|
|
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
Abandoned = abandoned_session_msg(SessionId),
|
|
Content = Delivered ++ Undelivered ++ [Abandoned],
|
|
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
GCMessages = get_gc_callbacks(),
|
|
|
|
%% Since we had messages to delete, we don't expect to get the
|
|
%% callback on the abandoned session
|
|
?assertEqual([], [ X || {X, abandoned} <- GCMessages]),
|
|
|
|
%% But if we have only the abandoned session marker for this
|
|
%% session, it should be called.
|
|
ets:delete_all_objects(Store),
|
|
UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>),
|
|
ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]),
|
|
GCMessages2 = get_gc_callbacks(),
|
|
?assertEqual([Abandoned], [ X || {X, abandoned} <- GCMessages2]),
|
|
ok.
|
|
|
|
t_gc_session_gc_worker(init, Config) ->
|
|
meck:new(emqx_persistent_session, [passthrough, no_link]),
|
|
Config;
|
|
t_gc_session_gc_worker('end',_Config) ->
|
|
meck:unload(emqx_persistent_session),
|
|
ok.
|
|
|
|
t_gc_session_gc_worker(Config) ->
|
|
STopic = ?config(stopic, Config),
|
|
SessionID = emqx_guid:gen(),
|
|
MsgDeleted = delivered_msg(msg_id(), SessionID, STopic),
|
|
MarkerNotDeleted = marker_msg(msg_id(), SessionID),
|
|
MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID),
|
|
AbandonedNotDeleted = abandoned_session_msg(SessionID),
|
|
AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000),
|
|
meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end),
|
|
emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted),
|
|
emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted),
|
|
emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted),
|
|
emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted),
|
|
emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted),
|
|
History = meck:history(emqx_persistent_session, self()),
|
|
DeleteCalls = [ Key || {_Pid, {_, delete_session_message, [Key]}, _Result}
|
|
<- History],
|
|
?assertEqual(lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]),
|
|
lists:sort(DeleteCalls)),
|
|
ok.
|
|
|
|
t_gc_message_gc(Config) ->
|
|
Topic = ?config(topic, Config),
|
|
ClientID = ?config(client_id, Config),
|
|
Store = ?config(msg_store, Config),
|
|
NewMsgs = [emqx_message:make(ClientID, Topic, integer_to_binary(P))
|
|
|| P <- lists:seq(6, 10)],
|
|
Retain = 60 * 1000,
|
|
emqx_config:put(?msg_retain, Retain),
|
|
Msgs1 = [emqx_message:make(ClientID, Topic, integer_to_binary(P))
|
|
|| P <- lists:seq(1, 5)],
|
|
OldMsgs = [M#message{id = guid(Retain*1000)} || M <- Msgs1],
|
|
ets:insert(Store, NewMsgs ++ OldMsgs),
|
|
?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)),
|
|
ok = emqx_persistent_session_gc:message_gc_worker(),
|
|
?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)),
|
|
ok.
|
|
|
|
split(List) ->
|
|
split(List, [], []).
|
|
|
|
split([], L1, L2) ->
|
|
{L1, L2};
|
|
split([H], L1, L2) ->
|
|
{[H|L1], L2};
|
|
split([H1, H2|Left], L1, L2) ->
|
|
split(Left, [H1|L1], [H2|L2]).
|
|
|