diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 8801acce5..82717cd01 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -26,6 +26,8 @@ persist/1 ]). +-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). + %% FIXME -define(WHEN_ENABLED(DO), case is_store_enabled() of @@ -38,7 +40,7 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:open_db(<<"default">>, #{}), + ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{}), ok = emqx_persistent_session_ds_router:init_tables(), %ok = emqx_persistent_session_ds:create_tables(), ok @@ -65,8 +67,9 @@ persist(Msg) -> needs_persistence(Msg) -> not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). +-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> - emqx_ds:store_batch([Msg]). + emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]). has_subscribers(#message{topic = Topic}) -> emqx_persistent_session_ds_router:has_any_route(Topic). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 293f2e531..cf4b5a031 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -25,7 +25,7 @@ -export([open_db/2, drop_db/1]). %% Message storage API: --export([store_batch/1, store_batch/2, store_batch/3]). +-export([store_batch/2, store_batch/3]). %% Message replay API: -export([get_streams/3, make_iterator/2, next/2]). @@ -89,8 +89,6 @@ -type message_id() :: emqx_ds_replication_layer:message_id(). --define(DEFAULT_DB, <<"default">>). - %%================================================================================ %% API funcions %%================================================================================ @@ -107,10 +105,6 @@ open_db(DB, Opts) -> drop_db(DB) -> emqx_ds_replication_layer:drop_db(DB). --spec store_batch([emqx_types:message()]) -> store_batch_result(). -store_batch(Msgs) -> - store_batch(?DEFAULT_DB, Msgs, #{}). - -spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). store_batch(DB, Msgs, Opts) -> emqx_ds_replication_layer:store_batch(DB, Msgs, Opts). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index e1c775d5a..b43604469 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -196,10 +196,6 @@ do_next_v1(Shard, Iter, BatchSize) -> %% Internal functions %%================================================================================ -add_shard_to_rank(Shard, RankY) -> - RankX = erlang:phash2(Shard, 255), - {RankX, RankY}. - shard_id(DB, Node) -> %% TODO: don't bake node name into the schema, don't repeat the %% Mnesia's 1M$ mistake. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d531c5985..e9d4edc06 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -24,10 +24,10 @@ -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([drop_shard/1]). - -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %%================================================================================ %% Type declarations %%================================================================================ @@ -79,9 +79,11 @@ %%%% Shard: -type shard(GenData) :: #{ + %% ID of the current generation (where the new data is written:) current_generation := gen_id(), - default_generation_module := module(), - default_generation_config := term(), + %% This data is used to create new generation: + prototype := {module(), term()}, + %% Generations: {generation, gen_id()} => GenData }. @@ -206,6 +208,9 @@ start_link(Shard, Options) -> shard :: shard() }). +%% Note: we specify gen_server requests as records to make use of Dialyzer: +-record(call_create_generation, {since :: emqx_ds:time()}). + -type server_state() :: #s{}. -define(DEFAULT_CF, "default"). @@ -213,6 +218,7 @@ start_link(Shard, Options) -> init({ShardId, Options}) -> process_flag(trap_exit, true), + logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}), erase_schema_runtime(ShardId), {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options), {Schema, CFRefs} = @@ -233,13 +239,10 @@ init({ShardId, Options}) -> commit_metadata(S), {ok, S}. -%% handle_call({create_generation, Since, Config}, _From, S) -> -%% case create_new_gen(Since, Config, S) of -%% {ok, GenId, NS} -> -%% {reply, {ok, GenId}, NS}; -%% {error, _} = Error -> -%% {reply, Error, S} -%% end; +handle_call(#call_create_generation{since = Since}, _From, S0) -> + S = add_generation(S0, Since), + commit_metadata(S), + {reply, ok, S}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -275,29 +278,52 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> ShardSchema ). +-spec add_generation(server_state(), emqx_ds:time()) -> server_state(). +add_generation(S0, Since) -> + #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, Since), + CFRefs = NewCFRefs ++ CFRefs0, + Key = {generation, GenId}, + Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + Shard = Shard0#{Key => Generation}, + S0#s{ + cf_refs = CFRefs, + schema = Schema, + shard = Shard + }. + -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> generation(). open_generation(ShardId, DB, CFRefs, GenId, GenSchema) -> + ?tp(debug, ds_open_generation, #{gen_id => GenId, schema => GenSchema}), #{module := Mod, data := Schema} = GenSchema, RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema), GenSchema#{data => RuntimeData}. -spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) -> {shard_schema(), cf_refs()}. -create_new_shard_schema(ShardId, DB, CFRefs, _Options) -> - GenId = 1, - %% TODO: read from options/config - Mod = emqx_ds_storage_reference, - ModConfig = #{}, - {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConfig), - GenSchema = #{module => Mod, data => GenData, since => 0, until => undefined}, - ShardSchema = #{ +create_new_shard_schema(ShardId, DB, CFRefs, Options) -> + ?tp(notice, ds_create_new_shard_schema, #{shard => ShardId, options => Options}), + %% TODO: read prototype from options/config + Schema0 = #{ + current_generation => 0, + prototype => {emqx_ds_storage_reference, #{}} + }, + {_NewGenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, _Since = 0), + {Schema, NewCFRefs ++ CFRefs}. + +-spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) -> + {gen_id(), shard_schema(), cf_refs()}. +new_generation(ShardId, DB, Schema0, Since) -> + #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0, + GenId = PrevGenId + 1, + {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf), + GenSchema = #{module => Mod, data => GenData, since => Since, until => undefined}, + Schema = Schema0#{ current_generation => GenId, - default_generation_module => Mod, - default_generation_confg => ModConfig, {generation, GenId} => GenSchema }, - {ShardSchema, NewCFRefs ++ CFRefs}. + {GenId, Schema, NewCFRefs}. %% @doc Commit current state of the server to both rocksdb and the persistent term -spec commit_metadata(server_state()) -> ok. @@ -393,7 +419,7 @@ get_schema_persistent(DB) -> {ok, Blob} -> Schema = binary_to_term(Blob), %% Sanity check: - #{current_generation := _, default_generation_module := _} = Schema, + #{current_generation := _, prototype := _} = Schema, Schema; not_found -> not_found diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index c0fb29ceb..fd480eeab 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -103,7 +103,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> first -> first; _ -> - rocksdb:iterator_move(ITHandle, Key0), + _ = rocksdb:iterator_move(ITHandle, Key0), next end, {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []), diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index eabd03277..1935e41cf 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -21,9 +21,10 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% A simple smoke test that verifies that opening/closing the DB -%% doesn't crash +%% doesn't crash, and not much else t_00_smoke_open_drop(_Config) -> DB = 'DB', ?assertMatch(ok, emqx_ds:open_db(DB, #{})), @@ -65,6 +66,32 @@ t_03_smoke_iterate(_Config) -> {ok, Iter, Batch} = iterate(Iter0, 1), ?assertEqual(Msgs, Batch, {Iter0, Iter}). +%% Verify that iterators survive restart of the application. This is +%% an important property, since the lifetime of the iterators is tied +%% to the external resources, such as clients' sessions, and they +%% should always be able to continue replaying the topics from where +%% they are left off. +t_04_restart(_Config) -> + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + StartTime = 0, + Msgs = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo">>, <<"2">>, 1), + message(<<"bar/bar">>, <<"3">>, 2) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + [{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), + {ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime), + %% Restart the application: + ?tp(warning, emqx_ds_SUITE_restart_app, #{}), + ok = application:stop(emqx_durable_storage), + {ok, _} = application:ensure_all_started(emqx_durable_storage), + ok = emqx_ds:open_db(DB, #{}), + %% The old iterator should be still operational: + {ok, Iter, Batch} = iterate(Iter0, 1), + ?assertEqual(Msgs, Batch, {Iter0, Iter}). + message(Topic, Payload, PublishedAt) -> #message{ topic = Topic, @@ -102,7 +129,7 @@ end_per_suite(Config) -> ok. init_per_testcase(_TC, Config) -> - snabbkaffe:fix_ct_logging(), + %% snabbkaffe:fix_ct_logging(), application:ensure_all_started(emqx_durable_storage), Config.