test(ds): add tests for error mapping and replay recovery

This commit is contained in:
Andrew Mayorov 2024-03-07 12:47:05 +01:00
parent e7e8771277
commit 69427dc42d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 251 additions and 2 deletions

View File

@ -83,6 +83,27 @@
end)()
).
%% Assert that the value of EXPR matches either PAT1 or PAT2.
%%
%% EXPR is evaluated exactly once, inside an immediately-invoked fun, and its
%% value is what the whole macro expression evaluates to when either pattern
%% matches. When neither matches, an `assertMatch` error is raised; it carries
%% the module, the line, the stringified expression, a textual
%% "one of [ PAT1, PAT2 ]" description of the patterns, and the offending value.
-define(assertMatchOneOf(PAT1, PAT2, EXPR),
(fun() ->
case (X__V = (EXPR)) of
PAT1 ->
X__V;
PAT2 ->
X__V;
_ ->
erlang:error(
{assertMatch, [
{module, ?MODULE},
{line, ?LINE},
{expression, (??EXPR)},
{pattern, "one of [ " ++ (??PAT1) ++ ", " ++ (??PAT2) ++ " ]"},
{value, X__V}
]}
)
end
end)()
).
-define(assertExceptionOneOf(CT1, CT2, EXPR),
(fun() ->
X__Attrs = [

View File

@ -118,7 +118,6 @@ app_specs() ->
%% Build the list of application specs used by the suite:
%% `emqx_durable_storage` followed by `emqx` configured with session
%% persistence enabled. The `extra_emqx_conf` string from `Opts` (empty when
%% the key is absent) is appended verbatim to the `emqx` configuration.
app_specs(Opts) ->
    BaseConf = "session_persistence = {enable = true}",
    EMQXConf = BaseConf ++ maps:get(extra_emqx_conf, Opts, ""),
    [emqx_durable_storage, {emqx, EMQXConf}].
@ -154,6 +153,14 @@ start_client(Opts0 = #{}) ->
on_exit(fun() -> catch emqtt:stop(Client) end),
Client.
%% Start a client with `start_client/1` using `Opts` and connect it right away.
%% The connection attempt must return `{ok, _}`, otherwise the assertion fails.
%% Returns the client so the caller can subscribe or publish with it.
start_connect_client(Opts = #{}) ->
Client = start_client(Opts),
?assertMatch({ok, _}, emqtt:connect(Client)),
Client.
%% Build a binary client ID of the form `Prefix/ID`. Both parts are rendered
%% with the `~p` format directive, so atoms and integers appear as written
%% while strings and binaries keep their quoting.
mk_clientid(Prefix, ID) ->
    Rendered = io_lib:format("~p/~p", [Prefix, ID]),
    erlang:iolist_to_binary(Rendered).
restart_node(Node, NodeSpec) ->
?tp(will_restart_node, #{}),
emqx_cth_cluster:restart(Node, NodeSpec),
@ -599,3 +606,66 @@ t_session_gc(Config) ->
[]
),
ok.
%% Test case: a subscriber receives a batch of QoS 1 messages, disconnects, and
%% reconnects while part of the RPC calls into `emqx_ds_replication_layer` are
%% mocked to fail. Only part of the batch may be replayed during the outage;
%% once the mocks are removed the rest must arrive, and the full replay must
%% equal the original delivery (compared on topic, payload and QoS, in order).
t_session_replay_retry(_Config) ->
%% Verify that the session recovers smoothly from transient errors during
%% replay.
%% Install the `erpc` / `gen_rpc` mocks up front; failures are injected later.
ok = emqx_ds_test_helpers:mock_rpc(),
NClients = 10,
ClientSubOpts = #{
clientid => mk_clientid(?FUNCTION_NAME, sub),
auto_ack => never
},
ClientSub = start_connect_client(ClientSubOpts),
?assertMatch(
{ok, _, [?RC_GRANTED_QOS_1]},
emqtt:subscribe(ClientSub, <<"t/#">>, ?QOS_1)
),
%% One publisher per message, each connecting with a zero session expiry interval.
ClientsPub = [
start_connect_client(#{
clientid => mk_clientid(?FUNCTION_NAME, I),
properties => #{'Session-Expiry-Interval' => 0}
})
|| I <- lists:seq(1, NClients)
],
%% Every publisher sends a single QoS 1 message to a randomly picked `t/<N>`
%% topic, using the same `<N>` as the payload.
lists:foreach(
fun(Client) ->
Index = integer_to_binary(rand:uniform(NClients)),
Topic = <<"t/", Index/binary>>,
?assertMatch({ok, #{}}, emqtt:publish(Client, Topic, Index, 1))
end,
ClientsPub
),
%% Baseline: all published messages must reach the subscriber before the outage.
Pubs0 = emqx_common_test_helpers:wait_publishes(NClients, 5_000),
NPubs = length(Pubs0),
?assertEqual(NClients, NPubs, ?drainMailbox()),
ok = emqtt:stop(ClientSub),
%% Make `emqx_ds` believe that roughly half of the shards are unavailable.
%% The split is decided by the parity of the shard's `erlang:phash2/1` hash.
ok = emqx_ds_test_helpers:mock_rpc_result(
fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
case erlang:phash2(Shard) rem 2 of
0 -> unavailable;
1 -> passthrough
end
end
),
%% Reconnect under the same client ID with `clean_start => false`
%% (presumably resuming the persisted session -- confirm against the session implementation).
_ClientSub = start_connect_client(ClientSubOpts#{clean_start => false}),
%% While failures are injected, strictly fewer messages than the baseline may be replayed.
Pubs1 = emqx_common_test_helpers:wait_publishes(NPubs, 5_000),
?assert(length(Pubs1) < length(Pubs0), Pubs1),
%% "Recover" the shards.
emqx_ds_test_helpers:unmock_rpc(),
%% Collect whatever is still missing, then compare the whole replay with the baseline.
Pubs2 = emqx_common_test_helpers:wait_publishes(NPubs - length(Pubs1), 5_000),
?assertEqual(
[maps:with([topic, payload, qos], P) || P <- Pubs0],
[maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2]
).

View File

@ -61,6 +61,7 @@
read_schema_configs/2,
render_config_file/2,
wait_for/4,
wait_publishes/2,
wait_mqtt_payload/1,
select_free_port/1
]).
@ -426,6 +427,16 @@ wait_for(Fn, Ln, F, Timeout) ->
{Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
%% Collect up to `Count` `{publish, Msg}` messages from the mailbox of the
%% calling process and return the `Msg` terms in the order they were received.
%% `Timeout` (in milliseconds) bounds the wait for each individual message, not
%% the call as a whole; on the first timeout, whatever has been gathered so far
%% is returned.
wait_publishes(Count, Timeout) ->
    do_wait_publishes(Count, Timeout, []).

%% Accumulating loop behind `wait_publishes/2`; `Acc` holds messages newest-first.
do_wait_publishes(0, _Timeout, Acc) ->
    lists:reverse(Acc);
do_wait_publishes(Remaining, Timeout, Acc) ->
    receive
        {publish, Msg} ->
            do_wait_publishes(Remaining - 1, Timeout, [Msg | Acc])
    after Timeout ->
        lists:reverse(Acc)
    end.
%% Convenience entry point: calls `flush/1` with an empty list as its argument.
%% NOTE(review): presumably this drains the caller's mailbox and returns the
%% collected messages -- confirm against the definition of `flush/1`.
flush() ->
flush([]).

View File

@ -21,6 +21,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(N_SHARDS, 1).
@ -553,9 +554,93 @@ t_get_streams_concurrently_with_drop_generation(_Config) ->
ok
end,
[]
).
%% Check that the replication layer maps recoverable errors correctly.
%%
%% The test opens a two-shard DB, stores a batch of messages, and builds
%% streams and iterators while RPC is healthy. It then makes RPC calls that
%% target the second shard fail and asserts that:
%%   * `emqx_ds:get_streams/3` still returns a non-empty (possibly partial) result;
%%   * at least one `emqx_ds:make_iterator/4` call ends in
%%     `{error, recoverable, {erpc, _}}`;
%%   * at least one `emqx_ds:next/3` call over the initial iterators ends in
%%     `{error, recoverable, {badrpc, _}}`.
t_error_mapping_replication_layer(_Config) ->
    %% This checks that the replication layer maps recoverable errors correctly.
    ok = emqx_ds_test_helpers:mock_rpc(),
    ok = snabbkaffe:start_trace(),

    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
    [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),

    TopicFilter = emqx_topic:words(<<"foo/#">>),
    Msgs = [
        message(<<"C1">>, <<"foo/bar">>, <<"1">>, 0),
        message(<<"C1">>, <<"foo/baz">>, <<"2">>, 1),
        message(<<"C2">>, <<"foo/foo">>, <<"3">>, 2),
        message(<<"C3">>, <<"foo/xyz">>, <<"4">>, 3),
        message(<<"C4">>, <<"foo/bar">>, <<"5">>, 4),
        message(<<"C5">>, <<"foo/oof">>, <<"6">>, 5)
    ],

    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),

    %% Wait until both shards have reported an egress flush before reading back.
    ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}),
    ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}),

    %% Healthy baseline: these streams and iterators are reused after the
    %% failure is injected.
    Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0),
    Iterators0 = lists:map(
        fun({_Rank, S}) ->
            {ok, Iter} = emqx_ds:make_iterator(DB, S, TopicFilter, 0),
            Iter
        end,
        Streams0
    ),

    %% Disrupt the link to the second shard.
    %% `DB`, `Shard1` and `Shard2` are already bound, so the clauses below
    %% match on their values rather than binding fresh variables.
    ok = emqx_ds_test_helpers:mock_rpc_result(
        fun(_Node, emqx_ds_replication_layer, _Function, Args) ->
            case Args of
                [DB, Shard1 | _] -> passthrough;
                [DB, Shard2 | _] -> unavailable
            end
        end
    ),

    %% Result of `emqx_ds:get_streams/3` will just contain partial results, not an error.
    Streams1 = emqx_ds:get_streams(DB, TopicFilter, 0),
    ?assert(
        length(Streams1) > 0 andalso length(Streams1) =< length(Streams0),
        Streams1
    ),

    %% At least one of `emqx_ds:make_iterator/4` will end in an error.
    Results1 = lists:map(
        fun({_Rank, S}) ->
            ?assertMatchOneOf(
                {ok, _Iter},
                {error, recoverable, {erpc, _}},
                emqx_ds:make_iterator(DB, S, TopicFilter, 0)
            )
        end,
        Streams0
    ),
    ?assert(
        length([error || {error, _, _} <- Results1]) > 0,
        Results1
    ),

    %% At least one of `emqx_ds:next/3` over initial set of iterators will end in an error.
    Results2 = lists:map(
        fun(Iter) ->
            ?assertMatchOneOf(
                {ok, _Iter, [_ | _]},
                {error, recoverable, {badrpc, _}},
                emqx_ds:next(DB, Iter, _BatchSize = 42)
            )
        end,
        Iterators0
    ),
    ?assert(
        length([error || {error, _, _} <- Results2]) > 0,
        Results2
    ),

    %% Tear down the trace and the mocks installed at the start of the test.
    snabbkaffe:stop(),
    meck:unload().
update_data_set() ->
[
@ -591,6 +676,10 @@ fetch_all(DB, TopicFilter, StartTime) ->
Streams
).
%% Same as `message/3`, but additionally sets the `from` field of the
%% resulting `#message{}` record to `ClientId`.
message(ClientId, Topic, Payload, PublishedAt) ->
    (message(Topic, Payload, PublishedAt))#message{from = ClientId}.
message(Topic, Payload, PublishedAt) ->
#message{
topic = Topic,

View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_test_helpers).
-compile(export_all).
-compile(nowarn_export_all).
%% RPC mocking
%% Install meck mocks for `erpc` and `gen_rpc`. Both are created with
%% `passthrough` and `no_history`; `erpc` is additionally created with
%% `unstick`. Crashes with a badmatch if either mock cannot be created.
mock_rpc() ->
    CommonOpts = [passthrough, no_history],
    ok = meck:new(erpc, CommonOpts ++ [unstick]),
    ok = meck:new(gen_rpc, CommonOpts).
%% Best-effort removal of the `erpc` and `gen_rpc` mocks. A failure to unload
%% either module is caught rather than propagated, so this is safe to call
%% even when the mocks are not installed.
unmock_rpc() ->
    unload_quietly(erpc),
    unload_quietly(gen_rpc).

%% Unload the mock of `Mod`, catching any exception raised by `meck:unload/1`.
unload_quietly(Mod) ->
    catch meck:unload(Mod).
%% Install `ExpectFun` as the decision function for both mocked RPC
%% transports, `erpc` first and `gen_rpc` second.
mock_rpc_result(ExpectFun) ->
    lists:foreach(
        fun(RPCMod) -> mock_rpc_result(RPCMod, ExpectFun) end,
        [erpc, gen_rpc]
    ).
%% Replace the 4-arity `call` function of the given mocked RPC module with a
%% stub driven by `ExpectFun(Node, Mod, Function, Args)`, which must return:
%%   * `passthrough`        -- forward the call to the original implementation;
%%   * `unavailable`        -- simulate an unreachable peer;
%%   * `{timeout, Timeout}` -- sleep for `Timeout` ms, then simulate a timeout.
%% For `erpc` the simulated failures are raised as `error` exceptions
%% (`{erpc, noconnection}` / `{erpc, timeout}`); for `gen_rpc` they are returned
%% as values (`{badtcp, econnrefused}` / `{badrpc, timeout}`).
mock_rpc_result(erpc, ExpectFun) ->
    Stub = fun(Node, Mod, Function, Args) ->
        Verdict = ExpectFun(Node, Mod, Function, Args),
        case Verdict of
            {timeout, Timeout} ->
                ok = timer:sleep(Timeout),
                meck:exception(error, {erpc, timeout});
            unavailable ->
                meck:exception(error, {erpc, noconnection});
            passthrough ->
                meck:passthrough([Node, Mod, Function, Args])
        end
    end,
    ok = meck:expect(erpc, call, Stub);
mock_rpc_result(gen_rpc, ExpectFun) ->
    %% `gen_rpc` destinations are `{Node, Key}` pairs; only the node is handed
    %% to `ExpectFun`, while the full destination is kept for passthrough.
    Stub = fun(Dest = {Node, _}, Mod, Function, Args) ->
        Verdict = ExpectFun(Node, Mod, Function, Args),
        case Verdict of
            {timeout, Timeout} ->
                ok = timer:sleep(Timeout),
                {badrpc, timeout};
            unavailable ->
                {badtcp, econnrefused};
            passthrough ->
                meck:passthrough([Dest, Mod, Function, Args])
        end
    end,
    ok = meck:expect(gen_rpc, call, Stub).