From 59d01dc82334ec634ca1894b5b85d2abd228944f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 3 Oct 2023 02:48:06 +0200 Subject: [PATCH] refactor(ds): Implement emqx_ds:open_db --- apps/emqx/src/emqx_persistent_message.erl | 13 +------- apps/emqx_durable_storage/src/emqx_ds.erl | 9 +++--- apps/emqx_durable_storage/src/emqx_ds.erl_ | 2 +- .../src/emqx_ds_message_storage_bitmask.erl | 12 ++------ .../src/emqx_ds_replication_layer.erl | 30 +++++++++++-------- .../src/emqx_ds_storage_layer.erl | 25 ++++++++++------ .../src/emqx_ds_storage_layer_sup.erl | 11 +++++++ .../test/emqx_ds_storage_layer_SUITE.erl | 11 +++---- 8 files changed, 59 insertions(+), 54 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 3f38b4030..96c767d7e 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -42,18 +42,7 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:ensure_shard( - ?DS_SHARD, - #{ - dir => filename:join([ - emqx:data_dir(), - ds, - messages, - ?DEFAULT_KEYSPACE, - ?DS_SHARD_ID - ]) - } - ), + ok = emqx_ds:create_db(<<"default">>, #{}), ok = emqx_persistent_session_ds_router:init_tables(), ok = emqx_persistent_session_ds:create_tables(), ok diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 762478932..70cdd8d17 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -16,7 +16,7 @@ -module(emqx_ds). %% Management API: --export([create_db/2]). +-export([open_db/2]). %% Message storage API: -export([message_store/1, message_store/2, message_store/3]). @@ -88,9 +88,9 @@ %% API funcions %%================================================================================ --spec create_db(db(), create_db_opts()) -> ok. -create_db(DB, Opts) -> - emqx_ds_replication_layer:create_db(DB, Opts). +-spec open_db(db(), create_db_opts()) -> ok. +open_db(DB, Opts) -> + emqx_ds_replication_layer:open_db(DB, Opts). -spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. @@ -102,6 +102,7 @@ message_store(Msgs) -> message_store(DB, Msgs, Opts) -> emqx_ds_replication_layer:message_store(DB, Msgs, Opts). +%% TODO: Do we really need to return message IDs? It's extra work... -spec message_store(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. message_store(DB, Msgs) -> message_store(DB, Msgs, #{}). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl_ b/apps/emqx_durable_storage/src/emqx_ds.erl_ index 61b4c4bb3..1acbcc7c7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl_ +++ b/apps/emqx_durable_storage/src/emqx_ds.erl_ @@ -143,7 +143,7 @@ get_streams(Keyspace, TopicFilter, StartTime) -> -spec ensure_shard(shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. -ensure_shard(Shard, Options) -> +ensure_shard(Sharzd, Options) -> case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of {ok, _Pid} -> ok; diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index f51d556f1..3290b03e6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -87,7 +87,7 @@ -export([delete/4]). -export([get_streams/2]). --export([make_iterator/2, make_iterator/3, next/1]). +-export([make_iterator/3, next/1]). -export([preserve_iterator/1]). -export([restore_iterator/2]). @@ -295,13 +295,6 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic get_streams(_, _) -> [singleton_stream]. --spec make_iterator(db(), emqx_ds:replay()) -> - {ok, iterator()} | {error, _TODO}. -make_iterator(DB, Replay) -> - {Keyspace, _ShardId} = DB#db.shard, - Options = emqx_ds_conf:iteration_options(Keyspace), - make_iterator(DB, Replay, Options). - -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. @@ -373,7 +366,8 @@ restore_iterator(DB, #{ cursor := Cursor, replay := Replay = {_TopicFilter, _StartTime} }) -> - case make_iterator(DB, Replay) of + Options = #{}, % TODO: passthrough options + case make_iterator(DB, Replay, Options) of {ok, It} when Cursor == undefined -> % Iterator was preserved right after it has been made. {ok, It}; diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index af6087188..846d2ca0c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -17,7 +17,7 @@ -export([ list_shards/1, - create_db/2, + open_db/2, message_store/3, get_streams/3, open_iterator/3, @@ -26,7 +26,7 @@ %% internal exports: --export([ do_create_shard_v1/2, +-export([ do_open_shard_v1/2, do_get_streams_v1/3, do_open_iterator_v1/3, do_next_v1/3 @@ -55,16 +55,16 @@ list_shards(DB) -> %% TODO: milestone 5 lists:map( fun(Node) -> - term_to_binary({DB, Node}) + shard_id(DB, Node) end, list_nodes()). --spec create_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok. -create_db(DB, Opts) -> +-spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok. +open_db(DB, Opts) -> lists:foreach( fun(Node) -> - Shard = term_to_binary({DB, Node}), - emqx_ds_proto_v1:create_shard(Node, Shard, Opts) + Shard = shard_id(DB, Node), + emqx_ds_proto_v1:open_shard(Node, Shard, Opts) end, list_nodes()). @@ -107,9 +107,9 @@ next(Shard, Iter, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_create_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok. -do_create_shard_v1(Shard, Opts) -> - error({todo, Shard, Opts}). +-spec do_open_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok. +do_open_shard_v1(Shard, Opts) -> + emqx_ds_storage_layer_sup:ensure_shard(Shard, Opts). -spec do_get_streams_v1(shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -129,10 +129,16 @@ do_next_v1(Shard, Iter, BatchSize) -> %% Internal functions %%================================================================================ +shard_id(DB, Node) -> + %% TODO: don't bake node name into the schema, don't repeat the + %% Mnesia's 1M$ mistake. + NodeBin = atom_to_binary(Node), + <>. + -spec node_of_shard(shard()) -> node(). node_of_shard(ShardId) -> - {_DB, Node} = binary_to_term(ShardId), - Node. + [_DB, NodeBin] = binary:split(ShardId, <<":">>), + binary_to_atom(NodeBin). list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index f4dbbe6f4..93c1aaa1f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -69,6 +69,7 @@ -record(s, { shard :: emqx_ds:shard(), + keyspace :: emqx_ds_conf:keyspace(), db :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle(), cf_generations :: cf_refs() @@ -176,7 +177,8 @@ message_store(Shard, Msgs, _Opts) -> {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp), Topic = emqx_topic:words(emqx_message:topic(Msg)), Payload = serialize(Msg), - Mod:store(ModState, GUID, Timestamp, Topic, Payload) + Mod:store(ModState, GUID, Timestamp, Topic, Payload), + GUID end, Msgs)}. @@ -356,7 +358,7 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> meta_register_gen(Shard, GenId, Gen). -spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{shard = {Keyspace, _ShardId}, db = DBHandle}) -> +ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> Config = emqx_ds_conf:keyspace_config(Keyspace), @@ -396,9 +398,11 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. -spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Shard = {Keyspace, ShardId}, Options) -> - DefaultDir = filename:join([atom_to_binary(Keyspace), ShardId]), +open_db(Shard, Options) -> + DefaultDir = binary_to_list(Shard), DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), + %% TODO: properly forward keyspace + Keyspace = maps:get(keyspace, Options, default_keyspace), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} @@ -423,6 +427,7 @@ open_db(Shard = {Keyspace, ShardId}, Options) -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ shard = Shard, + keyspace = Keyspace, db = DBHandle, cf_iterator = CFIterator, cf_generations = lists:zip(CFNames, CFRefs) @@ -451,7 +456,8 @@ open_next_iterator(Gen = #{}, It) -> -spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. open_iterator(#{module := Mod, data := Data}, It = #it{}) -> - case Mod:make_iterator(Data, It#it.replay) of + Options = #{}, % TODO: passthrough options + case Mod:make_iterator(Data, It#it.replay, Options) of {ok, ItData} -> {ok, It#it{module = Mod, data = ItData}}; Err -> @@ -611,9 +617,9 @@ meta_register_gen(Shard, GenId, Gen) -> -spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}. meta_lookup_gen(Shard, Time) -> - % TODO - % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning - % towards a "no". + %% TODO + %% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning + %% towards a "no". Current = meta_lookup(Shard, current), Gens = meta_lookup(Shard, Current), find_gen(Time, Current, Gens). @@ -671,7 +677,8 @@ is_gen_valid(_Shard, 0, 0) -> ok. serialize(Msg) -> - %% TODO: remove topic, GUID, etc. from the stored message. + %% TODO: remove topic, GUID, etc. from the stored + %% message. Reconstruct it from the metadata. term_to_binary(emqx_message:to_map(Msg)). deserialize(Bin) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index 56c8c760a..2e4f56f10 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -35,6 +35,17 @@ stop_shard(Shard) -> ok = supervisor:terminate_child(?SUP, Shard), ok = supervisor:delete_child(?SUP, Shard). +-spec ensure_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. +ensure_shard(Shard, Options) -> + case start_shard(Shard, Options) of + {ok, _Pid} -> + ok; + {error, {already_started, _Pid}} -> + ok; + {error, Reason} -> + {error, Reason} + end. + %%================================================================================ %% behaviour callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index 981f1062a..25198cfd7 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -282,14 +282,11 @@ init_per_testcase(TC, Config) -> end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). -keyspace(TC) -> - list_to_atom(lists:concat([?MODULE, "_", TC])). - -shard_id(_TC) -> - <<"shard">>. - shard(TC) -> - {keyspace(TC), shard_id(TC)}. + iolist_to_binary([?MODULE_STRING, "_", atom_to_list(TC)]). + +keyspace(TC) -> + TC. set_keyspace_config(Keyspace, Config) -> ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).