Compare commits
4 Commits
master
...
dev/fix-dr
Author | SHA1 | Date |
---|---|---|
![]() |
66ec2e6ad0 | |
![]() |
53620e8439 | |
![]() |
a7259bc35d | |
![]() |
0d234d6f37 |
|
@ -566,7 +566,11 @@ list_nodes() ->
|
||||||
EXPR
|
EXPR
|
||||||
catch
|
catch
|
||||||
error:RPCError__ = {erpc, _} ->
|
error:RPCError__ = {erpc, _} ->
|
||||||
{error, recoverable, RPCError__}
|
{error, recoverable, RPCError__};
|
||||||
|
%% Note: remote node never _throws_ unrecoverable errors, so
|
||||||
|
%% we can assume that all exceptions are transient.
|
||||||
|
EC__:RPCError__:Stack__ ->
|
||||||
|
{error, recoverable, #{EC__ => RPCError__, stacktrace => Stack__}}
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -605,13 +609,7 @@ ra_add_generation(DB, Shard) ->
|
||||||
?tag => add_generation,
|
?tag => add_generation,
|
||||||
?since => emqx_ds:timestamp_us()
|
?since => emqx_ds:timestamp_us()
|
||||||
},
|
},
|
||||||
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
|
ra_command(DB, Shard, Command, 10).
|
||||||
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
|
|
||||||
{ok, Result, _Leader} ->
|
|
||||||
Result;
|
|
||||||
Error ->
|
|
||||||
error(Error, [DB, Shard])
|
|
||||||
end.
|
|
||||||
|
|
||||||
ra_update_config(DB, Shard, Opts) ->
|
ra_update_config(DB, Shard, Opts) ->
|
||||||
Command = #{
|
Command = #{
|
||||||
|
@ -619,20 +617,20 @@ ra_update_config(DB, Shard, Opts) ->
|
||||||
?config => Opts,
|
?config => Opts,
|
||||||
?since => emqx_ds:timestamp_us()
|
?since => emqx_ds:timestamp_us()
|
||||||
},
|
},
|
||||||
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
|
ra_command(DB, Shard, Command, 10).
|
||||||
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
|
|
||||||
{ok, Result, _Leader} ->
|
|
||||||
Result;
|
|
||||||
Error ->
|
|
||||||
error(Error, [DB, Shard])
|
|
||||||
end.
|
|
||||||
|
|
||||||
ra_drop_generation(DB, Shard, GenId) ->
|
ra_drop_generation(DB, Shard, GenId) ->
|
||||||
Command = #{?tag => drop_generation, ?generation => GenId},
|
Command = #{?tag => drop_generation, ?generation => GenId},
|
||||||
|
ra_command(DB, Shard, Command, 10).
|
||||||
|
|
||||||
|
ra_command(DB, Shard, Command, Retries) ->
|
||||||
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
|
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
|
||||||
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
|
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
|
||||||
{ok, Result, _Leader} ->
|
{ok, Result, _Leader} ->
|
||||||
Result;
|
Result;
|
||||||
|
_Error when Retries > 0 ->
|
||||||
|
timer:sleep(?RA_TIMEOUT),
|
||||||
|
ra_command(DB, Shard, Command, Retries - 1);
|
||||||
Error ->
|
Error ->
|
||||||
error(Error, [DB, Shard])
|
error(Error, [DB, Shard])
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -591,6 +591,7 @@ init({ShardId, Options}) ->
|
||||||
shard = Shard
|
shard = Shard
|
||||||
},
|
},
|
||||||
commit_metadata(S),
|
commit_metadata(S),
|
||||||
|
?tp(debug, ds_storage_init_state, #{shard => ShardId, s => S}),
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
|
||||||
format_status(Status) ->
|
format_status(Status) ->
|
||||||
|
@ -625,7 +626,6 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
|
||||||
{reply, Generations, S};
|
{reply, Generations, S};
|
||||||
handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
|
handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
|
||||||
{Reply, S} = handle_drop_generation(S0, GenId),
|
{Reply, S} = handle_drop_generation(S0, GenId),
|
||||||
commit_metadata(S),
|
|
||||||
{reply, Reply, S};
|
{reply, Reply, S};
|
||||||
handle_call(#call_take_snapshot{}, _From, S) ->
|
handle_call(#call_take_snapshot{}, _From, S) ->
|
||||||
Snapshot = handle_take_snapshot(S),
|
Snapshot = handle_take_snapshot(S),
|
||||||
|
@ -774,6 +774,21 @@ handle_drop_generation(S0, GenId) ->
|
||||||
shard = OldShard,
|
shard = OldShard,
|
||||||
cf_refs = OldCFRefs
|
cf_refs = OldCFRefs
|
||||||
} = S0,
|
} = S0,
|
||||||
|
%% 1. Commit the metadata first, so other functions are less
|
||||||
|
%% likely to see stale data, and replicas don't end up
|
||||||
|
%% in an inconsistent state, where the generation's column families are
|
||||||
|
%% absent, but its metadata is still present.
|
||||||
|
%%
|
||||||
|
%% Note: in theory, this operation may be interrupted in the
|
||||||
|
%% middle. This will leave column families hanging.
|
||||||
|
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
|
||||||
|
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
|
||||||
|
S1 = S0#s{
|
||||||
|
shard = Shard,
|
||||||
|
schema = Schema
|
||||||
|
},
|
||||||
|
commit_metadata(S1),
|
||||||
|
%% 2. Now, actually drop the data from RocksDB:
|
||||||
#{module := Mod, cf_refs := GenCFRefs} = GenSchema,
|
#{module := Mod, cf_refs := GenCFRefs} = GenSchema,
|
||||||
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
|
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
|
||||||
try
|
try
|
||||||
|
@ -793,13 +808,7 @@ handle_drop_generation(S0, GenId) ->
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
CFRefs = OldCFRefs -- GenCFRefs,
|
CFRefs = OldCFRefs -- GenCFRefs,
|
||||||
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
|
S = S1#s{cf_refs = CFRefs},
|
||||||
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
|
|
||||||
S = S0#s{
|
|
||||||
cf_refs = CFRefs,
|
|
||||||
shard = Shard,
|
|
||||||
schema = Schema
|
|
||||||
},
|
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
|
||||||
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
|
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
|
||||||
|
|
|
@ -25,8 +25,8 @@
|
||||||
|
|
||||||
-define(DB, testdb).
|
-define(DB, testdb).
|
||||||
|
|
||||||
-define(ON(NODE, BODY),
|
-define(ON(NODES, BODY),
|
||||||
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
emqx_ds_test_helpers:on(NODES, fun() -> BODY end)
|
||||||
).
|
).
|
||||||
|
|
||||||
opts() ->
|
opts() ->
|
||||||
|
@ -476,6 +476,83 @@ t_rebalance_offline_restarts(Config) ->
|
||||||
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
||||||
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
|
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
|
||||||
|
|
||||||
|
t_drop_generation(Config) ->
|
||||||
|
Apps = [appspec(emqx_durable_storage)],
|
||||||
|
[_, _, NS3] =
|
||||||
|
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
|
||||||
|
[
|
||||||
|
{t_drop_generation1, #{apps => Apps}},
|
||||||
|
{t_drop_generation2, #{apps => Apps}},
|
||||||
|
{t_drop_generation3, #{apps => Apps}}
|
||||||
|
],
|
||||||
|
#{
|
||||||
|
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
|
||||||
|
}
|
||||||
|
),
|
||||||
|
|
||||||
|
Nodes = [N1, _, N3] = emqx_cth_cluster:start(NodeSpecs),
|
||||||
|
?check_trace(
|
||||||
|
try
|
||||||
|
%% Initialize DB on all 3 nodes.
|
||||||
|
Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}),
|
||||||
|
?assertEqual(
|
||||||
|
[{ok, ok} || _ <- Nodes],
|
||||||
|
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
|
||||||
|
),
|
||||||
|
timer:sleep(1000),
|
||||||
|
%% Create a generation while all nodes are online:
|
||||||
|
?ON(N1, ?assertMatch(ok, emqx_ds:add_generation(?DB))),
|
||||||
|
?ON(
|
||||||
|
Nodes,
|
||||||
|
?assertEqual(
|
||||||
|
[{<<"0">>, 1}, {<<"0">>, 2}],
|
||||||
|
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
|
||||||
|
)
|
||||||
|
),
|
||||||
|
%% Drop generation while all nodes are online:
|
||||||
|
?ON(N1, ?assertMatch(ok, emqx_ds:drop_generation(?DB, {<<"0">>, 1}))),
|
||||||
|
?ON(
|
||||||
|
Nodes,
|
||||||
|
?assertEqual(
|
||||||
|
[{<<"0">>, 2}],
|
||||||
|
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
|
||||||
|
)
|
||||||
|
),
|
||||||
|
%% Stop N3, then create and drop generation when it's offline:
|
||||||
|
ok = emqx_cth_cluster:stop_node(N3),
|
||||||
|
?ON(
|
||||||
|
N1,
|
||||||
|
begin
|
||||||
|
ok = emqx_ds:add_generation(?DB),
|
||||||
|
ok = emqx_ds:drop_generation(?DB, {<<"0">>, 2})
|
||||||
|
end
|
||||||
|
),
|
||||||
|
%% Restart N3 and verify that it reached a consistent state:
|
||||||
|
emqx_cth_cluster:restart(NS3),
|
||||||
|
ok = ?ON(N3, emqx_ds:open_db(?DB, Opts)),
|
||||||
|
%% N3 can be in an unstable state right now, but it still
|
||||||
|
%% must successfully return streams:
|
||||||
|
?ON(
|
||||||
|
Nodes,
|
||||||
|
?assertEqual([], emqx_ds:get_streams(?DB, ['#'], 0))
|
||||||
|
),
|
||||||
|
timer:sleep(1000),
|
||||||
|
?ON(
|
||||||
|
Nodes,
|
||||||
|
?assertEqual(
|
||||||
|
[{<<"0">>, 3}],
|
||||||
|
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
after
|
||||||
|
emqx_cth_cluster:stop(Nodes)
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)),
|
||||||
|
true
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
shard_server_info(Node, DB, Shard, Site, Info) ->
|
shard_server_info(Node, DB, Shard, Site, Info) ->
|
||||||
|
|
|
@ -23,7 +23,29 @@
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
-define(ON(NODE, BODY),
|
-define(ON(NODE, BODY),
|
||||||
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
emqx_ds_test_helpers:on(NODE, fun() -> BODY end)
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec on([node()] | node(), fun(() -> A)) -> A | [A].
|
||||||
|
on(Node, Fun) when is_atom(Node) ->
|
||||||
|
[Ret] = on([Node], Fun),
|
||||||
|
Ret;
|
||||||
|
on(Nodes, Fun) ->
|
||||||
|
Results = erpc:multicall(Nodes, erlang, apply, [Fun, []]),
|
||||||
|
lists:map(
|
||||||
|
fun
|
||||||
|
({_Node, {ok, Result}}) ->
|
||||||
|
Result;
|
||||||
|
({Node, Error}) ->
|
||||||
|
ct:pal("Error on node ~p", [Node]),
|
||||||
|
case Error of
|
||||||
|
{error, {exception, Reason, Stack}} ->
|
||||||
|
erlang:raise(error, Reason, Stack);
|
||||||
|
_ ->
|
||||||
|
error(Error)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
lists:zip(Nodes, Results)
|
||||||
).
|
).
|
||||||
|
|
||||||
%% RPC mocking
|
%% RPC mocking
|
||||||
|
|
Loading…
Reference in New Issue