diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl
index a200394e4..5744fdbf3 100644
--- a/apps/emqx/include/asserts.hrl
+++ b/apps/emqx/include/asserts.hrl
@@ -83,6 +83,30 @@
     end)()
 ).
 
+%% Evaluates EXPR once and returns its value if it matches PAT1 or PAT2.
+%% Otherwise raises an `assertMatch` error that reports the expression, both
+%% patterns and the actual value.
+-define(assertMatchOneOf(PAT1, PAT2, EXPR),
+    (fun() ->
+        case (X__V = (EXPR)) of
+            PAT1 ->
+                X__V;
+            PAT2 ->
+                X__V;
+            _ ->
+                erlang:error(
+                    {assertMatch, [
+                        {module, ?MODULE},
+                        {line, ?LINE},
+                        {expression, (??EXPR)},
+                        {pattern, "one of [ " ++ (??PAT1) ++ ", " ++ (??PAT2) ++ " ]"},
+                        {value, X__V}
+                    ]}
+                )
+        end
+    end)()
+).
+
 -define(assertExceptionOneOf(CT1, CT2, EXPR),
     (fun() ->
         X__Attrs = [
diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
index c0631e7ab..c928f10da 100644
--- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
+++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
@@ -118,7 +118,6 @@ app_specs() ->
 app_specs(Opts) ->
     ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
     [
-        emqx_durable_storage,
         {emqx, "session_persistence = {enable = true}" ++ ExtraEMQXConf}
     ].
 
@@ -154,6 +153,16 @@ start_client(Opts0 = #{}) ->
     on_exit(fun() -> catch emqtt:stop(Client) end),
     Client.
 
+%% Starts a client via `start_client/1` and asserts that `emqtt:connect/1` succeeds.
+start_connect_client(Opts = #{}) ->
+    Client = start_client(Opts),
+    ?assertMatch({ok, _}, emqtt:connect(Client)),
+    Client.
+
+%% Builds a binary client ID of the form `Prefix/ID` (both parts printed with `~p`).
+mk_clientid(Prefix, ID) ->
+    iolist_to_binary(io_lib:format("~p/~p", [Prefix, ID])).
+
 restart_node(Node, NodeSpec) ->
     ?tp(will_restart_node, #{}),
     emqx_cth_cluster:restart(Node, NodeSpec),
@@ -599,3 +608,69 @@ t_session_gc(Config) ->
         []
     ),
     ok.
+
+t_session_replay_retry(_Config) ->
+    %% Verify that the session recovers smoothly from transient errors during
+    %% replay.
+
+    ok = emqx_ds_test_helpers:mock_rpc(),
+    %% Register removal of the RPC mocks as a cleanup action, so that they do not
+    %% stay installed if an assertion fails before the explicit `unmock_rpc/0` below.
+    on_exit(fun emqx_ds_test_helpers:unmock_rpc/0),
+
+    NClients = 10,
+    ClientSubOpts = #{
+        clientid => mk_clientid(?FUNCTION_NAME, sub),
+        auto_ack => never
+    },
+    ClientSub = start_connect_client(ClientSubOpts),
+    ?assertMatch(
+        {ok, _, [?RC_GRANTED_QOS_1]},
+        emqtt:subscribe(ClientSub, <<"t/#">>, ?QOS_1)
+    ),
+
+    ClientsPub = [
+        start_connect_client(#{
+            clientid => mk_clientid(?FUNCTION_NAME, I),
+            properties => #{'Session-Expiry-Interval' => 0}
+        })
+     || I <- lists:seq(1, NClients)
+    ],
+    lists:foreach(
+        fun(Client) ->
+            Index = integer_to_binary(rand:uniform(NClients)),
+            Topic = <<"t/", Index/binary>>,
+            ?assertMatch({ok, #{}}, emqtt:publish(Client, Topic, Index, 1))
+        end,
+        ClientsPub
+    ),
+
+    Pubs0 = emqx_common_test_helpers:wait_publishes(NClients, 5_000),
+    NPubs = length(Pubs0),
+    ?assertEqual(NClients, NPubs, ?drainMailbox()),
+
+    ok = emqtt:stop(ClientSub),
+
+    %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
+    ok = emqx_ds_test_helpers:mock_rpc_result(
+        fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
+            case erlang:phash2(Shard) rem 2 of
+                0 -> unavailable;
+                1 -> passthrough
+            end
+        end
+    ),
+
+    _ClientSub = start_connect_client(ClientSubOpts#{clean_start => false}),
+
+    Pubs1 = emqx_common_test_helpers:wait_publishes(NPubs, 5_000),
+    ?assert(length(Pubs1) < length(Pubs0), Pubs1),
+
+    %% "Recover" the shards.
+    emqx_ds_test_helpers:unmock_rpc(),
+
+    Pubs2 = emqx_common_test_helpers:wait_publishes(NPubs - length(Pubs1), 5_000),
+    ?assertEqual(
+        [maps:with([topic, payload, qos], P) || P <- Pubs0],
+        [maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2]
+    ).
diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl
index a383e0b2c..7a25e925d 100644
--- a/apps/emqx/test/emqx_common_test_helpers.erl
+++ b/apps/emqx/test/emqx_common_test_helpers.erl
@@ -61,6 +61,7 @@
     read_schema_configs/2,
     render_config_file/2,
     wait_for/4,
+    wait_publishes/2,
     wait_mqtt_payload/1,
     select_free_port/1
 ]).
@@ -426,6 +427,19 @@ wait_for(Fn, Ln, F, Timeout) ->
     {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
     wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
 
+%% Collects up to `Count` `{publish, Msg}` messages from the caller's mailbox.
+%% `Timeout` bounds the wait for each individual message, not the whole call;
+%% as soon as one wait times out, the messages gathered so far are returned.
+wait_publishes(0, _Timeout) ->
+    [];
+wait_publishes(Count, Timeout) ->
+    receive
+        {publish, Msg} ->
+            [Msg | wait_publishes(Count - 1, Timeout)]
+    after Timeout ->
+        []
+    end.
+
 flush() ->
     flush([]).
 
diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
index b491657b0..002f5c557 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
@@ -21,6 +21,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(N_SHARDS, 1).
@@ -553,9 +554,96 @@ t_get_streams_concurrently_with_drop_generation(_Config) ->
             ok
         end,
         []
-    ),
-    ok.
+    ).
 
+t_error_mapping_replication_layer(_Config) ->
+    %% This checks that the replication layer maps recoverable errors correctly.
+
+    ok = emqx_ds_test_helpers:mock_rpc(),
+    ok = snabbkaffe:start_trace(),
+    try
+        DB = ?FUNCTION_NAME,
+        ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+        [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
+
+        TopicFilter = emqx_topic:words(<<"foo/#">>),
+        Msgs = [
+            message(<<"C1">>, <<"foo/bar">>, <<"1">>, 0),
+            message(<<"C1">>, <<"foo/baz">>, <<"2">>, 1),
+            message(<<"C2">>, <<"foo/foo">>, <<"3">>, 2),
+            message(<<"C3">>, <<"foo/xyz">>, <<"4">>, 3),
+            message(<<"C4">>, <<"foo/bar">>, <<"5">>, 4),
+            message(<<"C5">>, <<"foo/oof">>, <<"6">>, 5)
+        ],
+
+        ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+
+        ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}),
+        ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}),
+
+        Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0),
+        Iterators0 = lists:map(
+            fun({_Rank, S}) ->
+                {ok, Iter} = emqx_ds:make_iterator(DB, S, TopicFilter, 0),
+                Iter
+            end,
+            Streams0
+        ),
+
+        %% Disrupt the link to the second shard.
+        ok = emqx_ds_test_helpers:mock_rpc_result(
+            fun(_Node, emqx_ds_replication_layer, _Function, Args) ->
+                case Args of
+                    [DB, Shard1 | _] -> passthrough;
+                    [DB, Shard2 | _] -> unavailable
+                end
+            end
+        ),
+
+        %% Result of `emqx_ds:get_streams/3` will just contain partial results, not an error.
+        Streams1 = emqx_ds:get_streams(DB, TopicFilter, 0),
+        ?assert(
+            length(Streams1) > 0 andalso length(Streams1) =< length(Streams0),
+            Streams1
+        ),
+
+        %% At least one of `emqx_ds:make_iterator/4` will end in an error.
+        Results1 = lists:map(
+            fun({_Rank, S}) ->
+                ?assertMatchOneOf(
+                    {ok, _Iter},
+                    {error, recoverable, {erpc, _}},
+                    emqx_ds:make_iterator(DB, S, TopicFilter, 0)
+                )
+            end,
+            Streams0
+        ),
+        ?assert(
+            length([error || {error, _, _} <- Results1]) > 0,
+            Results1
+        ),
+
+        %% At least one of `emqx_ds:next/3` over initial set of iterators will end in an error.
+        Results2 = lists:map(
+            fun(Iter) ->
+                ?assertMatchOneOf(
+                    {ok, _Iter, [_ | _]},
+                    {error, recoverable, {badrpc, _}},
+                    emqx_ds:next(DB, Iter, _BatchSize = 42)
+                )
+            end,
+            Iterators0
+        ),
+        ?assert(
+            length([error || {error, _, _} <- Results2]) > 0,
+            Results2
+        )
+    after
+        %% Always stop tracing and unload the mocks, even when an assertion above
+        %% fails; otherwise they would stay active for subsequent testcases.
+        snabbkaffe:stop(),
+        meck:unload()
+    end.
 
 update_data_set() ->
     [
@@ -591,6 +679,11 @@ fetch_all(DB, TopicFilter, StartTime) ->
         Streams
     ).
 
+%% Same as `message/3`, but also sets the `from` field of the message to `ClientId`.
+message(ClientId, Topic, Payload, PublishedAt) ->
+    Msg = message(Topic, Payload, PublishedAt),
+    Msg#message{from = ClientId}.
+
 message(Topic, Payload, PublishedAt) ->
     #message{
         topic = Topic,
diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl
new file mode 100644
index 000000000..d26c6dd30
--- /dev/null
+++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl
@@ -0,0 +1,68 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_test_helpers).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%% RPC mocking
+
+%% Installs passthrough `meck` mocks for `erpc` and `gen_rpc`. Calls keep their
+%% original behaviour until `mock_rpc_result/1,2` sets an expectation.
+mock_rpc() ->
+    ok = meck:new(erpc, [passthrough, no_history, unstick]),
+    ok = meck:new(gen_rpc, [passthrough, no_history]).
+
+%% Removes the mocks installed by `mock_rpc/0`. Failures of `meck:unload/1` are
+%% deliberately ignored (best-effort cleanup), so calling it twice is harmless.
+unmock_rpc() ->
+    catch meck:unload(erpc),
+    catch meck:unload(gen_rpc).
+
+%% Overrides the outcome of `erpc:call/4` and `gen_rpc:call/4`; meant to be used
+%% after `mock_rpc/0`. `ExpectFun(Node, Mod, Function, Args)` decides per call:
+%%   * `passthrough`   - perform the original call;
+%%   * `unavailable`   - fail with a connection error (`{erpc, noconnection}`
+%%                       exception / `{badtcp, econnrefused}` return value);
+%%   * `{timeout, Ms}` - sleep for `Ms` milliseconds, then fail with a timeout.
+mock_rpc_result(ExpectFun) ->
+    mock_rpc_result(erpc, ExpectFun),
+    mock_rpc_result(gen_rpc, ExpectFun).
+
+mock_rpc_result(erpc, ExpectFun) ->
+    ok = meck:expect(erpc, call, fun(Node, Mod, Function, Args) ->
+        case ExpectFun(Node, Mod, Function, Args) of
+            passthrough ->
+                meck:passthrough([Node, Mod, Function, Args]);
+            unavailable ->
+                meck:exception(error, {erpc, noconnection});
+            {timeout, Timeout} ->
+                ok = timer:sleep(Timeout),
+                meck:exception(error, {erpc, timeout})
+        end
+    end);
+mock_rpc_result(gen_rpc, ExpectFun) ->
+    ok = meck:expect(gen_rpc, call, fun(Dest = {Node, _}, Mod, Function, Args) ->
+        case ExpectFun(Node, Mod, Function, Args) of
+            passthrough ->
+                meck:passthrough([Dest, Mod, Function, Args]);
+            unavailable ->
+                {badtcp, econnrefused};
+            {timeout, Timeout} ->
+                ok = timer:sleep(Timeout),
+                {badrpc, timeout}
+        end
+    end).