From ed38ca67d55d629d35b98eb826cc2bc0906d3e0d Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 15 Dec 2023 16:44:48 +0800 Subject: [PATCH] fix(ds): Unified the names of the `add_generation` API --- apps/emqx_durable_storage/src/emqx_ds.erl | 26 ++++++------------- .../src/emqx_ds_replication_layer.erl | 18 ++++++------- .../src/emqx_ds_replication_layer_meta.erl | 6 ++--- .../src/emqx_ds_storage_layer.erl | 2 -- .../src/proto/emqx_ds_proto_v1.erl | 8 +----- .../src/proto/emqx_ds_proto_v2.erl | 8 +++++- .../test/emqx_ds_SUITE.erl | 7 ++--- 7 files changed, 32 insertions(+), 43 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 2f6e93714..aedd5cea0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -22,7 +22,7 @@ -module(emqx_ds). %% Management API: --export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]). +-export([open_db/2, add_generation/2, add_generation/1, drop_db/1]). %% Message storage API: -export([store_batch/2, store_batch/3]). @@ -124,10 +124,10 @@ -callback open_db(db(), create_db_opts()) -> ok | {error, _}. --callback update_db_config(db(), create_db_opts()) -> ok | {error, _}. - -callback add_generation(db()) -> ok | {error, _}. +-callback add_generation(db(), create_db_opts()) -> ok | {error, _}. + -callback drop_db(db()) -> ok | {error, _}. -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). @@ -158,23 +158,13 @@ open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backen persistent_term:put(?persistent_term(DB), Module), ?module(DB):open_db(DB, Opts). --spec update_db_config(db(), create_db_opts()) -> ok. -update_db_config(DB, Opts) -> - case persistent_term:get(?persistent_term(DB), undefined) of - undefined -> - ok; - Module -> - Module:update_db_config(DB, Opts) - end. - -spec add_generation(db()) -> ok. add_generation(DB) -> - case persistent_term:get(?persistent_term(DB), undefined) of - undefined -> - ok; - Module -> - Module:add_generation(DB) - end. + ?module(DB):add_generation(DB). + +-spec add_generation(db(), create_db_opts()) -> ok. +add_generation(DB, Opts) -> + ?module(DB):add_generation(DB, Opts). %% @doc TODO: currently if one or a few shards are down, they won't be diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 6bc0f9f32..9b5c06bff 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -23,8 +23,8 @@ -export([ list_shards/1, open_db/2, - update_db_config/2, add_generation/1, + add_generation/2, drop_db/1, store_batch/3, get_streams/3, @@ -41,7 +41,7 @@ do_make_iterator_v1/5, do_update_iterator_v2/4, do_next_v1/4, - do_add_generation_v1/1 + do_add_generation_v2/1 ]). -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). @@ -124,16 +124,16 @@ open_db(DB, CreateOpts) -> MyShards ). --spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. -update_db_config(DB, CreateOpts) -> - emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). - -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v1:add_generation(Nodes, DB), + _ = emqx_ds_proto_v2:add_generation(Nodes, DB), ok. +-spec add_generation(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. +add_generation(DB, CreateOpts) -> + emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). + -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), @@ -297,8 +297,8 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> do_next_v1(DB, Shard, Iter, BatchSize) -> emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize). --spec do_add_generation_v1(emqx_ds:db()) -> ok | {error, _}. -do_add_generation_v1(DB) -> +-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. +do_add_generation_v2(DB) -> MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), lists:foreach( diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 7a30139f8..38c2dbbe7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -470,15 +470,15 @@ filter_shards(DB, Predicte) -> ShardId end). -filter_shards(DB, Predicte, Maper) -> +filter_shards(DB, Predicate, Mapper) -> eval_qlc( qlc:q([ - Maper(Shard) + Mapper(Shard) || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table( ?SHARD_TAB ), D =:= DB, - Predicte(Shard) + Predicate(Shard) ]) ). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 2fb2d1c1f..28a5924c5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -265,8 +265,6 @@ add_generation(ShardId) -> %% gen_server for the shard %%================================================================================ --define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). - -spec start_link(shard_id(), emqx_ds:builtin_db_options()) -> {ok, pid()}. start_link(Shard = {_, _}, Options) -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index f4b792632..67ed1a3ca 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -24,8 +24,7 @@ store_batch/5, get_streams/5, make_iterator/6, - next/5, - add_generation/2 + next/5 ]). %% behavior callbacks: @@ -90,11 +89,6 @@ store_batch(Node, DB, Shard, Batch, Options) -> DB, Shard, Batch, Options ]). --spec add_generation([node()], emqx_ds:db()) -> - [{ok, ok} | erpc:caught_call_exception()]. -add_generation(Node, DB) -> - erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v1, [DB]). - %%================================================================================ %% behavior callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl index e3d575c38..5322c0f17 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl @@ -27,7 +27,8 @@ next/5, %% introduced in v2 - update_iterator/5 + update_iterator/5, + add_generation/2 ]). %% behavior callbacks: @@ -110,6 +111,11 @@ update_iterator(Node, DB, Shard, OldIter, DSKey) -> DB, Shard, OldIter, DSKey ]). +-spec add_generation([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. +add_generation(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]). + %%================================================================================ %% behavior callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index b99fd6a8d..e6984f200 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -155,7 +155,7 @@ t_05_update_iterator(_Config) -> ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}), ok. -t_05_update(_Config) -> +t_05_update_config(_Config) -> DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, opts())), TopicFilter = ['#'], @@ -180,7 +180,7 @@ t_05_update(_Config) -> {false, TimeAcc, [Msgs | MsgAcc]}; (Datas, {Any, TimeAcc, MsgAcc}) -> timer:sleep(500), - ?assertMatch(ok, emqx_ds:update_db_config(DB, opts())), + ?assertMatch(ok, emqx_ds:add_generation(DB, opts())), timer:sleep(500), StartTime = emqx_message:timestamp_now(), Msgs = ToMsgs(Datas), @@ -269,7 +269,8 @@ fetch_all(DB, TopicFilter, StartTime) -> lists:foldl( fun({_, Stream}, Acc) -> {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), - {ok, _, Msgs} = iterate(DB, Iter0, StartTime), + {ok, _, Msgs0} = iterate(DB, Iter0, StartTime), + Msgs = lists:map(fun({_, Msg}) -> Msg end, Msgs0), Acc ++ Msgs end, [],