diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 941e20641..1b2a21105 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -25,8 +25,8 @@ -define(DB, testdb). --define(ON(NODE, BODY), - erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +-define(ON(NODES, BODY), + emqx_ds_test_helpers:on(NODES, fun() -> BODY end) ). opts() -> @@ -476,6 +476,84 @@ t_rebalance_offline_restarts(Config) -> ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). +t_drop_generation(Config) -> + Apps = [appspec(emqx_durable_storage)], + [_, _, NS3] = + NodeSpecs = emqx_cth_cluster:mk_nodespecs( + [ + {t_drop_generation1, #{apps => Apps}}, + {t_drop_generation2, #{apps => Apps}}, + {t_drop_generation3, #{apps => Apps}} + ], + #{ + work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) + } + ), + + Nodes = [N1, _, N3] = emqx_cth_cluster:start(NodeSpecs), + ?check_trace( + try + %% Initialize DB on all 3 nodes. 
+ Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + timer:sleep(1000), + %% Create a generation while all nodes are online: + ?ON(N1, ?assertMatch(ok, emqx_ds:add_generation(?DB))), + ?ON( + Nodes, + ?assertEqual( + [{<<"0">>, 1}, {<<"0">>, 2}], + maps:keys(emqx_ds:list_generations_with_lifetimes(?DB)) + ) + ), + %% Drop generation while all nodes are online: + ?ON(N1, ?assertMatch(ok, emqx_ds:drop_generation(?DB, {<<"0">>, 1}))), + ?ON( + Nodes, + ?assertEqual( + [{<<"0">>, 2}], + maps:keys(emqx_ds:list_generations_with_lifetimes(?DB)) + ) + ), + %% Stop N3, then create and drop generation when it's offline: + ok = emqx_cth_cluster:stop_node(N3), + ?ON( + N1, + begin + ok = emqx_ds:add_generation(?DB), + ok = emqx_ds:drop_generation(?DB, {<<"0">>, 2}) + end + ), + %% Restart N3 and verify that it reached the consistent state: + emqx_cth_cluster:restart(NS3), + ok = ?ON(N3, emqx_ds:open_db(?DB, Opts)), + %% N3 can be in unstable state right now, but it still + %% must successfully return streams: + ?ON( + Nodes, + ?assertEqual([], emqx_ds:get_streams(?DB, ['#'], 0)) + ), + timer:sleep(1000), + ?ON( + Nodes, + ?assertEqual( + [{<<"0">>, 3}], + maps:keys(emqx_ds:list_generations_with_lifetimes(?DB)) + ) + ) + after + emqx_cth_cluster:stop(Nodes) + end, + fun(Trace) -> + %% TODO: some idempotency errors still happen + %% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)), + true + end + ). + %% shard_server_info(Node, DB, Shard, Site, Info) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index f3ad1c151..4fed1d57b 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -23,9 +23,31 @@ -include_lib("stdlib/include/assert.hrl"). 
-define(ON(NODE, BODY), - erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) + emqx_ds_test_helpers:on(NODE, fun() -> BODY end) ). +-spec on([node()] | node(), fun(() -> A)) -> A | [A]. +on(Node, Fun) when is_atom(Node) -> + [Ret] = on([Node], Fun), + Ret; +on(Nodes, Fun) -> + Results = erpc:multicall(Nodes, erlang, apply, [Fun, []]), + lists:map( + fun + ({_Node, {ok, Result}}) -> + Result; + ({Node, Error}) -> + ct:pal("Error on node ~p", [Node]), + case Error of + {error, {exception, Reason, Stack}} -> + erlang:raise(error, Reason, Stack); + _ -> + error(Error) + end + end, + lists:zip(Nodes, Results) + ). + %% RPC mocking mock_rpc() ->