feat(ds): add an API for making new generations

This commit is contained in:
firest 2023-12-15 16:07:00 +08:00
parent bcae3d67c7
commit 31060733a5
6 changed files with 347 additions and 22 deletions

View File

@ -22,7 +22,7 @@
-module(emqx_ds).
%% Management API:
-export([open_db/2, drop_db/1]).
-export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]).
%% Message storage API:
-export([store_batch/2, store_batch/3]).
@ -95,7 +95,7 @@
%% Timestamp
%% Earliest possible timestamp is 0.
%% TODO granularity? Currently, we should always use micro second, as that's the unit we
%% TODO granularity? Currently, we should always use milliseconds, as that's the unit we
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
-type time() :: non_neg_integer().
@ -124,6 +124,10 @@
-callback open_db(db(), create_db_opts()) -> ok | {error, _}.
-callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
-callback add_generation(db()) -> ok | {error, _}.
-callback drop_db(db()) -> ok | {error, _}.
-callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
@ -154,6 +158,24 @@ open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backen
persistent_term:put(?persistent_term(DB), Module),
?module(DB):open_db(DB, Opts).
%% @doc Update the configuration of an existing DB.
%%
%% Delegates to the backend module registered for `DB' by `open_db/2'
%% (looked up in persistent_term).  If no backend is registered on this
%% node, the call is a silent no-op.
%%
%% NOTE: the spec is widened to `ok | {error, _}' to match the
%% `update_db_config/2' behaviour callback, whose result is returned
%% unchanged.
-spec update_db_config(db(), create_db_opts()) -> ok | {error, _}.
update_db_config(DB, Opts) ->
    case persistent_term:get(?persistent_term(DB), undefined) of
        undefined ->
            ok;
        Module ->
            Module:update_db_config(DB, Opts)
    end.
%% @doc Forcibly start a new storage generation in `DB'.
%%
%% Delegates to the backend module registered for `DB' by `open_db/2'.
%% If no backend is registered on this node, the call is a silent no-op.
%%
%% NOTE: the spec is widened to `ok | {error, _}' to match the
%% `add_generation/1' behaviour callback, whose result is returned
%% unchanged.
-spec add_generation(db()) -> ok | {error, _}.
add_generation(DB) ->
    case persistent_term:get(?persistent_term(DB), undefined) of
        undefined ->
            ok;
        Module ->
            Module:add_generation(DB)
    end.
%% @doc TODO: currently if one or a few shards are down, they won't be
%% deleted.

View File

@ -23,6 +23,8 @@
-export([
list_shards/1,
open_db/2,
update_db_config/2,
add_generation/1,
drop_db/1,
store_batch/3,
get_streams/3,
@ -38,7 +40,8 @@
do_get_streams_v1/4,
do_make_iterator_v1/5,
do_update_iterator_v2/4,
do_next_v1/4
do_next_v1/4,
do_add_generation_v1/1
]).
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
@ -121,6 +124,16 @@ open_db(DB, CreateOpts) ->
MyShards
).
%% @doc Store the new DB options in the cluster metadata.  Shard servers
%% pick the change up via the metadata table subscription.
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
update_db_config(DB, Opts) ->
    emqx_ds_replication_layer_meta:update_db_config(DB, Opts).
%% @doc Ask every node that currently leads at least one shard of `DB'
%% to start a new storage generation.  Best effort: per-node RPC results
%% are discarded.
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
add_generation(DB) ->
    LeaderNodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
    _ = emqx_ds_proto_v1:add_generation(LeaderNodes, DB),
    ok.
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
drop_db(DB) ->
Nodes = list_nodes(),
@ -284,6 +297,17 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
do_next_v1(DB, Shard, Iter, BatchSize) ->
emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
%% @doc RPC target (protocol v1): add a new generation to every shard of
%% `DB' that this node currently owns (leads).
-spec do_add_generation_v1(emqx_ds:db()) -> ok | {error, _}.
do_add_generation_v1(DB) ->
    AddGen = fun(ShardId) ->
        emqx_ds_storage_layer:add_generation({DB, ShardId})
    end,
    lists:foreach(AddGen, emqx_ds_replication_layer_meta:my_owned_shards(DB)).
%%================================================================================
%% Internal functions
%%================================================================================

View File

@ -21,20 +21,26 @@
%% implementation details from this module.
-module(emqx_ds_replication_layer_meta).
-compile(inline).
-behaviour(gen_server).
%% API:
-export([
shards/1,
my_shards/1,
my_owned_shards/1,
leader_nodes/1,
replica_set/2,
in_sync_replicas/2,
sites/0,
open_db/2,
update_db_config/2,
drop_db/1,
shard_leader/2,
this_site/0,
set_leader/3,
is_leader/1,
print_status/0
]).
@ -44,16 +50,19 @@
%% internal exports:
-export([
open_db_trans/2,
update_db_config_trans/2,
drop_db_trans/1,
claim_site/2,
in_sync_replicas_trans/2,
set_leader_trans/3,
is_leader_trans/1,
n_shards/1
]).
-export_type([site/0]).
-include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%%================================================================================
%% Type declarations
@ -145,22 +154,34 @@ start_link() ->
-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
shards(DB) ->
eval_qlc(
qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
).
filter_shards(DB).
-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
my_shards(DB) ->
Site = this_site(),
eval_qlc(
qlc:q([
Shard
|| #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table(
?SHARD_TAB
),
D =:= DB,
lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
])
filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) ->
lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
end).
%% @doc Shards of `DB' whose current leader is this node.
-spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
my_owned_shards(DB) ->
    ThisNode = node(),
    IsMine = fun(#?SHARD_TAB{leader = Leader}) -> Leader =:= ThisNode end,
    filter_shards(DB, IsMine).
%% @doc Distinct nodes that currently lead at least one shard of `DB'.
%% Shards without an elected leader (`undefined') are skipped.
-spec leader_nodes(emqx_ds:db()) -> [node()].
leader_nodes(DB) ->
    HasLeader = fun(#?SHARD_TAB{leader = Leader}) -> Leader =/= undefined end,
    GetLeader = fun(#?SHARD_TAB{leader = Leader}) -> Leader end,
    lists:uniq(filter_shards(DB, HasLeader, GetLeader)).
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
@ -204,12 +225,25 @@ set_leader(DB, Shard, Node) ->
{atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
ok.
%% @doc Check whether `Node' is the leader of at least one shard
%% (evaluated in a mria transaction; see is_leader_trans/1).
-spec is_leader(node()) -> boolean().
is_leader(Node) ->
    {atomic, IsLeader} =
        mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
    IsLeader.
%% @doc Register `DB' in the metadata store (transactionally) and return
%% the effective options for it.
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
open_db(DB, DefaultOpts) ->
    {atomic, EffectiveOpts} =
        mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
    EffectiveOpts.
%% @doc Overwrite the stored options of an existing `DB' (transactional;
%% see update_db_config_trans/2 for what is preserved).
-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    ok | {error, _}.
update_db_config(DB, NewOpts) ->
    {atomic, Result} =
        mria:transaction(?SHARD, fun ?MODULE:update_db_config_trans/2, [DB, NewOpts]),
    Result.
-spec drop_db(emqx_ds:db()) -> ok.
drop_db(DB) ->
_ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
@ -226,6 +260,7 @@ init([]) ->
logger:set_process_metadata(#{domain => [ds, meta]}),
ensure_tables(),
ensure_site(),
{ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
S = #s{},
{ok, S}.
@ -235,6 +270,18 @@ handle_call(_Call, _From, S) ->
handle_cast(_Cast, S) ->
{noreply, S}.
%% Reacts to rewrites of a DB's metadata record (subscription to
%% ?META_TAB is set up in init/1): pushes the new options down to the
%% storage layer of every shard of that DB that this node owns (leads).
handle_info(
    {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
) ->
    MyShards = my_owned_shards(DB),
    lists:foreach(
        fun(ShardId) ->
            emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
        end,
        MyShards
    ),
    {noreply, S};
%% Drain and ignore any other message.
handle_info(_Info, S) ->
    {noreply, S}.
@ -260,6 +307,31 @@ open_db_trans(DB, CreateOpts) ->
Opts
end.
%% @doc Transaction body for `update_db_config/2'.
%%
%% Overwrites the stored options of `DB', except that `n_shards' and
%% `replication_factor' are carried over from the existing record: this
%% is an update of a live DB rather than a reopen, so the shard topology
%% must not change and no new shard servers are created.
%%
%% Returns `{error, no_database}' when `DB' is unknown.  (Spec fixed:
%% it previously declared `{error, database}', which the code never
%% returns.)
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    ok | {error, no_database}.
update_db_config_trans(DB, CreateOpts) ->
    case mnesia:wread({?META_TAB, DB}) of
        [#?META_TAB{db_props = Opts}] ->
            %% Preserve the immutable topology parameters of the
            %% existing DB:
            #{
                n_shards := NShards,
                replication_factor := ReplicationFactor
            } = Opts,
            mnesia:write(#?META_TAB{
                db = DB,
                db_props = CreateOpts#{
                    n_shards := NShards,
                    replication_factor := ReplicationFactor
                }
            }),
            ok;
        [] ->
            {error, no_database}
    end.
-spec drop_db_trans(emqx_ds:db()) -> ok.
drop_db_trans(DB) ->
mnesia:delete({?META_TAB, DB}),
@ -287,6 +359,24 @@ set_leader_trans(DB, Shard, Node) ->
Record = Record0#?SHARD_TAB{leader = Node},
mnesia:write(Record).
%% @doc Transaction body for `is_leader/1': `true' iff at least one
%% shard record lists `Node' as its leader.  A limited select (limit 1)
%% stops the table scan at the first matching record.
%%
%% Fixes two defects:
%%   * the spec said `(node)' (the atom) instead of the type `node()';
%%   * the comparison was in the match-spec *body*, so the select
%%     matched every record and returned a boolean per record -- with
%%     limit 1 the `{[_|_], _}' clause then returned `true' whenever the
%%     table was non-empty.  The comparison now lives in a guard, so
%%     only shards actually led by `Node' are selected.
-spec is_leader_trans(node()) -> boolean().
is_leader_trans(Node) ->
    case
        mnesia:select(
            ?SHARD_TAB,
            ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) when Leader =:= Node ->
                Leader
            end),
            1,
            read
        )
    of
        {[_ | _], _Cont} ->
            true;
        _ ->
            false
    end.
%%================================================================================
%% Internal functions
%%================================================================================
@ -346,7 +436,7 @@ create_shards(DB, NShards, ReplicationFactor) ->
Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
Hashes = lists:sort(Hashes0),
{_, Sites} = lists:unzip(Hashes),
[First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
[First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor),
Record = #?SHARD_TAB{
shard = {DB, Shard},
replica_set = ReplicaSet,
@ -369,3 +459,30 @@ eval_qlc(Q) ->
{atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end),
Result
end.
%% @doc All shard ids of `DB'.
filter_shards(DB) ->
    filter_shards(DB, const(true)).

%% @doc Shard ids of `DB' whose `?SHARD_TAB' record satisfies
%% `Predicate'.  (Identifiers `Predicte'/`Maper' renamed to fix
%% misspellings.)
-spec filter_shards(emqx_ds:db(), fun((_) -> boolean())) ->
    [emqx_ds_replication_layer:shard_id()].
filter_shards(DB, Predicate) ->
    filter_shards(DB, Predicate, fun(#?SHARD_TAB{shard = {_, ShardId}}) ->
        ShardId
    end).

%% @doc Apply `Mapper' to every `?SHARD_TAB' record of `DB' that
%% satisfies `Predicate'; evaluated via QLC in a read-only transaction.
filter_shards(DB, Predicate, Mapper) ->
    eval_qlc(
        qlc:q([
            Mapper(Shard)
         || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table(
                ?SHARD_TAB
            ),
            D =:= DB,
            Predicate(Shard)
        ])
    ).
%% @doc Return a unary function that ignores its argument and always
%% yields `Result'.
const(Result) ->
    fun(_Ignored) -> Result end.

View File

@ -25,7 +25,9 @@
get_streams/3,
make_iterator/4,
update_iterator/3,
next/3
next/3,
update_config/2,
add_generation/1
]).
%% gen_server
@ -47,6 +49,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
%%================================================================================
%% Type declarations
%%================================================================================
@ -249,13 +253,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
Error
end.
%% @doc Synchronously replace the shard server's storage configuration
%% with `Options' (the server also opens a new generation; see the
%% `update_config' handle_call clause).
-spec update_config(shard_id(), emqx_ds:builtin_db_opts()) -> ok.
update_config(ShardId, Options) ->
    gen_server:call(?REF(ShardId), {update_config, Options}, infinity).

%% @doc Synchronously ask the shard server to open a new generation
%% starting "now".
-spec add_generation(shard_id()) -> ok.
add_generation(ShardId) ->
    gen_server:call(?REF(ShardId), ?FUNCTION_NAME, infinity).
%%================================================================================
%% gen_server for the shard
%%================================================================================
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
-spec start_link(shard_id(), options()) ->
-spec start_link(shard_id(), emqx_ds:builtin_db_options()) ->
{ok, pid()}.
start_link(Shard = {_, _}, Options) ->
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
@ -300,6 +312,18 @@ init({ShardId, Options}) ->
commit_metadata(S),
{ok, S}.
handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) ->
Prototype = maps:get(storage, Options),
S1 = S0#s{schema = Schema#{prototype := Prototype}},
Since = emqx_message:timestamp_now(),
S = add_generation(S1, Since),
commit_metadata(S),
{reply, ok, S};
handle_call(add_generation, _From, S0) ->
Since = emqx_message:timestamp_now(),
S = add_generation(S0, Since),
commit_metadata(S),
{reply, ok, S};
handle_call(#call_create_generation{since = Since}, _From, S0) ->
S = add_generation(S0, Since),
commit_metadata(S),
@ -342,11 +366,13 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
-spec add_generation(server_state(), emqx_ds:time()) -> server_state().
add_generation(S0, Since) ->
#s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
{GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, Since),
Schema1 = update_last_until(Schema0, Since),
Shard1 = update_last_until(Shard0, Since),
{GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
CFRefs = NewCFRefs ++ CFRefs0,
Key = {generation, GenId},
Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
Shard = Shard0#{Key => Generation},
Shard = Shard1#{current_generation := GenId, Key => Generation},
S0#s{
cf_refs = CFRefs,
schema = Schema,
@ -426,6 +452,13 @@ rocksdb_open(Shard, Options) ->
db_dir({DB, ShardId}) ->
filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
%% @doc Set the `until' timestamp of the current generation.  Works on
%% both a shard schema and a runtime shard map, since both keep their
%% generations under `{generation, GenId}' keys.
-spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
update_last_until(#{current_generation := GenId} = Schema, Until) ->
    maps:update_with(
        {generation, GenId},
        fun(GenData) -> GenData#{until := Until} end,
        Schema
    ).
%%--------------------------------------------------------------------------------
%% Schema access
%%--------------------------------------------------------------------------------

View File

@ -24,7 +24,8 @@
store_batch/5,
get_streams/5,
make_iterator/6,
next/5
next/5,
add_generation/2
]).
%% behavior callbacks:
@ -89,6 +90,11 @@ store_batch(Node, DB, Shard, Batch, Options) ->
DB, Shard, Batch, Options
]).
%% @doc Invoke `do_add_generation_v1' on every node in `Nodes' via
%% erpc multicall; returns one result per node, in the same order.
%% (Parameter renamed from `Node': it is a list of nodes.)
-spec add_generation([node()], emqx_ds:db()) ->
    [{ok, ok} | erpc:caught_call_exception()].
add_generation(Nodes, DB) ->
    erpc:multicall(Nodes, emqx_ds_replication_layer, do_add_generation_v1, [DB]).
%%================================================================================
%% behavior callbacks
%%================================================================================

View File

@ -50,7 +50,7 @@ t_00_smoke_open_drop(_Config) ->
lists:foreach(
fun(Shard) ->
?assertEqual(
{ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
{ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
),
?assertEqual(
[Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
@ -155,6 +155,127 @@ t_05_update_iterator(_Config) ->
?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
ok.
%% Checks that `emqx_ds:update_db_config/2' works against a live DB:
%% messages stored before and after each config update must all stay
%% readable from the relevant start times.
t_05_update(_Config) ->
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
    TopicFilter = ['#'],
    DataSet = update_data_set(),
    %% Turn a batch of {Topic, Payload} pairs into messages stamped "now".
    ToMsgs = fun(Datas) ->
        lists:map(
            fun({Topic, Payload}) ->
                message(Topic, Payload, emqx_message:timestamp_now())
            end,
            Datas
        )
    end,
    %% Store the first batch as-is; before every later batch, update the
    %% DB config and record that batch's start time.  Accumulators are
    %% built newest-first.
    %% NOTE(review): the sleeps appear to give the asynchronous config
    %% propagation time to reach the shard servers -- confirm there is no
    %% event to wait on instead.
    {_, StartTimes, MsgsList} =
        lists:foldl(
            fun
                (Datas, {true, TimeAcc, MsgAcc}) ->
                    Msgs = ToMsgs(Datas),
                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
                    {false, TimeAcc, [Msgs | MsgAcc]};
                (Datas, {Any, TimeAcc, MsgAcc}) ->
                    timer:sleep(500),
                    ?assertMatch(ok, emqx_ds:update_db_config(DB, opts())),
                    timer:sleep(500),
                    StartTime = emqx_message:timestamp_now(),
                    Msgs = ToMsgs(Datas),
                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
                    {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]}
            end,
            {true, [emqx_message:timestamp_now()], []},
            DataSet
        ),
    %% Zipping the (newest-first) time and batch lists pairs each start
    %% time with the batches stored at or after it; a read from that time
    %% must return exactly those messages.
    Checker = fun({StartTime, Msgs0}, Acc) ->
        Msgs = Msgs0 ++ Acc,
        Batch = fetch_all(DB, TopicFilter, StartTime),
        ?assertEqual(Msgs, Batch, {StartTime}),
        Msgs
    end,
    lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
%% Checks that `emqx_ds:add_generation/1' works against a live DB:
%% messages stored before and after each new generation must all stay
%% readable from the relevant start times.
%% NOTE(review): near-duplicate of t_05_update, differing only in the
%% API call under test -- a candidate for a shared helper.
t_06_add_generation(_Config) ->
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
    TopicFilter = ['#'],
    DataSet = update_data_set(),
    %% Turn a batch of {Topic, Payload} pairs into messages stamped "now".
    ToMsgs = fun(Datas) ->
        lists:map(
            fun({Topic, Payload}) ->
                message(Topic, Payload, emqx_message:timestamp_now())
            end,
            Datas
        )
    end,
    %% Store the first batch as-is; before every later batch, add a new
    %% generation and record that batch's start time.  Accumulators are
    %% built newest-first.
    %% NOTE(review): the sleeps appear to give the asynchronous RPC fan-out
    %% time to take effect -- confirm there is no event to wait on instead.
    {_, StartTimes, MsgsList} =
        lists:foldl(
            fun
                (Datas, {true, TimeAcc, MsgAcc}) ->
                    Msgs = ToMsgs(Datas),
                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
                    {false, TimeAcc, [Msgs | MsgAcc]};
                (Datas, {Any, TimeAcc, MsgAcc}) ->
                    timer:sleep(500),
                    ?assertMatch(ok, emqx_ds:add_generation(DB)),
                    timer:sleep(500),
                    StartTime = emqx_message:timestamp_now(),
                    Msgs = ToMsgs(Datas),
                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
                    {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]}
            end,
            {true, [emqx_message:timestamp_now()], []},
            DataSet
        ),
    %% A read from each recorded start time must return exactly the
    %% batches stored at or after it.
    Checker = fun({StartTime, Msgs0}, Acc) ->
        Msgs = Msgs0 ++ Acc,
        Batch = fetch_all(DB, TopicFilter, StartTime),
        ?assertEqual(Msgs, Batch, {StartTime}),
        Msgs
    end,
    lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
%% Fixture for the update/add-generation tests: three batches, each a
%% list of {Topic, Payload} pairs containing a single message.
update_data_set() ->
    [
        [{<<"foo/bar">>, <<"1">>}],
        [{<<"foo">>, <<"2">>}],
        [{<<"bar/bar">>, <<"3">>}]
    ].
%% Read every message matching `TopicFilter' from `StartTime' onwards,
%% concatenated over all streams.  Streams are sorted by the second
%% element of their rank tuple so the result order is deterministic.
fetch_all(DB, TopicFilter, StartTime) ->
    Streams0 = emqx_ds:get_streams(DB, TopicFilter, StartTime),
    Streams = lists:sort(
        fun({{_, A}, _}, {{_, B}, _}) ->
            A < B
        end,
        Streams0
    ),
    lists:foldl(
        fun({_, Stream}, Acc) ->
            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
            %% NOTE(review): iterate/3 is not visible in this chunk; its
            %% third argument receives `StartTime', while iterate/4's
            %% third argument is a batch size -- confirm this is the
            %% intended arity/argument.
            {ok, _, Msgs} = iterate(DB, Iter0, StartTime),
            Acc ++ Msgs
        end,
        [],
        Streams
    ).
message(Topic, Payload, PublishedAt) ->
#message{
topic = Topic,
@ -172,6 +293,8 @@ iterate(DB, It0, BatchSize, Acc) ->
{ok, It, Acc};
{ok, It, Msgs} ->
iterate(DB, It, BatchSize, Acc ++ Msgs);
{ok, end_of_stream} ->
{ok, It0, Acc};
Ret ->
Ret
end.