test(ds): Add emqx_ds_replication_SUITE:t_drop_generation
This commit is contained in:
parent
3a4b8e6c24
commit
3d9837a0b8
|
@ -25,8 +25,8 @@
|
|||
|
||||
-define(DB, testdb).
|
||||
|
||||
-define(ON(NODE, BODY),
|
||||
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
||||
-define(ON(NODES, BODY),
|
||||
emqx_ds_test_helpers:on(NODES, fun() -> BODY end)
|
||||
).
|
||||
|
||||
opts() ->
|
||||
|
@ -476,6 +476,84 @@ t_rebalance_offline_restarts(Config) ->
|
|||
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
||||
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
|
||||
|
||||
t_drop_generation(Config) ->
|
||||
Apps = [appspec(emqx_durable_storage)],
|
||||
[_, _, NS3] =
|
||||
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
|
||||
[
|
||||
{t_drop_generation1, #{apps => Apps}},
|
||||
{t_drop_generation2, #{apps => Apps}},
|
||||
{t_drop_generation3, #{apps => Apps}}
|
||||
],
|
||||
#{
|
||||
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
|
||||
}
|
||||
),
|
||||
|
||||
Nodes = [N1, _, N3] = emqx_cth_cluster:start(NodeSpecs),
|
||||
?check_trace(
|
||||
try
|
||||
%% Initialize DB on all 3 nodes.
|
||||
Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}),
|
||||
?assertEqual(
|
||||
[{ok, ok} || _ <- Nodes],
|
||||
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
|
||||
),
|
||||
timer:sleep(1000),
|
||||
%% Create a generation while all nodes are online:
|
||||
?ON(N1, ?assertMatch(ok, emqx_ds:add_generation(?DB))),
|
||||
?ON(
|
||||
Nodes,
|
||||
?assertEqual(
|
||||
[{<<"0">>, 1}, {<<"0">>, 2}],
|
||||
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
|
||||
)
|
||||
),
|
||||
%% Drop generation while all nodes are online:
|
||||
?ON(N1, ?assertMatch(ok, emqx_ds:drop_generation(?DB, {<<"0">>, 1}))),
|
||||
?ON(
|
||||
Nodes,
|
||||
?assertEqual(
|
||||
[{<<"0">>, 2}],
|
||||
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
|
||||
)
|
||||
),
|
||||
%% Ston N3, then create and drop generation when it's offline:
|
||||
ok = emqx_cth_cluster:stop_node(N3),
|
||||
?ON(
|
||||
N1,
|
||||
begin
|
||||
ok = emqx_ds:add_generation(?DB),
|
||||
ok = emqx_ds:drop_generation(?DB, {<<"0">>, 2})
|
||||
end
|
||||
),
|
||||
%% Restart N3 and verify that it reached the consistent state:
|
||||
emqx_cth_cluster:restart(NS3),
|
||||
ok = ?ON(N3, emqx_ds:open_db(?DB, Opts)),
|
||||
%% N3 can be in unstalbe state right now, but it still
|
||||
%% must successfully return streams:
|
||||
?ON(
|
||||
Nodes,
|
||||
?assertEqual([], emqx_ds:get_streams(?DB, ['#'], 0))
|
||||
),
|
||||
timer:sleep(1000),
|
||||
?ON(
|
||||
Nodes,
|
||||
?assertEqual(
|
||||
[{<<"0">>, 3}],
|
||||
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
|
||||
)
|
||||
)
|
||||
after
|
||||
emqx_cth_cluster:stop(Nodes)
|
||||
end,
|
||||
fun(Trace) ->
|
||||
%% TODO: some idempotency errors still happen
|
||||
%% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)),
|
||||
true
|
||||
end
|
||||
).
|
||||
|
||||
%%
|
||||
|
||||
shard_server_info(Node, DB, Shard, Site, Info) ->
|
||||
|
|
|
@ -23,9 +23,31 @@
|
|||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
-define(ON(NODE, BODY),
|
||||
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
||||
emqx_ds_test_helpers:on(NODE, fun() -> BODY end)
|
||||
).
|
||||
|
||||
-spec on([node()] | node(), fun(() -> A)) -> A | [A].
|
||||
on(Node, Fun) when is_atom(Node) ->
|
||||
[Ret] = on([Node], Fun),
|
||||
Ret;
|
||||
on(Nodes, Fun) ->
|
||||
Results = erpc:multicall(Nodes, erlang, apply, [Fun, []]),
|
||||
lists:map(
|
||||
fun
|
||||
({_Node, {ok, Result}}) ->
|
||||
Result;
|
||||
({Node, Error}) ->
|
||||
ct:pal("Error on node ~p", [Node]),
|
||||
case Error of
|
||||
{error, {exception, Reason, Stack}} ->
|
||||
erlang:raise(error, Reason, Stack);
|
||||
_ ->
|
||||
error(Error)
|
||||
end
|
||||
end,
|
||||
lists:zip(Nodes, Results)
|
||||
).
|
||||
|
||||
%% RPC mocking
|
||||
|
||||
mock_rpc() ->
|
||||
|
|
Loading…
Reference in New Issue