From 3d9837a0b8447061150863cace609e82fe471cd1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 24 May 2024 13:48:42 +0200 Subject: [PATCH 1/4] test(ds): Add emqx_ds_replication_SUITE:t_drop_generation --- .../test/emqx_ds_replication_SUITE.erl | 82 ++++++++++++++++++- .../test/emqx_ds_test_helpers.erl | 24 +++++- 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 941e20641..1b2a21105 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -25,8 +25,8 @@ -define(DB, testdb). --define(ON(NODE, BODY), - erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +-define(ON(NODES, BODY), + emqx_ds_test_helpers:on(NODES, fun() -> BODY end) ). opts() -> @@ -476,6 +476,84 @@ t_rebalance_offline_restarts(Config) -> ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). +t_drop_generation(Config) -> + Apps = [appspec(emqx_durable_storage)], + [_, _, NS3] = + NodeSpecs = emqx_cth_cluster:mk_nodespecs( + [ + {t_drop_generation1, #{apps => Apps}}, + {t_drop_generation2, #{apps => Apps}}, + {t_drop_generation3, #{apps => Apps}} + ], + #{ + work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) + } + ), + + Nodes = [N1, _, N3] = emqx_cth_cluster:start(NodeSpecs), + ?check_trace( + try + %% Initialize DB on all 3 nodes. 
+ Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + timer:sleep(1000), + %% Create a generation while all nodes are online: + ?ON(N1, ?assertMatch(ok, emqx_ds:add_generation(?DB))), + ?ON( + Nodes, + ?assertEqual( + [{<<"0">>, 1}, {<<"0">>, 2}], + maps:keys(emqx_ds:list_generations_with_lifetimes(?DB)) + ) + ), + %% Drop generation while all nodes are online: + ?ON(N1, ?assertMatch(ok, emqx_ds:drop_generation(?DB, {<<"0">>, 1}))), + ?ON( + Nodes, + ?assertEqual( + [{<<"0">>, 2}], + maps:keys(emqx_ds:list_generations_with_lifetimes(?DB)) + ) + ), + %% Stop N3, then create and drop generation when it's offline: + ok = emqx_cth_cluster:stop_node(N3), + ?ON( + N1, + begin + ok = emqx_ds:add_generation(?DB), + ok = emqx_ds:drop_generation(?DB, {<<"0">>, 2}) + end + ), + %% Restart N3 and verify that it reached the consistent state: + emqx_cth_cluster:restart(NS3), + ok = ?ON(N3, emqx_ds:open_db(?DB, Opts)), + %% N3 can be in unstable state right now, but it still + %% must successfully return streams: + ?ON( + Nodes, + ?assertEqual([], emqx_ds:get_streams(?DB, ['#'], 0)) + ), + timer:sleep(1000), + ?ON( + Nodes, + ?assertEqual( + [{<<"0">>, 3}], + maps:keys(emqx_ds:list_generations_with_lifetimes(?DB)) + ) + ) + after + emqx_cth_cluster:stop(Nodes) + end, + fun(Trace) -> + %% TODO: some idempotency errors still happen + %% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)), + true + end + ). + %% shard_server_info(Node, DB, Shard, Site, Info) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index f3ad1c151..4fed1d57b 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -23,9 +23,31 @@ -include_lib("stdlib/include/assert.hrl"). 
-define(ON(NODE, BODY), - erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) + emqx_ds_test_helpers:on(NODE, fun() -> BODY end) ). +-spec on([node()] | node(), fun(() -> A)) -> A | [A]. +on(Node, Fun) when is_atom(Node) -> + [Ret] = on([Node], Fun), + Ret; +on(Nodes, Fun) -> + Results = erpc:multicall(Nodes, erlang, apply, [Fun, []]), + lists:map( + fun + ({_Node, {ok, Result}}) -> + Result; + ({Node, Error}) -> + ct:pal("Error on node ~p", [Node]), + case Error of + {error, {exception, Reason, Stack}} -> + erlang:raise(error, Reason, Stack); + _ -> + error(Error) + end + end, + lists:zip(Nodes, Results) + ). + %% RPC mocking mock_rpc() -> From 25721bceba4e464b274dd3b352034c267ae49735 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 24 May 2024 13:57:10 +0200 Subject: [PATCH 2/4] fix(dsstor): Delete generation metadata before dropping it --- .../src/emqx_ds_storage_layer.erl | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 2245e81c5..f35792c17 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -591,6 +591,7 @@ init({ShardId, Options}) -> shard = Shard }, commit_metadata(S), + ?tp(debug, ds_storage_init_state, #{shard => ShardId, s => S}), {ok, S}. format_status(Status) -> @@ -625,7 +626,6 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> {reply, Generations, S}; handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> {Reply, S} = handle_drop_generation(S0, GenId), - commit_metadata(S), {reply, Reply, S}; handle_call(#call_take_snapshot{}, _From, S) -> Snapshot = handle_take_snapshot(S), @@ -774,6 +774,21 @@ handle_drop_generation(S0, GenId) -> shard = OldShard, cf_refs = OldCFRefs } = S0, + %% 1. 
Commit the metadata first, so other functions are less + %% likely to see stale data, and replicas don't end up + %% in an inconsistent state, where generation's column families are + %% absent, but its metadata is still present. + %% + %% Note: in theory, this operation may be interrupted in the + %% middle. This will leave column families hanging. + Shard = maps:remove(?GEN_KEY(GenId), OldShard), + Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + S1 = S0#s{ + shard = Shard, + schema = Schema + }, + commit_metadata(S1), + %% 2. Now, actually drop the data from RocksDB: #{module := Mod, cf_refs := GenCFRefs} = GenSchema, #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, try @@ -793,13 +808,7 @@ handle_drop_generation(S0, GenId) -> ) end, CFRefs = OldCFRefs -- GenCFRefs, - Shard = maps:remove(?GEN_KEY(GenId), OldShard), - Schema = maps:remove(?GEN_KEY(GenId), OldSchema), - S = S0#s{ - cf_refs = CFRefs, - shard = Shard, - schema = Schema - }, + S = S1#s{cf_refs = CFRefs}, {ok, S}. -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> From 1ffc7d5d9ec98018d8a206b699217158c48f01d5 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 24 May 2024 13:58:06 +0200 Subject: [PATCH 3/4] fix(dsrepl): Treat all exceptions from storage layer as recoverable --- apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index ee4e71812..896999af7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -566,7 +566,11 @@ list_nodes() -> EXPR catch error:RPCError__ = {erpc, _} -> - {error, recoverable, RPCError__} + {error, recoverable, RPCError__}; + %% Note: remote node never _throws_ unrecoverable errors, so + %% we can 
assume that all exceptions are transient. + EC__:RPCError__:Stack__ -> + {error, recoverable, #{EC__ => RPCError__, stacktrace => Stack__}} end ). From 830b62d899a6b5b5989e7d875e04816c00d041b2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 24 May 2024 19:07:12 +0200 Subject: [PATCH 4/4] fix(dsrepl): Retry sending ra commands to the leader --- .../src/emqx_ds_replication_layer.erl | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 896999af7..836e9df07 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -609,13 +609,7 @@ ra_add_generation(DB, Shard) -> ?tag => add_generation, ?since => emqx_ds:timestamp_us() }, - Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of - {ok, Result, _Leader} -> - Result; - Error -> - error(Error, [DB, Shard]) - end. + ra_command(DB, Shard, Command, 10). ra_update_config(DB, Shard, Opts) -> Command = #{ @@ -623,20 +617,20 @@ ra_update_config(DB, Shard, Opts) -> ?config => Opts, ?since => emqx_ds:timestamp_us() }, - Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of - {ok, Result, _Leader} -> - Result; - Error -> - error(Error, [DB, Shard]) - end. + ra_command(DB, Shard, Command, 10). ra_drop_generation(DB, Shard, GenId) -> Command = #{?tag => drop_generation, ?generation => GenId}, + ra_command(DB, Shard, Command, 10). 
+ +ra_command(DB, Shard, Command, Retries) -> Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; + _Error when Retries > 0 -> + timer:sleep(?RA_TIMEOUT), + ra_command(DB, Shard, Command, Retries - 1); Error -> error(Error, [DB, Shard]) end.