refactor(ds): Implement emqx_ds:open_db

This commit is contained in:
ieQu1 2023-10-03 02:48:06 +02:00
parent 7095cb8583
commit 59d01dc823
8 changed files with 59 additions and 54 deletions

View File

@ -42,18 +42,7 @@
init() ->
?WHEN_ENABLED(begin
ok = emqx_ds:ensure_shard(
?DS_SHARD,
#{
dir => filename:join([
emqx:data_dir(),
ds,
messages,
?DEFAULT_KEYSPACE,
?DS_SHARD_ID
])
}
),
ok = emqx_ds:create_db(<<"default">>, #{}),
ok = emqx_persistent_session_ds_router:init_tables(),
ok = emqx_persistent_session_ds:create_tables(),
ok

View File

@ -16,7 +16,7 @@
-module(emqx_ds).
%% Management API:
-export([create_db/2]).
-export([open_db/2]).
%% Message storage API:
-export([message_store/1, message_store/2, message_store/3]).
@ -88,9 +88,9 @@
%% API funcions
%%================================================================================
-spec create_db(db(), create_db_opts()) -> ok.
create_db(DB, Opts) ->
emqx_ds_replication_layer:create_db(DB, Opts).
-spec open_db(db(), create_db_opts()) -> ok.
open_db(DB, Opts) ->
emqx_ds_replication_layer:open_db(DB, Opts).
-spec message_store([emqx_types:message()]) ->
{ok, [message_id()]} | {error, _}.
@ -102,6 +102,7 @@ message_store(Msgs) ->
message_store(DB, Msgs, Opts) ->
emqx_ds_replication_layer:message_store(DB, Msgs, Opts).
%% TODO: Do we really need to return message IDs? It's extra work...
-spec message_store(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
message_store(DB, Msgs) ->
message_store(DB, Msgs, #{}).

View File

@ -143,7 +143,7 @@ get_streams(Keyspace, TopicFilter, StartTime) ->
-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
ok | {error, _Reason}.
ensure_shard(Shard, Options) ->
ensure_shard(Sharzd, Options) ->
case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
{ok, _Pid} ->
ok;

View File

@ -87,7 +87,7 @@
-export([delete/4]).
-export([get_streams/2]).
-export([make_iterator/2, make_iterator/3, next/1]).
-export([make_iterator/3, next/1]).
-export([preserve_iterator/1]).
-export([restore_iterator/2]).
@ -295,13 +295,6 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
get_streams(_, _) ->
[singleton_stream].
-spec make_iterator(db(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(DB, Replay) ->
{Keyspace, _ShardId} = DB#db.shard,
Options = emqx_ds_conf:iteration_options(Keyspace),
make_iterator(DB, Replay, Options).
-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway.
@ -373,7 +366,8 @@ restore_iterator(DB, #{
cursor := Cursor,
replay := Replay = {_TopicFilter, _StartTime}
}) ->
case make_iterator(DB, Replay) of
Options = #{}, % TODO: passthrough options
case make_iterator(DB, Replay, Options) of
{ok, It} when Cursor == undefined ->
% Iterator was preserved right after it has been made.
{ok, It};

View File

@ -17,7 +17,7 @@
-export([
list_shards/1,
create_db/2,
open_db/2,
message_store/3,
get_streams/3,
open_iterator/3,
@ -26,7 +26,7 @@
%% internal exports:
-export([ do_create_shard_v1/2,
-export([ do_open_shard_v1/2,
do_get_streams_v1/3,
do_open_iterator_v1/3,
do_next_v1/3
@ -55,16 +55,16 @@ list_shards(DB) ->
%% TODO: milestone 5
lists:map(
fun(Node) ->
term_to_binary({DB, Node})
shard_id(DB, Node)
end,
list_nodes()).
-spec create_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok.
create_db(DB, Opts) ->
-spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok.
open_db(DB, Opts) ->
lists:foreach(
fun(Node) ->
Shard = term_to_binary({DB, Node}),
emqx_ds_proto_v1:create_shard(Node, Shard, Opts)
Shard = shard_id(DB, Node),
emqx_ds_proto_v1:open_shard(Node, Shard, Opts)
end,
list_nodes()).
@ -107,9 +107,9 @@ next(Shard, Iter, BatchSize) ->
%% Internal exports (RPC targets)
%%================================================================================
-spec do_create_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok.
do_create_shard_v1(Shard, Opts) ->
error({todo, Shard, Opts}).
-spec do_open_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok.
do_open_shard_v1(Shard, Opts) ->
emqx_ds_storage_layer_sup:ensure_shard(Shard, Opts).
-spec do_get_streams_v1(shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{emqx_ds:stream_rank(), stream()}].
@ -129,10 +129,16 @@ do_next_v1(Shard, Iter, BatchSize) ->
%% Internal functions
%%================================================================================
shard_id(DB, Node) ->
%% TODO: don't bake node name into the schema, don't repeat the
%% Mnesia's 1M$ mistake.
NodeBin = atom_to_binary(Node),
<<DB/binary, ":", NodeBin/binary>>.
-spec node_of_shard(shard()) -> node().
node_of_shard(ShardId) ->
{_DB, Node} = binary_to_term(ShardId),
Node.
[_DB, NodeBin] = binary:split(ShardId, <<":">>),
binary_to_atom(NodeBin).
list_nodes() ->
mria:running_nodes().

View File

@ -69,6 +69,7 @@
-record(s, {
shard :: emqx_ds:shard(),
keyspace :: emqx_ds_conf:keyspace(),
db :: rocksdb:db_handle(),
cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs()
@ -176,7 +177,8 @@ message_store(Shard, Msgs, _Opts) ->
{_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp),
Topic = emqx_topic:words(emqx_message:topic(Msg)),
Payload = serialize(Msg),
Mod:store(ModState, GUID, Timestamp, Topic, Payload)
Mod:store(ModState, GUID, Timestamp, Topic, Payload),
GUID
end,
Msgs)}.
@ -356,7 +358,7 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
meta_register_gen(Shard, GenId, Gen).
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{shard = {Keyspace, _ShardId}, db = DBHandle}) ->
ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) ->
case schema_get_current(DBHandle) of
undefined ->
Config = emqx_ds_conf:keyspace_config(Keyspace),
@ -396,9 +398,11 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
open_db(Shard = {Keyspace, ShardId}, Options) ->
DefaultDir = filename:join([atom_to_binary(Keyspace), ShardId]),
open_db(Shard, Options) ->
DefaultDir = binary_to_list(Shard),
DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
%% TODO: properly forward keyspace
Keyspace = maps:get(keyspace, Options, default_keyspace),
DBOptions = [
{create_if_missing, true},
{create_missing_column_families, true}
@ -423,6 +427,7 @@ open_db(Shard = {Keyspace, ShardId}, Options) ->
{CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{
shard = Shard,
keyspace = Keyspace,
db = DBHandle,
cf_iterator = CFIterator,
cf_generations = lists:zip(CFNames, CFRefs)
@ -451,7 +456,8 @@ open_next_iterator(Gen = #{}, It) ->
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
case Mod:make_iterator(Data, It#it.replay) of
Options = #{}, % TODO: passthrough options
case Mod:make_iterator(Data, It#it.replay, Options) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
@ -611,9 +617,9 @@ meta_register_gen(Shard, GenId, Gen) ->
-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) ->
% TODO
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
% towards a "no".
%% TODO
%% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
%% towards a "no".
Current = meta_lookup(Shard, current),
Gens = meta_lookup(Shard, Current),
find_gen(Time, Current, Gens).
@ -671,7 +677,8 @@ is_gen_valid(_Shard, 0, 0) ->
ok.
serialize(Msg) ->
%% TODO: remove topic, GUID, etc. from the stored message.
%% TODO: remove topic, GUID, etc. from the stored
%% message. Reconstruct it from the metadata.
term_to_binary(emqx_message:to_map(Msg)).
deserialize(Bin) ->

View File

@ -35,6 +35,17 @@ stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard),
ok = supervisor:delete_child(?SUP, Shard).
-spec ensure_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}.
ensure_shard(Shard, Options) ->
case start_shard(Shard, Options) of
{ok, _Pid} ->
ok;
{error, {already_started, _Pid}} ->
ok;
{error, Reason} ->
{error, Reason}
end.
%%================================================================================
%% behaviour callbacks
%%================================================================================

View File

@ -282,14 +282,11 @@ init_per_testcase(TC, Config) ->
end_per_testcase(TC, _Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
keyspace(TC) ->
list_to_atom(lists:concat([?MODULE, "_", TC])).
shard_id(_TC) ->
<<"shard">>.
shard(TC) ->
{keyspace(TC), shard_id(TC)}.
iolist_to_binary([?MODULE_STRING, "_", atom_to_list(TC)]).
keyspace(TC) ->
TC.
set_keyspace_config(Keyspace, Config) ->
ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).