From 75b08b525b33ef50b9006a54631252ac753c4856 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 15 Jan 2024 16:52:59 -0300 Subject: [PATCH] feat(ds): add `list_generations` and `drop_generation` APIs --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_durable_storage/src/emqx_ds.erl | 57 +++- .../src/emqx_ds_replication_layer.erl | 45 ++- .../src/emqx_ds_storage_bitfield_lts.erl | 16 + .../src/emqx_ds_storage_layer.erl | 293 ++++++++++++++---- .../src/emqx_ds_storage_reference.erl | 8 +- .../src/proto/emqx_ds_proto_v3.erl | 147 +++++++++ .../test/emqx_ds_SUITE.erl | 250 ++++++++++++++- 8 files changed, 755 insertions(+), 62 deletions(-) create mode 100644 apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 2777aec53..2b25cf4be 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -21,6 +21,7 @@ {emqx_delayed,3}. {emqx_ds,1}. {emqx_ds,2}. +{emqx_ds,3}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. {emqx_exhook,1}. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index d679f7097..434169520 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -22,7 +22,14 @@ -module(emqx_ds). %% Management API: --export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]). +-export([ + open_db/2, + update_db_config/2, + add_generation/1, + list_generations_with_lifetimes/1, + drop_generation/2, + drop_db/1 +]). %% Message storage API: -export([store_batch/2, store_batch/3]). @@ -52,7 +59,10 @@ get_iterator_result/1, ds_specific_stream/0, - ds_specific_iterator/0 + ds_specific_iterator/0, + ds_specific_generation_rank/0, + generation_rank/0, + generation_info/0 ]). %%================================================================================ @@ -80,6 +90,8 @@ -type ds_specific_stream() :: term(). 
+-type ds_specific_generation_rank() :: term(). + -type message_key() :: binary(). -type store_batch_result() :: ok | {error, _}. @@ -114,6 +126,17 @@ -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined. +%% An opaque term identifying a generation. Each implementation will possibly add +%% information to this term to match its inner structure (e.g.: by embedding the shard id, +%% in the case of `emqx_ds_replication_layer'). +-opaque generation_rank() :: ds_specific_generation_rank(). + +-type generation_info() :: #{ + created_at := time(), + since := time(), + until := time() | undefined +}. + -define(persistent_term(DB), {emqx_ds_db_backend, DB}). -define(module(DB), (persistent_term:get(?persistent_term(DB)))). @@ -128,6 +151,11 @@ -callback update_db_config(db(), create_db_opts()) -> ok | {error, _}. +-callback list_generations_with_lifetimes(db()) -> + #{generation_rank() => generation_info()}. + +-callback drop_generation(db(), generation_rank()) -> ok | {error, _}. + -callback drop_db(db()) -> ok | {error, _}. -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). @@ -142,6 +170,11 @@ -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator). +-optional_callbacks([ + list_generations_with_lifetimes/1, + drop_generation/2 +]). + %%================================================================================ %% API funcions %%================================================================================ @@ -166,6 +199,26 @@ add_generation(DB) -> update_db_config(DB, Opts) -> ?module(DB):update_db_config(DB, Opts). +-spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}. +list_generations_with_lifetimes(DB) -> + Mod = ?module(DB), + case erlang:function_exported(Mod, list_generations_with_lifetimes, 1) of + true -> + Mod:list_generations_with_lifetimes(DB); + false -> + #{} + end. + +-spec drop_generation(db(), generation_rank()) -> ok | {error, _}. 
+drop_generation(DB, GenId) -> + Mod = ?module(DB), + case erlang:function_exported(Mod, drop_generation, 2) of + true -> + Mod:drop_generation(DB, GenId); + false -> + {error, not_implemented} + end. + %% @doc TODO: currently if one or a few shards are down, they won't be %% deleted. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 68d9459ee..387587570 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -25,6 +25,8 @@ open_db/2, add_generation/1, update_db_config/2, + list_generations_with_lifetimes/1, + drop_generation/2, drop_db/1, store_batch/3, get_streams/3, @@ -41,7 +43,9 @@ do_make_iterator_v1/5, do_update_iterator_v2/4, do_next_v1/4, - do_add_generation_v2/1 + do_add_generation_v2/1, + do_list_generations_with_lifetimes_v3/2, + do_drop_generation_v3/3 ]). -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). @@ -104,6 +108,8 @@ ?batch_messages := [emqx_types:message()] }. +-type generation_rank() :: {shard_id(), term()}. + %%================================================================================ %% API functions %%================================================================================ @@ -135,6 +141,32 @@ add_generation(DB) -> update_db_config(DB, CreateOpts) -> emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). +-spec list_generations_with_lifetimes(emqx_ds:db()) -> + #{generation_rank() => emqx_ds:generation_info()}. +list_generations_with_lifetimes(DB) -> + Shards = list_shards(DB), + lists:foldl( + fun(Shard, GensAcc) -> + Node = node_of_shard(DB, Shard), + maps:fold( + fun(GenId, Data, AccInner) -> + AccInner#{{Shard, GenId} => Data} + end, + GensAcc, + emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard) + ) + end, + #{}, + Shards + ). 
+ +-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. +drop_generation(DB, {Shard, GenId}) -> + %% TODO: drop generation in all nodes in the replica set, not only in the leader, + %% after we have proper replication in place. + Node = node_of_shard(DB, Shard), + emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId). + -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), @@ -301,7 +333,6 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. do_add_generation_v2(DB) -> MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), - lists:foreach( fun(ShardId) -> emqx_ds_storage_layer:add_generation({DB, ShardId}) @@ -309,6 +340,16 @@ do_add_generation_v2(DB) -> MyShards ). +-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) -> + #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}. +do_list_generations_with_lifetimes_v3(DB, ShardId) -> + emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}). + +-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) -> + ok | {error, _}. +do_drop_generation_v3(DB, ShardId, GenId) -> + emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId). + %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 4c59a5f62..27d41e6c6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -27,6 +27,7 @@ -export([ create/4, open/5, + drop/5, store_batch/4, get_streams/4, make_iterator/5, @@ -199,6 +200,21 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> ts_offset = TSOffsetBits }. 
+-spec drop( + emqx_ds_storage_layer:shard_id(), + rocksdb:db_handle(), + emqx_ds_storage_layer:gen_id(), + emqx_ds_storage_layer:cf_refs(), + s() +) -> + ok. +drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> + {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), + {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), + ok = rocksdb:drop_column_family(DBHandle, DataCF), + ok = rocksdb:drop_column_family(DBHandle, TrieCF), + ok. + -spec store_batch( emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() ) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index ab64005b6..ed161d290 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -27,7 +27,9 @@ update_iterator/3, next/3, update_config/2, - add_generation/1 + add_generation/1, + list_generations_with_lifetimes/1, + drop_generation/2 ]). %% gen_server @@ -44,7 +46,8 @@ iterator/0, shard_id/0, options/0, - prototype/0 + prototype/0, + post_creation_context/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -95,11 +98,18 @@ %%%% Generation: +-define(GEN_KEY(GEN_ID), {generation, GEN_ID}). + -type generation(Data) :: #{ %% Module that handles data for the generation: module := module(), %% Module-specific data defined at generation creation time: data := Data, + %% Column families used by this generation + cf_refs := cf_refs(), + %% Time at which this was created. Might differ from `since', in particular for the + %% first generation. + created_at := emqx_ds:time(), %% When should this generation become active? %% This generation should only contain messages timestamped no earlier than that. %% The very first generation will have `since` equal 0. 
@@ -121,7 +131,7 @@ %% This data is used to create new generation: prototype := prototype(), %% Generations: - {generation, gen_id()} => GenData + ?GEN_KEY(gen_id()) => GenData }. %% Shard schema (persistent): @@ -132,6 +142,18 @@ -type options() :: map(). +-type post_creation_context() :: + #{ + shard_id := emqx_ds_storage_layer:shard_id(), + db := rocksdb:db_handle(), + new_gen_id := emqx_ds_storage_layer:gen_id(), + old_gen_id := emqx_ds_storage_layer:gen_id(), + new_cf_refs := cf_refs(), + old_cf_refs := cf_refs(), + new_gen_runtime_data := _NewData, + old_gen_runtime_data := _OldData + }. + %%================================================================================ %% Generation callbacks %%================================================================================ @@ -145,6 +167,9 @@ -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> _Data. +-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> + ok | {error, _Reason}. + -callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). @@ -157,10 +182,17 @@ -callback next(shard_id(), _Data, Iter, pos_integer()) -> {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback post_creation_actions(post_creation_context()) -> _Data. + +-optional_callbacks([post_creation_actions/1]). + %%================================================================================ %% API for the replication layer %%================================================================================ +-record(call_list_generations_with_lifetimes, {}). +-record(call_drop_generation, {gen_id :: gen_id()}). + -spec open_shard(shard_id(), options()) -> ok. open_shard(Shard, Options) -> emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). @@ -182,18 +214,25 @@ store_batch(Shard, Messages, Options) -> [{integer(), stream()}]. 
get_streams(Shard, TopicFilter, StartTime) -> Gens = generations_since(Shard, StartTime), + ?tp(get_streams_all_gens, #{gens => Gens}), lists:flatmap( fun(GenId) -> - #{module := Mod, data := GenData} = generation_get(Shard, GenId), - Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), - [ - {GenId, #{ - ?tag => ?STREAM, - ?generation => GenId, - ?enc => Stream - }} - || Stream <- Streams - ] + ?tp(get_streams_get_gen, #{gen_id => GenId}), + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), + [ + {GenId, #{ + ?tag => ?STREAM, + ?generation => GenId, + ?enc => Stream + }} + || Stream <- Streams + ]; + {error, not_found} -> + %% race condition: generation was dropped before getting its streams? + [] + end end, Gens ). @@ -203,16 +242,20 @@ get_streams(Shard, TopicFilter, StartTime) -> make_iterator( Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime ) -> - #{module := Mod, data := GenData} = generation_get(Shard, GenId), - case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of - {ok, Iter} -> - {ok, #{ - ?tag => ?IT, - ?generation => GenId, - ?enc => Iter - }}; - {error, _} = Err -> - Err + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of + {ok, Iter} -> + {ok, #{ + ?tag => ?IT, + ?generation => GenId, + ?enc => Iter + }}; + {error, _} = Err -> + Err + end; + {error, not_found} -> + {error, end_of_stream} end. 
-spec update_iterator( @@ -224,33 +267,42 @@ update_iterator( #{?tag := ?IT, ?generation := GenId, ?enc := OldIter}, DSKey ) -> - #{module := Mod, data := GenData} = generation_get(Shard, GenId), - case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of - {ok, Iter} -> - {ok, #{ - ?tag => ?IT, - ?generation => GenId, - ?enc => Iter - }}; - {error, _} = Err -> - Err + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of + {ok, Iter} -> + {ok, #{ + ?tag => ?IT, + ?generation => GenId, + ?enc => Iter + }}; + {error, _} = Err -> + Err + end; + {error, not_found} -> + {error, end_of_stream} end. -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> - #{module := Mod, data := GenData} = generation_get(Shard, GenId), - Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize) of - {ok, _GenIter, []} when GenId < Current -> - %% This is a past generation. Storage layer won't write - %% any more messages here. The iterator reached the end: - %% the stream has been fully replayed. - {ok, end_of_stream}; - {ok, GenIter, Batch} -> - {ok, Iter#{?enc := GenIter}, Batch}; - Error = {error, _} -> - Error + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + Current = generation_current(Shard), + case Mod:next(Shard, GenData, GenIter0, BatchSize) of + {ok, _GenIter, []} when GenId < Current -> + %% This is a past generation. Storage layer won't write + %% any more messages here. The iterator reached the end: + %% the stream has been fully replayed. + {ok, end_of_stream}; + {ok, GenIter, Batch} -> + {ok, Iter#{?enc := GenIter}, Batch}; + Error = {error, _} -> + Error + end; + {error, not_found} -> + %% generation was possibly dropped by GC + {ok, end_of_stream} end. 
-spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. @@ -261,6 +313,21 @@ update_config(ShardId, Options) -> add_generation(ShardId) -> gen_server:call(?REF(ShardId), add_generation, infinity). +-spec list_generations_with_lifetimes(shard_id()) -> + #{ + gen_id() => #{ + created_at := emqx_ds:time(), + since := emqx_ds:time(), + until := undefined | emqx_ds:time() + } + }. +list_generations_with_lifetimes(ShardId) -> + gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity). + +-spec drop_generation(shard_id(), gen_id()) -> ok | {error, _}. +drop_generation(ShardId, GenId) -> + gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity). + %%================================================================================ %% gen_server for the shard %%================================================================================ @@ -322,6 +389,13 @@ handle_call(add_generation, _From, S0) -> S = add_generation(S0, Since), commit_metadata(S), {reply, ok, S}; +handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> + Generations = handle_list_generations_with_lifetimes(S), + {reply, Generations, S}; +handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> + {Reply, S} = handle_drop_generation(S0, GenId), + commit_metadata(S), + {reply, Reply, S}; handle_call(#call_create_generation{since = Since}, _From, S0) -> S = add_generation(S0, Since), commit_metadata(S), @@ -353,7 +427,7 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> %% Transform generation schemas to generation runtime data: maps:map( fun - ({generation, GenId}, GenSchema) -> + (?GEN_KEY(GenId), GenSchema) -> open_generation(ShardId, DB, CFRefs, GenId, GenSchema); (_K, Val) -> Val @@ -366,10 +440,40 @@ add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, Schema1 = update_last_until(Schema0, Since), Shard1 = update_last_until(Shard0, Since), + + #{current_generation :=
OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0, + OldKey = ?GEN_KEY(OldGenId), + #{OldKey := OldGenSchema} = Schema0, + #{cf_refs := OldCFRefs} = OldGenSchema, + #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0, + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + CFRefs = NewCFRefs ++ CFRefs0, - Key = {generation, GenId}, - Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + Key = ?GEN_KEY(GenId), + Generation0 = + #{data := NewGenData0} = + open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + + %% When the new generation's module is the same as the last one, we might want to + %% perform actions like inheriting some of the previous (meta)data. + NewGenData = + run_post_creation_actions( + #{ + shard_id => ShardId, + db => DB, + new_gen_id => GenId, + old_gen_id => OldGenId, + new_cf_refs => NewCFRefs, + old_cf_refs => OldCFRefs, + new_gen_runtime_data => NewGenData0, + old_gen_runtime_data => OldGenData, + new_module => CurrentMod, + old_module => OldMod + } + ), + Generation = Generation0#{data := NewGenData}, + Shard = Shard1#{current_generation := GenId, Key => Generation}, S0#s{ cf_refs = CFRefs, @@ -377,6 +481,54 @@ add_generation(S0, Since) -> shard = Shard }. +-spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}. +handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) -> + maps:fold( + fun + (?GEN_KEY(GenId), GenSchema, Acc) -> + Acc#{GenId => export_generation(GenSchema)}; + (_Key, _Value, Acc) -> + Acc + end, + #{}, + ShardSchema + ). + +-spec export_generation(generation_schema()) -> map(). +export_generation(GenSchema) -> + maps:with([created_at, since, until], GenSchema). + +-spec handle_drop_generation(server_state(), gen_id()) -> + {ok | {error, current_generation | not_found}, server_state()}.
+handle_drop_generation(#s{schema = #{current_generation := GenId}} = S0, GenId) -> + {{error, current_generation}, S0}; +handle_drop_generation(#s{schema = Schema} = S0, GenId) when + not is_map_key(?GEN_KEY(GenId), Schema) +-> + {{error, not_found}, S0}; +handle_drop_generation(S0, GenId) -> + #s{ + shard_id = ShardId, + db = DB, + schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema, + shard = OldShard, + cf_refs = OldCFRefs + } = S0, + #{module := Mod, cf_refs := GenCFRefs} = GenSchema, + #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, + case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of + ok -> + CFRefs = OldCFRefs -- GenCFRefs, + Shard = maps:remove(?GEN_KEY(GenId), OldShard), + Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + S = S0#s{ + cf_refs = CFRefs, + shard = Shard, + schema = Schema + }, + {ok, S} + end. + -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> generation(). open_generation(ShardId, DB, CFRefs, GenId, GenSchema) -> @@ -403,10 +555,17 @@ new_generation(ShardId, DB, Schema0, Since) -> #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0, GenId = PrevGenId + 1, {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf), - GenSchema = #{module => Mod, data => GenData, since => Since, until => undefined}, + GenSchema = #{ + module => Mod, + data => GenData, + cf_refs => NewCFRefs, + created_at => emqx_message:timestamp_now(), + since => Since, + until => undefined + }, Schema = Schema0#{ current_generation => GenId, - {generation, GenId} => GenSchema + ?GEN_KEY(GenId) => GenSchema }, {GenId, Schema, NewCFRefs}. @@ -453,9 +612,26 @@ db_dir({DB, ShardId}) -> -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard(). 
update_last_until(Schema, Until) -> #{current_generation := GenId} = Schema, - GenData0 = maps:get({generation, GenId}, Schema), + GenData0 = maps:get(?GEN_KEY(GenId), Schema), GenData = GenData0#{until := Until}, - Schema#{{generation, GenId} := GenData}. + Schema#{?GEN_KEY(GenId) := GenData}. + +run_post_creation_actions( + #{ + new_module := Mod, + old_module := Mod, + new_gen_runtime_data := NewGenData + } = Context +) -> + case erlang:function_exported(Mod, post_creation_actions, 1) of + true -> + Mod:post_creation_actions(Context); + false -> + NewGenData + end; +run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) -> + %% Different implementation modules + NewGenData. %%-------------------------------------------------------------------------------- %% Schema access @@ -468,15 +644,24 @@ generation_current(Shard) -> -spec generation_get(shard_id(), gen_id()) -> generation(). generation_get(Shard, GenId) -> - #{{generation, GenId} := GenData} = get_schema_runtime(Shard), + {ok, GenData} = generation_get_safe(Shard, GenId), GenData. +-spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}. +generation_get_safe(Shard, GenId) -> + case get_schema_runtime(Shard) of + #{?GEN_KEY(GenId) := GenData} -> + {ok, GenData}; + #{} -> + {error, not_found} + end. + -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()]. 
generations_since(Shard, Since) -> Schema = get_schema_runtime(Shard), maps:fold( fun - ({generation, GenId}, #{until := Until}, Acc) when Until >= Since -> + (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since -> [GenId | Acc]; (_K, _V, Acc) -> Acc diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index da7ac79f6..c958e56dc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -30,6 +30,7 @@ -export([ create/4, open/5, + drop/5, store_batch/4, get_streams/4, make_iterator/5, @@ -85,6 +86,10 @@ open(_Shard, DBHandle, GenId, CFRefs, #schema{}) -> {_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs), #s{db = DBHandle, cf = CF}. +drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> + ok = rocksdb:drop_column_family(DBHandle, CFHandle), + ok. + store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> lists:foreach( fun(Msg) -> @@ -142,7 +147,8 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> case rocksdb:iterator_move(IT, Action) of {ok, Key, Blob} -> Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), - case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of + TopicWords = emqx_topic:words(Topic), + case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of true -> do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]); false -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl new file mode 100644 index 000000000..74a174c4c --- /dev/null +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl @@ -0,0 +1,147 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_proto_v3). + +-behavior(emqx_bpapi). + +-include_lib("emqx_utils/include/bpapi.hrl"). +%% API: +-export([ + drop_db/2, + store_batch/5, + get_streams/5, + make_iterator/6, + next/5, + update_iterator/5, + add_generation/2, + + %% introduced in v3 + list_generations_with_lifetimes/3, + drop_generation/4 +]). + +%% behavior callbacks: +-export([introduced_in/0]). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | {error, _}]. +drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). + +-spec get_streams( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + [{integer(), emqx_ds_storage_layer:stream()}]. +get_streams(Node, DB, Shard, TopicFilter, Time) -> + erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). + +-spec make_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [ + DB, Shard, Stream, TopicFilter, StartTime + ]). + +-spec next( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + pos_integer() +) -> + {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]} + | {ok, end_of_stream} + | {error, _}. +next(Node, DB, Shard, Iter, BatchSize) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). + +-spec store_batch( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer:batch(), + emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +store_batch(Node, DB, Shard, Batch, Options) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [ + DB, Shard, Batch, Options + ]). + +-spec update_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + emqx_ds:message_key() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +update_iterator(Node, DB, Shard, OldIter, DSKey) -> + erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ + DB, Shard, OldIter, DSKey + ]). + +-spec add_generation([node()], emqx_ds:db()) -> + [{ok, ok} | {error, _}]. +add_generation(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]). + +%%-------------------------------------------------------------------------------- +%% Introduced in V3 +%%-------------------------------------------------------------------------------- + +-spec list_generations_with_lifetimes( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id() +) -> + #{ + emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info() + }. 
+list_generations_with_lifetimes(Node, DB, Shard) -> + erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]). + +-spec drop_generation( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:gen_id() +) -> + ok | {error, _}. +drop_generation(Node, DB, Shard, GenId) -> + erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +introduced_in() -> + "5.6.0". diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index cb9d81580..d7dccccf5 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -155,7 +155,7 @@ t_05_update_iterator(_Config) -> ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}), ok. -t_05_update_config(_Config) -> +t_06_update_config(_Config) -> DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, opts())), TopicFilter = ['#'], @@ -199,7 +199,7 @@ t_05_update_config(_Config) -> end, lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). -t_06_add_generation(_Config) -> +t_07_add_generation(_Config) -> DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, opts())), TopicFilter = ['#'], @@ -243,6 +243,250 @@ t_06_add_generation(_Config) -> end, lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). +%% Verifies the basic usage of `list_generations_with_lifetimes' and `drop_generation'... +%% 1) Cannot drop current generation. +%% 2) All existing generations are returned by `list_generations_with_lifetimes'. +%% 3) Dropping a generation removes it from the list. +%% 4) Dropped generations stay dropped even after restarting the application.
+%% Smoke test for the generation management API. It lists generations, checks
+%% that the only generation cannot be dropped (`current_generation'), adds a
+%% second generation, drops the first one, and finally verifies that the same
+%% generation list is reported after restarting `emqx_durable_storage'.
+t_08_smoke_list_drop_generation(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            %% Right after opening, a single generation is expected.
+            Gens0 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{_OnlyGenId, #{since := _, until := _}}],
+                maps:to_list(Gens0),
+                #{gens => Gens0}
+            ),
+            [OldGenId] = maps:keys(Gens0),
+            %% Dropping it must be refused: it is the current generation.
+            ?assertEqual({error, current_generation}, emqx_ds:drop_generation(DB, OldGenId)),
+
+            %% After adding a generation, both must be listed; the test expects
+            %% the original one to sort first.
+            ok = emqx_ds:add_generation(DB),
+            Gens1 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [
+                    {OldGenId, #{since := _, until := _}},
+                    {_NewGenId, #{since := _, until := _}}
+                ],
+                lists:sort(maps:to_list(Gens1)),
+                #{gens => Gens1}
+            ),
+            [OldGenId, NewGenId] = lists:sort(maps:keys(Gens1)),
+
+            %% The older generation can now be dropped; only the new one remains.
+            ?assertEqual(ok, emqx_ds:drop_generation(DB, OldGenId)),
+            Gens2 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{NewGenId, #{since := _, until := _}}],
+                lists:sort(maps:to_list(Gens2)),
+                #{gens => Gens2}
+            ),
+
+            %% A second drop of the same id must report that it is unknown.
+            ?assertEqual({error, not_found}, emqx_ds:drop_generation(DB, OldGenId)),
+
+            %% Restart the application and reopen the DB: the surviving
+            %% generation must still be the only one listed.
+            ok = application:stop(emqx_durable_storage),
+            {ok, _} = application:ensure_all_started(emqx_durable_storage),
+            ok = emqx_ds:open_db(DB, opts()),
+
+            Gens3 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{NewGenId, #{since := _, until := _}}],
+                lists:sort(maps:to_list(Gens3)),
+                #{gens => Gens3}
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_drop_generation_with_never_used_iterator(_Config) ->
+    %% Scenario under test:
+    %%   1) an iterator is made on generation 1 but never read from;
+    %%   2) generation 2 is added and generation 1 is dropped;
+    %%   3) only then is the iterator read.
+    %% Expected: the old iterator yields no messages and reports end of stream.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [FirstGenId] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    Filter = emqx_topic:words(<<"foo/+">>),
+    Since = 0,
+    OldMsgs = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, OldMsgs)),
+
+    %% The iterator is created here but deliberately left unread.
+    [{_, OldStream}] = emqx_ds:get_streams(DB, Filter, Since),
+    {ok, StaleIter} = emqx_ds:make_iterator(DB, OldStream, Filter, Since),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, FirstGenId),
+
+    %% Messages stored after the drop, timestamped slightly in the future.
+    Now = emqx_message:timestamp_now(),
+    NewMsgs = [
+        message(<<"foo/bar">>, <<"3">>, Now + 100),
+        message(<<"foo/baz">>, <<"4">>, Now + 101)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, NewMsgs)),
+
+    ?assertMatch({ok, end_of_stream, []}, iterate(DB, StaleIter, 1)),
+
+    %% A different stream is now reported; an iterator on it returns exactly
+    %% the messages stored after the drop.
+    [{_, NewStream}] = emqx_ds:get_streams(DB, Filter, Since),
+    ?assertNotEqual(OldStream, NewStream),
+    {ok, FreshIter} = emqx_ds:make_iterator(DB, NewStream, Filter, Since),
+
+    {ok, LastIter, Batch} = iterate(DB, FreshIter, 1),
+    ?assertNotEqual(end_of_stream, LastIter),
+    ?assertEqual(NewMsgs, [M || {_K, M} <- Batch]),
+
+    ok.
+
+t_drop_generation_with_used_once_iterator(_Config) ->
+    %% Scenario under test:
+    %%   1) an iterator is made on generation 1 and one message is read from it;
+    %%   2) generation 2 is added and generation 1 is dropped;
+    %%   3) reading from the same iterator continues.
+    %% Expected: no further messages are seen and the stream ends.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [FirstGenId] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    Filter = emqx_topic:words(<<"foo/+">>),
+    Since = 0,
+    FirstMsg = message(<<"foo/bar">>, <<"1">>, 0),
+    OldMsgs = [FirstMsg, message(<<"foo/baz">>, <<"2">>, 1)],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, OldMsgs)),
+
+    %% Read a single message so the iterator has advanced before the drop.
+    [{_, OldStream}] = emqx_ds:get_streams(DB, Filter, Since),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, OldStream, Filter, Since),
+    {ok, UsedIter, FirstBatch} = emqx_ds:next(DB, Iter0, 1),
+    ?assertNotEqual(end_of_stream, UsedIter),
+    ?assertEqual([FirstMsg], [M || {_K, M} <- FirstBatch]),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, FirstGenId),
+
+    Now = emqx_message:timestamp_now(),
+    NewMsgs = [
+        message(<<"foo/bar">>, <<"3">>, Now + 100),
+        message(<<"foo/baz">>, <<"4">>, Now + 101)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, NewMsgs)),
+
+    ?assertMatch({ok, end_of_stream, []}, iterate(DB, UsedIter, 1)),
+
+    ok.
+
+t_drop_generation_update_iterator(_Config) ->
+    %% Verifies what `emqx_ds:update_iterator' returns once the generation the
+    %% iterator belongs to has been dropped: `{error, end_of_stream}'.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [FirstGenId] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    Filter = emqx_topic:words(<<"foo/+">>),
+    Since = 0,
+    OldMsgs = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, OldMsgs)),
+
+    %% Read two messages one at a time; the key of the second one is the key
+    %% later handed to `update_iterator'.
+    [{_, OldStream}] = emqx_ds:get_streams(DB, Filter, Since),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, OldStream, Filter, Since),
+    {ok, IterAfterFirst, _FirstBatch} = emqx_ds:next(DB, Iter0, 1),
+    {ok, _IterAfterSecond, [{SecondKey, _SecondMsg}]} = emqx_ds:next(DB, IterAfterFirst, 1),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, FirstGenId),
+
+    ?assertEqual(
+        {error, end_of_stream},
+        emqx_ds:update_iterator(DB, IterAfterFirst, SecondKey)
+    ),
+
+    ok.
+
+t_make_iterator_stale_stream(_Config) ->
+    %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
+    %% the stream has been dropped.
+    %% Expected outcome: `make_iterator' returns `{error, end_of_stream}' for that stream.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    %% Id of the single generation that exists right after opening the DB.
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    %% Obtain the stream while its generation still exists; no iterator is made yet.
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+
+    %% A new generation is added before dropping the first one (the smoke test in this
+    %% suite asserts that dropping the current generation is refused).
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    ?assertEqual(
+        {error, end_of_stream},
+        emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime)
+    ),
+
+    ok.
+
+t_get_streams_concurrently_with_drop_generation(_Config) ->
+    %% This checks that we can get all streams while a generation is dropped
+    %% mid-iteration.
+    %% Three generations exist when the second `get_streams' call starts; one of them
+    %% is dropped by a helper process while that call is in progress, and the call is
+    %% expected to return the two remaining streams.
+
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        %% NOTE(review): presumably a 5-second limit for the whole run stage -- confirm
+        %% against the snabbkaffe `?check_trace' options.
+        #{timetrap => 5_000},
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+
+            [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+            ok = emqx_ds:add_generation(DB),
+            ok = emqx_ds:add_generation(DB),
+
+            %% All streams
+            TopicFilter = emqx_topic:words(<<"foo/+">>),
+            StartTime = 0,
+            %% Sanity check before any drop: one stream per generation is expected.
+            ?assertMatch([_, _, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
+
+            %% NOTE(review): per snabbkaffe docs, this should hold back the
+            %% `get_streams_get_gen' trace point until `dropped_gen' has been emitted.
+            %% Both `get_streams_*' trace points are presumably emitted inside the
+            %% storage layer's `get_streams' implementation, which is not visible here.
+            ?force_ordering(
+                #{?snk_kind := dropped_gen},
+                #{?snk_kind := get_streams_get_gen}
+            ),
+
+            %% Helper process: waits for the `get_streams_all_gens' event, drops the
+            %% first generation, then emits `dropped_gen'.
+            spawn_link(fun() ->
+                {ok, _} = ?block_until(#{?snk_kind := get_streams_all_gens}),
+                ok = emqx_ds:drop_generation(DB, GenId0),
+                ?tp(dropped_gen, #{})
+            end),
+
+            %% Only the streams of the two surviving generations are expected.
+            ?assertMatch([_, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
+
+            ok
+        end,
+        []
+    ),
+
+    ok.
+
 update_data_set() ->
     [
         [
@@ -295,7 +539,7 @@ iterate(DB, It0, BatchSize, Acc) ->
         {ok, It, Msgs} ->
             iterate(DB, It, BatchSize, Acc ++ Msgs);
         {ok, end_of_stream} ->
-            {ok, It0, Acc};
+            {ok, end_of_stream, Acc};
         Ret ->
             Ret
     end.