feat(ds): add `list_generations` and `drop_generation` APIs
This commit is contained in:
parent
33aa6da15c
commit
75b08b525b
|
@ -21,6 +21,7 @@
|
||||||
{emqx_delayed,3}.
|
{emqx_delayed,3}.
|
||||||
{emqx_ds,1}.
|
{emqx_ds,1}.
|
||||||
{emqx_ds,2}.
|
{emqx_ds,2}.
|
||||||
|
{emqx_ds,3}.
|
||||||
{emqx_eviction_agent,1}.
|
{emqx_eviction_agent,1}.
|
||||||
{emqx_eviction_agent,2}.
|
{emqx_eviction_agent,2}.
|
||||||
{emqx_exhook,1}.
|
{emqx_exhook,1}.
|
||||||
|
|
|
@ -22,7 +22,14 @@
|
||||||
-module(emqx_ds).
|
-module(emqx_ds).
|
||||||
|
|
||||||
%% Management API:
|
%% Management API:
|
||||||
-export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]).
|
-export([
|
||||||
|
open_db/2,
|
||||||
|
update_db_config/2,
|
||||||
|
add_generation/1,
|
||||||
|
list_generations_with_lifetimes/1,
|
||||||
|
drop_generation/2,
|
||||||
|
drop_db/1
|
||||||
|
]).
|
||||||
|
|
||||||
%% Message storage API:
|
%% Message storage API:
|
||||||
-export([store_batch/2, store_batch/3]).
|
-export([store_batch/2, store_batch/3]).
|
||||||
|
@ -52,7 +59,10 @@
|
||||||
get_iterator_result/1,
|
get_iterator_result/1,
|
||||||
|
|
||||||
ds_specific_stream/0,
|
ds_specific_stream/0,
|
||||||
ds_specific_iterator/0
|
ds_specific_iterator/0,
|
||||||
|
ds_specific_generation_rank/0,
|
||||||
|
generation_rank/0,
|
||||||
|
generation_info/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -80,6 +90,8 @@
|
||||||
|
|
||||||
-type ds_specific_stream() :: term().
|
-type ds_specific_stream() :: term().
|
||||||
|
|
||||||
|
-type ds_specific_generation_rank() :: term().
|
||||||
|
|
||||||
-type message_key() :: binary().
|
-type message_key() :: binary().
|
||||||
|
|
||||||
-type store_batch_result() :: ok | {error, _}.
|
-type store_batch_result() :: ok | {error, _}.
|
||||||
|
@ -114,6 +126,17 @@
|
||||||
|
|
||||||
-type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
|
-type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
|
||||||
|
|
||||||
|
%% An opaque term identifying a generation. Each implementation will possibly add
|
||||||
|
%% information to this term to match its inner structure (e.g.: by embedding the shard id,
|
||||||
|
%% in the case of `emqx_ds_replication_layer').
|
||||||
|
-opaque generation_rank() :: ds_specific_generation_rank().
|
||||||
|
|
||||||
|
-type generation_info() :: #{
|
||||||
|
created_at := time(),
|
||||||
|
since := time(),
|
||||||
|
until := time() | undefined
|
||||||
|
}.
|
||||||
|
|
||||||
-define(persistent_term(DB), {emqx_ds_db_backend, DB}).
|
-define(persistent_term(DB), {emqx_ds_db_backend, DB}).
|
||||||
|
|
||||||
-define(module(DB), (persistent_term:get(?persistent_term(DB)))).
|
-define(module(DB), (persistent_term:get(?persistent_term(DB)))).
|
||||||
|
@ -128,6 +151,11 @@
|
||||||
|
|
||||||
-callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
|
-callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
|
||||||
|
|
||||||
|
-callback list_generations_with_lifetimes(db()) ->
|
||||||
|
#{generation_rank() => generation_info()}.
|
||||||
|
|
||||||
|
-callback drop_generation(db(), generation_rank()) -> ok | {error, _}.
|
||||||
|
|
||||||
-callback drop_db(db()) -> ok | {error, _}.
|
-callback drop_db(db()) -> ok | {error, _}.
|
||||||
|
|
||||||
-callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
|
-callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
|
||||||
|
@ -142,6 +170,11 @@
|
||||||
|
|
||||||
-callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
|
-callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
|
||||||
|
|
||||||
|
-optional_callbacks([
|
||||||
|
list_generations_with_lifetimes/1,
|
||||||
|
drop_generation/2
|
||||||
|
]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -166,6 +199,26 @@ add_generation(DB) ->
|
||||||
update_db_config(DB, Opts) ->
|
update_db_config(DB, Opts) ->
|
||||||
?module(DB):update_db_config(DB, Opts).
|
?module(DB):update_db_config(DB, Opts).
|
||||||
|
|
||||||
|
-spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}.
|
||||||
|
list_generations_with_lifetimes(DB) ->
|
||||||
|
Mod = ?module(DB),
|
||||||
|
case erlang:function_exported(Mod, list_generations_with_lifetimes, 1) of
|
||||||
|
true ->
|
||||||
|
Mod:list_generations_with_lifetimes(DB);
|
||||||
|
false ->
|
||||||
|
#{}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec drop_generation(db(), generation_rank()) -> ok | {error, _}.
|
||||||
|
drop_generation(DB, GenId) ->
|
||||||
|
Mod = ?module(DB),
|
||||||
|
case erlang:function_exported(Mod, drop_generation, 2) of
|
||||||
|
true ->
|
||||||
|
Mod:drop_generation(DB, GenId);
|
||||||
|
false ->
|
||||||
|
{error, not_implemented}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc TODO: currently if one or a few shards are down, they won't be
|
%% @doc TODO: currently if one or a few shards are down, they won't be
|
||||||
|
|
||||||
%% deleted.
|
%% deleted.
|
||||||
|
|
|
@ -25,6 +25,8 @@
|
||||||
open_db/2,
|
open_db/2,
|
||||||
add_generation/1,
|
add_generation/1,
|
||||||
update_db_config/2,
|
update_db_config/2,
|
||||||
|
list_generations_with_lifetimes/1,
|
||||||
|
drop_generation/2,
|
||||||
drop_db/1,
|
drop_db/1,
|
||||||
store_batch/3,
|
store_batch/3,
|
||||||
get_streams/3,
|
get_streams/3,
|
||||||
|
@ -41,7 +43,9 @@
|
||||||
do_make_iterator_v1/5,
|
do_make_iterator_v1/5,
|
||||||
do_update_iterator_v2/4,
|
do_update_iterator_v2/4,
|
||||||
do_next_v1/4,
|
do_next_v1/4,
|
||||||
do_add_generation_v2/1
|
do_add_generation_v2/1,
|
||||||
|
do_list_generations_with_lifetimes_v3/2,
|
||||||
|
do_drop_generation_v3/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
|
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
|
||||||
|
@ -104,6 +108,8 @@
|
||||||
?batch_messages := [emqx_types:message()]
|
?batch_messages := [emqx_types:message()]
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-type generation_rank() :: {shard_id(), term()}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API functions
|
%% API functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -135,6 +141,32 @@ add_generation(DB) ->
|
||||||
update_db_config(DB, CreateOpts) ->
|
update_db_config(DB, CreateOpts) ->
|
||||||
emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
|
emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
|
||||||
|
|
||||||
|
-spec list_generations_with_lifetimes(emqx_ds:db()) ->
|
||||||
|
#{generation_rank() => emqx_ds:generation_info()}.
|
||||||
|
list_generations_with_lifetimes(DB) ->
|
||||||
|
Shards = list_shards(DB),
|
||||||
|
lists:foldl(
|
||||||
|
fun(Shard, GensAcc) ->
|
||||||
|
Node = node_of_shard(DB, Shard),
|
||||||
|
maps:fold(
|
||||||
|
fun(GenId, Data, AccInner) ->
|
||||||
|
AccInner#{{Shard, GenId} => Data}
|
||||||
|
end,
|
||||||
|
GensAcc,
|
||||||
|
emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
Shards
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
|
||||||
|
drop_generation(DB, {Shard, GenId}) ->
|
||||||
|
%% TODO: drop generation in all nodes in the replica set, not only in the leader,
|
||||||
|
%% after we have proper replication in place.
|
||||||
|
Node = node_of_shard(DB, Shard),
|
||||||
|
emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId).
|
||||||
|
|
||||||
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
||||||
drop_db(DB) ->
|
drop_db(DB) ->
|
||||||
Nodes = list_nodes(),
|
Nodes = list_nodes(),
|
||||||
|
@ -301,7 +333,6 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
|
||||||
-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
|
-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
|
||||||
do_add_generation_v2(DB) ->
|
do_add_generation_v2(DB) ->
|
||||||
MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
|
MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
|
||||||
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(ShardId) ->
|
fun(ShardId) ->
|
||||||
emqx_ds_storage_layer:add_generation({DB, ShardId})
|
emqx_ds_storage_layer:add_generation({DB, ShardId})
|
||||||
|
@ -309,6 +340,16 @@ do_add_generation_v2(DB) ->
|
||||||
MyShards
|
MyShards
|
||||||
).
|
).
|
||||||
|
|
||||||
|
-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
|
||||||
|
#{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
|
||||||
|
do_list_generations_with_lifetimes_v3(DB, ShardId) ->
|
||||||
|
emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}).
|
||||||
|
|
||||||
|
-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
|
||||||
|
ok | {error, _}.
|
||||||
|
do_drop_generation_v3(DB, ShardId, GenId) ->
|
||||||
|
emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
-export([
|
-export([
|
||||||
create/4,
|
create/4,
|
||||||
open/5,
|
open/5,
|
||||||
|
drop/5,
|
||||||
store_batch/4,
|
store_batch/4,
|
||||||
get_streams/4,
|
get_streams/4,
|
||||||
make_iterator/5,
|
make_iterator/5,
|
||||||
|
@ -199,6 +200,21 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
|
||||||
ts_offset = TSOffsetBits
|
ts_offset = TSOffsetBits
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-spec drop(
|
||||||
|
emqx_ds_storage_layer:shard_id(),
|
||||||
|
rocksdb:db_handle(),
|
||||||
|
emqx_ds_storage_layer:gen_id(),
|
||||||
|
emqx_ds_storage_layer:cf_refs(),
|
||||||
|
s()
|
||||||
|
) ->
|
||||||
|
ok.
|
||||||
|
drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
|
||||||
|
{_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
|
||||||
|
{_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
|
||||||
|
ok = rocksdb:drop_column_family(DBHandle, DataCF),
|
||||||
|
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
|
||||||
|
ok.
|
||||||
|
|
||||||
-spec store_batch(
|
-spec store_batch(
|
||||||
emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
|
emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
|
||||||
) ->
|
) ->
|
||||||
|
|
|
@ -27,7 +27,9 @@
|
||||||
update_iterator/3,
|
update_iterator/3,
|
||||||
next/3,
|
next/3,
|
||||||
update_config/2,
|
update_config/2,
|
||||||
add_generation/1
|
add_generation/1,
|
||||||
|
list_generations_with_lifetimes/1,
|
||||||
|
drop_generation/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server
|
%% gen_server
|
||||||
|
@ -44,7 +46,8 @@
|
||||||
iterator/0,
|
iterator/0,
|
||||||
shard_id/0,
|
shard_id/0,
|
||||||
options/0,
|
options/0,
|
||||||
prototype/0
|
prototype/0,
|
||||||
|
post_creation_context/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
@ -95,11 +98,18 @@
|
||||||
|
|
||||||
%%%% Generation:
|
%%%% Generation:
|
||||||
|
|
||||||
|
-define(GEN_KEY(GEN_ID), {generation, GEN_ID}).
|
||||||
|
|
||||||
-type generation(Data) :: #{
|
-type generation(Data) :: #{
|
||||||
%% Module that handles data for the generation:
|
%% Module that handles data for the generation:
|
||||||
module := module(),
|
module := module(),
|
||||||
%% Module-specific data defined at generation creation time:
|
%% Module-specific data defined at generation creation time:
|
||||||
data := Data,
|
data := Data,
|
||||||
|
%% Column families used by this generation
|
||||||
|
cf_refs := cf_refs(),
|
||||||
|
%% Time at which this was created. Might differ from `since', in particular for the
|
||||||
|
%% first generation.
|
||||||
|
created_at := emqx_ds:time(),
|
||||||
%% When should this generation become active?
|
%% When should this generation become active?
|
||||||
%% This generation should only contain messages timestamped no earlier than that.
|
%% This generation should only contain messages timestamped no earlier than that.
|
||||||
%% The very first generation will have `since` equal 0.
|
%% The very first generation will have `since` equal 0.
|
||||||
|
@ -121,7 +131,7 @@
|
||||||
%% This data is used to create new generation:
|
%% This data is used to create new generation:
|
||||||
prototype := prototype(),
|
prototype := prototype(),
|
||||||
%% Generations:
|
%% Generations:
|
||||||
{generation, gen_id()} => GenData
|
?GEN_KEY(gen_id()) => GenData
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%% Shard schema (persistent):
|
%% Shard schema (persistent):
|
||||||
|
@ -132,6 +142,18 @@
|
||||||
|
|
||||||
-type options() :: map().
|
-type options() :: map().
|
||||||
|
|
||||||
|
-type post_creation_context() ::
|
||||||
|
#{
|
||||||
|
shard_id := emqx_ds_storage_layer:shard_id(),
|
||||||
|
db := rocksdb:db_handle(),
|
||||||
|
new_gen_id := emqx_ds_storage_layer:gen_id(),
|
||||||
|
old_gen_id := emqx_ds_storage_layer:gen_id(),
|
||||||
|
new_cf_refs := cf_refs(),
|
||||||
|
old_cf_refs := cf_refs(),
|
||||||
|
new_gen_runtime_data := _NewData,
|
||||||
|
old_gen_runtime_data := _OldData
|
||||||
|
}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Generation callbacks
|
%% Generation callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -145,6 +167,9 @@
|
||||||
-callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
|
-callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
|
||||||
_Data.
|
_Data.
|
||||||
|
|
||||||
|
-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
|
||||||
|
ok | {error, _Reason}.
|
||||||
|
|
||||||
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
|
|
||||||
|
@ -157,10 +182,17 @@
|
||||||
-callback next(shard_id(), _Data, Iter, pos_integer()) ->
|
-callback next(shard_id(), _Data, Iter, pos_integer()) ->
|
||||||
{ok, Iter, [emqx_types:message()]} | {error, _}.
|
{ok, Iter, [emqx_types:message()]} | {error, _}.
|
||||||
|
|
||||||
|
-callback post_creation_actions(post_creation_context()) -> _Data.
|
||||||
|
|
||||||
|
-optional_callbacks([post_creation_actions/1]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API for the replication layer
|
%% API for the replication layer
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
|
-record(call_list_generations_with_lifetimes, {}).
|
||||||
|
-record(call_drop_generation, {gen_id :: gen_id()}).
|
||||||
|
|
||||||
-spec open_shard(shard_id(), options()) -> ok.
|
-spec open_shard(shard_id(), options()) -> ok.
|
||||||
open_shard(Shard, Options) ->
|
open_shard(Shard, Options) ->
|
||||||
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
|
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
|
||||||
|
@ -182,18 +214,25 @@ store_batch(Shard, Messages, Options) ->
|
||||||
[{integer(), stream()}].
|
[{integer(), stream()}].
|
||||||
get_streams(Shard, TopicFilter, StartTime) ->
|
get_streams(Shard, TopicFilter, StartTime) ->
|
||||||
Gens = generations_since(Shard, StartTime),
|
Gens = generations_since(Shard, StartTime),
|
||||||
|
?tp(get_streams_all_gens, #{gens => Gens}),
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(GenId) ->
|
fun(GenId) ->
|
||||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
?tp(get_streams_get_gen, #{gen_id => GenId}),
|
||||||
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
case generation_get_safe(Shard, GenId) of
|
||||||
[
|
{ok, #{module := Mod, data := GenData}} ->
|
||||||
{GenId, #{
|
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
||||||
?tag => ?STREAM,
|
[
|
||||||
?generation => GenId,
|
{GenId, #{
|
||||||
?enc => Stream
|
?tag => ?STREAM,
|
||||||
}}
|
?generation => GenId,
|
||||||
|| Stream <- Streams
|
?enc => Stream
|
||||||
]
|
}}
|
||||||
|
|| Stream <- Streams
|
||||||
|
];
|
||||||
|
{error, not_found} ->
|
||||||
|
%% race condition: generation was dropped before getting its streams?
|
||||||
|
[]
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
Gens
|
Gens
|
||||||
).
|
).
|
||||||
|
@ -203,16 +242,20 @@ get_streams(Shard, TopicFilter, StartTime) ->
|
||||||
make_iterator(
|
make_iterator(
|
||||||
Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
|
Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
|
||||||
) ->
|
) ->
|
||||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
case generation_get_safe(Shard, GenId) of
|
||||||
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
{ok, #{module := Mod, data := GenData}} ->
|
||||||
{ok, Iter} ->
|
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
||||||
{ok, #{
|
{ok, Iter} ->
|
||||||
?tag => ?IT,
|
{ok, #{
|
||||||
?generation => GenId,
|
?tag => ?IT,
|
||||||
?enc => Iter
|
?generation => GenId,
|
||||||
}};
|
?enc => Iter
|
||||||
{error, _} = Err ->
|
}};
|
||||||
Err
|
{error, _} = Err ->
|
||||||
|
Err
|
||||||
|
end;
|
||||||
|
{error, not_found} ->
|
||||||
|
{error, end_of_stream}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec update_iterator(
|
-spec update_iterator(
|
||||||
|
@ -224,33 +267,42 @@ update_iterator(
|
||||||
#{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
|
#{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
|
||||||
DSKey
|
DSKey
|
||||||
) ->
|
) ->
|
||||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
case generation_get_safe(Shard, GenId) of
|
||||||
case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
|
{ok, #{module := Mod, data := GenData}} ->
|
||||||
{ok, Iter} ->
|
case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
|
||||||
{ok, #{
|
{ok, Iter} ->
|
||||||
?tag => ?IT,
|
{ok, #{
|
||||||
?generation => GenId,
|
?tag => ?IT,
|
||||||
?enc => Iter
|
?generation => GenId,
|
||||||
}};
|
?enc => Iter
|
||||||
{error, _} = Err ->
|
}};
|
||||||
Err
|
{error, _} = Err ->
|
||||||
|
Err
|
||||||
|
end;
|
||||||
|
{error, not_found} ->
|
||||||
|
{error, end_of_stream}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec next(shard_id(), iterator(), pos_integer()) ->
|
-spec next(shard_id(), iterator(), pos_integer()) ->
|
||||||
emqx_ds:next_result(iterator()).
|
emqx_ds:next_result(iterator()).
|
||||||
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
|
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
|
||||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
case generation_get_safe(Shard, GenId) of
|
||||||
Current = generation_current(Shard),
|
{ok, #{module := Mod, data := GenData}} ->
|
||||||
case Mod:next(Shard, GenData, GenIter0, BatchSize) of
|
Current = generation_current(Shard),
|
||||||
{ok, _GenIter, []} when GenId < Current ->
|
case Mod:next(Shard, GenData, GenIter0, BatchSize) of
|
||||||
%% This is a past generation. Storage layer won't write
|
{ok, _GenIter, []} when GenId < Current ->
|
||||||
%% any more messages here. The iterator reached the end:
|
%% This is a past generation. Storage layer won't write
|
||||||
%% the stream has been fully replayed.
|
%% any more messages here. The iterator reached the end:
|
||||||
{ok, end_of_stream};
|
%% the stream has been fully replayed.
|
||||||
{ok, GenIter, Batch} ->
|
{ok, end_of_stream};
|
||||||
{ok, Iter#{?enc := GenIter}, Batch};
|
{ok, GenIter, Batch} ->
|
||||||
Error = {error, _} ->
|
{ok, Iter#{?enc := GenIter}, Batch};
|
||||||
Error
|
Error = {error, _} ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
{error, not_found} ->
|
||||||
|
%% generation was possibly dropped by GC
|
||||||
|
{ok, end_of_stream}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
|
-spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
|
||||||
|
@ -261,6 +313,21 @@ update_config(ShardId, Options) ->
|
||||||
add_generation(ShardId) ->
|
add_generation(ShardId) ->
|
||||||
gen_server:call(?REF(ShardId), add_generation, infinity).
|
gen_server:call(?REF(ShardId), add_generation, infinity).
|
||||||
|
|
||||||
|
-spec list_generations_with_lifetimes(shard_id()) ->
|
||||||
|
#{
|
||||||
|
gen_id() => #{
|
||||||
|
created_at := emqx_ds:time(),
|
||||||
|
since := emqx_ds:time(),
|
||||||
|
until := undefined | emqx_ds:time()
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
list_generations_with_lifetimes(ShardId) ->
|
||||||
|
gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
|
||||||
|
|
||||||
|
-spec drop_generation(shard_id(), gen_id()) -> ok.
|
||||||
|
drop_generation(ShardId, GenId) ->
|
||||||
|
gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% gen_server for the shard
|
%% gen_server for the shard
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -322,6 +389,13 @@ handle_call(add_generation, _From, S0) ->
|
||||||
S = add_generation(S0, Since),
|
S = add_generation(S0, Since),
|
||||||
commit_metadata(S),
|
commit_metadata(S),
|
||||||
{reply, ok, S};
|
{reply, ok, S};
|
||||||
|
handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
|
||||||
|
Generations = handle_list_generations_with_lifetimes(S),
|
||||||
|
{reply, Generations, S};
|
||||||
|
handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
|
||||||
|
{Reply, S} = handle_drop_generation(S0, GenId),
|
||||||
|
commit_metadata(S),
|
||||||
|
{reply, Reply, S};
|
||||||
handle_call(#call_create_generation{since = Since}, _From, S0) ->
|
handle_call(#call_create_generation{since = Since}, _From, S0) ->
|
||||||
S = add_generation(S0, Since),
|
S = add_generation(S0, Since),
|
||||||
commit_metadata(S),
|
commit_metadata(S),
|
||||||
|
@ -353,7 +427,7 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
|
||||||
%% Transform generation schemas to generation runtime data:
|
%% Transform generation schemas to generation runtime data:
|
||||||
maps:map(
|
maps:map(
|
||||||
fun
|
fun
|
||||||
({generation, GenId}, GenSchema) ->
|
(?GEN_KEY(GenId), GenSchema) ->
|
||||||
open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
|
open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
|
||||||
(_K, Val) ->
|
(_K, Val) ->
|
||||||
Val
|
Val
|
||||||
|
@ -366,10 +440,40 @@ add_generation(S0, Since) ->
|
||||||
#s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
|
#s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
|
||||||
Schema1 = update_last_until(Schema0, Since),
|
Schema1 = update_last_until(Schema0, Since),
|
||||||
Shard1 = update_last_until(Shard0, Since),
|
Shard1 = update_last_until(Shard0, Since),
|
||||||
|
|
||||||
|
#{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
|
||||||
|
OldKey = ?GEN_KEY(OldGenId),
|
||||||
|
#{OldKey := OldGenSchema} = Schema0,
|
||||||
|
#{cf_refs := OldCFRefs} = OldGenSchema,
|
||||||
|
#{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
|
||||||
|
|
||||||
{GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
|
{GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
|
||||||
|
|
||||||
CFRefs = NewCFRefs ++ CFRefs0,
|
CFRefs = NewCFRefs ++ CFRefs0,
|
||||||
Key = {generation, GenId},
|
Key = ?GEN_KEY(GenId),
|
||||||
Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
|
Generation0 =
|
||||||
|
#{data := NewGenData0} =
|
||||||
|
open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
|
||||||
|
|
||||||
|
%% When the new generation's module is the same as the last one, we might want to
|
||||||
|
%% perform actions like inheriting some of the previous (meta)data.
|
||||||
|
NewGenData =
|
||||||
|
run_post_creation_actions(
|
||||||
|
#{
|
||||||
|
shard_id => ShardId,
|
||||||
|
db => DB,
|
||||||
|
new_gen_id => GenId,
|
||||||
|
old_gen_id => OldGenId,
|
||||||
|
new_cf_refs => NewCFRefs,
|
||||||
|
old_cf_refs => OldCFRefs,
|
||||||
|
new_gen_runtime_data => NewGenData0,
|
||||||
|
old_gen_runtime_data => OldGenData,
|
||||||
|
new_module => CurrentMod,
|
||||||
|
old_module => OldMod
|
||||||
|
}
|
||||||
|
),
|
||||||
|
Generation = Generation0#{data := NewGenData},
|
||||||
|
|
||||||
Shard = Shard1#{current_generation := GenId, Key => Generation},
|
Shard = Shard1#{current_generation := GenId, Key => Generation},
|
||||||
S0#s{
|
S0#s{
|
||||||
cf_refs = CFRefs,
|
cf_refs = CFRefs,
|
||||||
|
@ -377,6 +481,54 @@ add_generation(S0, Since) ->
|
||||||
shard = Shard
|
shard = Shard
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}.
|
||||||
|
handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) ->
|
||||||
|
maps:fold(
|
||||||
|
fun
|
||||||
|
(?GEN_KEY(GenId), GenSchema, Acc) ->
|
||||||
|
Acc#{GenId => export_generation(GenSchema)};
|
||||||
|
(_Key, _Value, Acc) ->
|
||||||
|
Acc
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
ShardSchema
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec export_generation(generation_schema()) -> map().
|
||||||
|
export_generation(GenSchema) ->
|
||||||
|
maps:with([created_at, since, until], GenSchema).
|
||||||
|
|
||||||
|
-spec handle_drop_generation(server_state(), gen_id()) ->
|
||||||
|
{ok | {error, current_generation}, server_state()}.
|
||||||
|
handle_drop_generation(#s{schema = #{current_generation := GenId}} = S0, GenId) ->
|
||||||
|
{{error, current_generation}, S0};
|
||||||
|
handle_drop_generation(#s{schema = Schema} = S0, GenId) when
|
||||||
|
not is_map_key(?GEN_KEY(GenId), Schema)
|
||||||
|
->
|
||||||
|
{{error, not_found}, S0};
|
||||||
|
handle_drop_generation(S0, GenId) ->
|
||||||
|
#s{
|
||||||
|
shard_id = ShardId,
|
||||||
|
db = DB,
|
||||||
|
schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema,
|
||||||
|
shard = OldShard,
|
||||||
|
cf_refs = OldCFRefs
|
||||||
|
} = S0,
|
||||||
|
#{module := Mod, cf_refs := GenCFRefs} = GenSchema,
|
||||||
|
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
|
||||||
|
case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of
|
||||||
|
ok ->
|
||||||
|
CFRefs = OldCFRefs -- GenCFRefs,
|
||||||
|
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
|
||||||
|
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
|
||||||
|
S = S0#s{
|
||||||
|
cf_refs = CFRefs,
|
||||||
|
shard = Shard,
|
||||||
|
schema = Schema
|
||||||
|
},
|
||||||
|
{ok, S}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
|
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
|
||||||
generation().
|
generation().
|
||||||
open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
|
open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
|
||||||
|
@ -403,10 +555,17 @@ new_generation(ShardId, DB, Schema0, Since) ->
|
||||||
#{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
|
#{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
|
||||||
GenId = PrevGenId + 1,
|
GenId = PrevGenId + 1,
|
||||||
{GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
|
{GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
|
||||||
GenSchema = #{module => Mod, data => GenData, since => Since, until => undefined},
|
GenSchema = #{
|
||||||
|
module => Mod,
|
||||||
|
data => GenData,
|
||||||
|
cf_refs => NewCFRefs,
|
||||||
|
created_at => emqx_message:timestamp_now(),
|
||||||
|
since => Since,
|
||||||
|
until => undefined
|
||||||
|
},
|
||||||
Schema = Schema0#{
|
Schema = Schema0#{
|
||||||
current_generation => GenId,
|
current_generation => GenId,
|
||||||
{generation, GenId} => GenSchema
|
?GEN_KEY(GenId) => GenSchema
|
||||||
},
|
},
|
||||||
{GenId, Schema, NewCFRefs}.
|
{GenId, Schema, NewCFRefs}.
|
||||||
|
|
||||||
|
@ -453,9 +612,26 @@ db_dir({DB, ShardId}) ->
|
||||||
-spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
|
-spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
|
||||||
update_last_until(Schema, Until) ->
|
update_last_until(Schema, Until) ->
|
||||||
#{current_generation := GenId} = Schema,
|
#{current_generation := GenId} = Schema,
|
||||||
GenData0 = maps:get({generation, GenId}, Schema),
|
GenData0 = maps:get(?GEN_KEY(GenId), Schema),
|
||||||
GenData = GenData0#{until := Until},
|
GenData = GenData0#{until := Until},
|
||||||
Schema#{{generation, GenId} := GenData}.
|
Schema#{?GEN_KEY(GenId) := GenData}.
|
||||||
|
|
||||||
|
run_post_creation_actions(
|
||||||
|
#{
|
||||||
|
new_module := Mod,
|
||||||
|
old_module := Mod,
|
||||||
|
new_gen_runtime_data := NewGenData
|
||||||
|
} = Context
|
||||||
|
) ->
|
||||||
|
case erlang:function_exported(Mod, post_creation_actions, 1) of
|
||||||
|
true ->
|
||||||
|
Mod:post_creation_actions(Context);
|
||||||
|
false ->
|
||||||
|
NewGenData
|
||||||
|
end;
|
||||||
|
run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
|
||||||
|
%% Different implementation modules
|
||||||
|
NewGenData.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
%% Schema access
|
%% Schema access
|
||||||
|
@ -468,15 +644,24 @@ generation_current(Shard) ->
|
||||||
|
|
||||||
-spec generation_get(shard_id(), gen_id()) -> generation().
|
-spec generation_get(shard_id(), gen_id()) -> generation().
|
||||||
generation_get(Shard, GenId) ->
|
generation_get(Shard, GenId) ->
|
||||||
#{{generation, GenId} := GenData} = get_schema_runtime(Shard),
|
{ok, GenData} = generation_get_safe(Shard, GenId),
|
||||||
GenData.
|
GenData.
|
||||||
|
|
||||||
|
-spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}.
|
||||||
|
generation_get_safe(Shard, GenId) ->
|
||||||
|
case get_schema_runtime(Shard) of
|
||||||
|
#{?GEN_KEY(GenId) := GenData} ->
|
||||||
|
{ok, GenData};
|
||||||
|
#{} ->
|
||||||
|
{error, not_found}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
|
-spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
|
||||||
generations_since(Shard, Since) ->
|
generations_since(Shard, Since) ->
|
||||||
Schema = get_schema_runtime(Shard),
|
Schema = get_schema_runtime(Shard),
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun
|
fun
|
||||||
({generation, GenId}, #{until := Until}, Acc) when Until >= Since ->
|
(?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since ->
|
||||||
[GenId | Acc];
|
[GenId | Acc];
|
||||||
(_K, _V, Acc) ->
|
(_K, _V, Acc) ->
|
||||||
Acc
|
Acc
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
-export([
|
-export([
|
||||||
create/4,
|
create/4,
|
||||||
open/5,
|
open/5,
|
||||||
|
drop/5,
|
||||||
store_batch/4,
|
store_batch/4,
|
||||||
get_streams/4,
|
get_streams/4,
|
||||||
make_iterator/5,
|
make_iterator/5,
|
||||||
|
@ -85,6 +86,10 @@ open(_Shard, DBHandle, GenId, CFRefs, #schema{}) ->
|
||||||
{_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
|
{_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
|
||||||
#s{db = DBHandle, cf = CF}.
|
#s{db = DBHandle, cf = CF}.
|
||||||
|
|
||||||
|
drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
|
||||||
|
ok = rocksdb:drop_column_family(DBHandle, CFHandle),
|
||||||
|
ok.
|
||||||
|
|
||||||
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
|
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Msg) ->
|
fun(Msg) ->
|
||||||
|
@ -142,7 +147,8 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
|
||||||
case rocksdb:iterator_move(IT, Action) of
|
case rocksdb:iterator_move(IT, Action) of
|
||||||
{ok, Key, Blob} ->
|
{ok, Key, Blob} ->
|
||||||
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
|
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
|
||||||
case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of
|
TopicWords = emqx_topic:words(Topic),
|
||||||
|
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
|
||||||
true ->
|
true ->
|
||||||
do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]);
|
do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]);
|
||||||
false ->
|
false ->
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_proto_v3).
|
||||||
|
|
||||||
|
-behavior(emqx_bpapi).
|
||||||
|
|
||||||
|
-include_lib("emqx_utils/include/bpapi.hrl").
|
||||||
|
%% API:
|
||||||
|
-export([
|
||||||
|
drop_db/2,
|
||||||
|
store_batch/5,
|
||||||
|
get_streams/5,
|
||||||
|
make_iterator/6,
|
||||||
|
next/5,
|
||||||
|
update_iterator/5,
|
||||||
|
add_generation/2,
|
||||||
|
|
||||||
|
%% introduced in v3
|
||||||
|
list_generations_with_lifetimes/3,
|
||||||
|
drop_generation/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% behavior callbacks:
|
||||||
|
-export([introduced_in/0]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API funcions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec drop_db([node()], emqx_ds:db()) ->
|
||||||
|
[{ok, ok} | {error, _}].
|
||||||
|
drop_db(Node, DB) ->
|
||||||
|
erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
|
||||||
|
|
||||||
|
-spec get_streams(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds:topic_filter(),
|
||||||
|
emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
[{integer(), emqx_ds_storage_layer:stream()}].
|
||||||
|
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
||||||
|
|
||||||
|
-spec make_iterator(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:stream(),
|
||||||
|
emqx_ds:topic_filter(),
|
||||||
|
emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||||
|
make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
|
||||||
|
DB, Shard, Stream, TopicFilter, StartTime
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec next(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:iterator(),
|
||||||
|
pos_integer()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
|
||||||
|
| {ok, end_of_stream}
|
||||||
|
| {error, _}.
|
||||||
|
next(Node, DB, Shard, Iter, BatchSize) ->
|
||||||
|
emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
|
||||||
|
|
||||||
|
-spec store_batch(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_replication_layer:batch(),
|
||||||
|
emqx_ds:message_store_opts()
|
||||||
|
) ->
|
||||||
|
emqx_ds:store_batch_result().
|
||||||
|
store_batch(Node, DB, Shard, Batch, Options) ->
|
||||||
|
emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
|
||||||
|
DB, Shard, Batch, Options
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec update_iterator(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:iterator(),
|
||||||
|
emqx_ds:message_key()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||||
|
update_iterator(Node, DB, Shard, OldIter, DSKey) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
|
||||||
|
DB, Shard, OldIter, DSKey
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec add_generation([node()], emqx_ds:db()) ->
|
||||||
|
[{ok, ok} | {error, _}].
|
||||||
|
add_generation(Node, DB) ->
|
||||||
|
erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------------------
|
||||||
|
%% Introduced in V3
|
||||||
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec list_generations_with_lifetimes(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id()
|
||||||
|
) ->
|
||||||
|
#{
|
||||||
|
emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()
|
||||||
|
}.
|
||||||
|
list_generations_with_lifetimes(Node, DB, Shard) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]).
|
||||||
|
|
||||||
|
-spec drop_generation(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:gen_id()
|
||||||
|
) ->
|
||||||
|
ok | {error, _}.
|
||||||
|
drop_generation(Node, DB, Shard, GenId) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% behavior callbacks
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.6.0".
|
|
@ -155,7 +155,7 @@ t_05_update_iterator(_Config) ->
|
||||||
?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
|
?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_05_update_config(_Config) ->
|
t_06_update_config(_Config) ->
|
||||||
DB = ?FUNCTION_NAME,
|
DB = ?FUNCTION_NAME,
|
||||||
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
TopicFilter = ['#'],
|
TopicFilter = ['#'],
|
||||||
|
@ -199,7 +199,7 @@ t_05_update_config(_Config) ->
|
||||||
end,
|
end,
|
||||||
lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
|
lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
|
||||||
|
|
||||||
t_06_add_generation(_Config) ->
|
t_07_add_generation(_Config) ->
|
||||||
DB = ?FUNCTION_NAME,
|
DB = ?FUNCTION_NAME,
|
||||||
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
TopicFilter = ['#'],
|
TopicFilter = ['#'],
|
||||||
|
@ -243,6 +243,250 @@ t_06_add_generation(_Config) ->
|
||||||
end,
|
end,
|
||||||
lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
|
lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
|
||||||
|
|
||||||
|
%% Verifies the basic usage of `list_generations_with_lifetimes' and `drop_generation'...
|
||||||
|
%% 1) Cannot drop current generation.
|
||||||
|
%% 2) All existing generations are returned by `list_generation_with_lifetimes'.
|
||||||
|
%% 3) Dropping a generation removes it from the list.
|
||||||
|
%% 4) Dropped generations stay dropped even after restarting the application.
|
||||||
|
t_08_smoke_list_drop_generation(_Config) ->
|
||||||
|
DB = ?FUNCTION_NAME,
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
|
%% Exactly one generation at first.
|
||||||
|
Generations0 = emqx_ds:list_generations_with_lifetimes(DB),
|
||||||
|
?assertMatch(
|
||||||
|
[{_GenId, #{since := _, until := _}}],
|
||||||
|
maps:to_list(Generations0),
|
||||||
|
#{gens => Generations0}
|
||||||
|
),
|
||||||
|
[{GenId0, _}] = maps:to_list(Generations0),
|
||||||
|
%% Cannot delete current generation
|
||||||
|
?assertEqual({error, current_generation}, emqx_ds:drop_generation(DB, GenId0)),
|
||||||
|
|
||||||
|
%% New gen
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
Generations1 = emqx_ds:list_generations_with_lifetimes(DB),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
{GenId0, #{since := _, until := _}},
|
||||||
|
{_GenId1, #{since := _, until := _}}
|
||||||
|
],
|
||||||
|
lists:sort(maps:to_list(Generations1)),
|
||||||
|
#{gens => Generations1}
|
||||||
|
),
|
||||||
|
[GenId0, GenId1] = lists:sort(maps:keys(Generations1)),
|
||||||
|
|
||||||
|
%% Drop the older one
|
||||||
|
?assertEqual(ok, emqx_ds:drop_generation(DB, GenId0)),
|
||||||
|
Generations2 = emqx_ds:list_generations_with_lifetimes(DB),
|
||||||
|
?assertMatch(
|
||||||
|
[{GenId1, #{since := _, until := _}}],
|
||||||
|
lists:sort(maps:to_list(Generations2)),
|
||||||
|
#{gens => Generations2}
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Unknown gen_id, as it was already dropped
|
||||||
|
?assertEqual({error, not_found}, emqx_ds:drop_generation(DB, GenId0)),
|
||||||
|
|
||||||
|
%% Should persist surviving generation list
|
||||||
|
ok = application:stop(emqx_durable_storage),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_durable_storage),
|
||||||
|
ok = emqx_ds:open_db(DB, opts()),
|
||||||
|
|
||||||
|
Generations3 = emqx_ds:list_generations_with_lifetimes(DB),
|
||||||
|
?assertMatch(
|
||||||
|
[{GenId1, #{since := _, until := _}}],
|
||||||
|
lists:sort(maps:to_list(Generations3)),
|
||||||
|
#{gens => Generations3}
|
||||||
|
),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_drop_generation_with_never_used_iterator(_Config) ->
|
||||||
|
%% This test checks how the iterator behaves when:
|
||||||
|
%% 1) it's created at generation 1 and not consumed from.
|
||||||
|
%% 2) generation 2 is created and 1 dropped.
|
||||||
|
%% 3) iteration begins.
|
||||||
|
%% In this case, the iterator won't see any messages and the stream will end.
|
||||||
|
|
||||||
|
DB = ?FUNCTION_NAME,
|
||||||
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
|
[GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
|
||||||
|
|
||||||
|
TopicFilter = emqx_topic:words(<<"foo/+">>),
|
||||||
|
StartTime = 0,
|
||||||
|
Msgs0 = [
|
||||||
|
message(<<"foo/bar">>, <<"1">>, 0),
|
||||||
|
message(<<"foo/baz">>, <<"2">>, 1)
|
||||||
|
],
|
||||||
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
|
||||||
|
|
||||||
|
[{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
||||||
|
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
|
||||||
|
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
ok = emqx_ds:drop_generation(DB, GenId0),
|
||||||
|
|
||||||
|
Now = emqx_message:timestamp_now(),
|
||||||
|
Msgs1 = [
|
||||||
|
message(<<"foo/bar">>, <<"3">>, Now + 100),
|
||||||
|
message(<<"foo/baz">>, <<"4">>, Now + 101)
|
||||||
|
],
|
||||||
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
|
||||||
|
|
||||||
|
?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter0, 1)),
|
||||||
|
|
||||||
|
%% New iterator for the new stream will only see the later messages.
|
||||||
|
[{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
||||||
|
?assertNotEqual(Stream0, Stream1),
|
||||||
|
{ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime),
|
||||||
|
|
||||||
|
{ok, Iter, Batch} = iterate(DB, Iter1, 1),
|
||||||
|
?assertNotEqual(end_of_stream, Iter),
|
||||||
|
?assertEqual(Msgs1, [Msg || {_Key, Msg} <- Batch]),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_drop_generation_with_used_once_iterator(_Config) ->
|
||||||
|
%% This test checks how the iterator behaves when:
|
||||||
|
%% 1) it's created at generation 1 and consumes at least 1 message.
|
||||||
|
%% 2) generation 2 is created and 1 dropped.
|
||||||
|
%% 3) iteration continues.
|
||||||
|
%% In this case, the iterator should see no more messages and the stream will end.
|
||||||
|
|
||||||
|
DB = ?FUNCTION_NAME,
|
||||||
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
|
[GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
|
||||||
|
|
||||||
|
TopicFilter = emqx_topic:words(<<"foo/+">>),
|
||||||
|
StartTime = 0,
|
||||||
|
Msgs0 =
|
||||||
|
[Msg0 | _] = [
|
||||||
|
message(<<"foo/bar">>, <<"1">>, 0),
|
||||||
|
message(<<"foo/baz">>, <<"2">>, 1)
|
||||||
|
],
|
||||||
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
|
||||||
|
|
||||||
|
[{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
||||||
|
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
|
||||||
|
{ok, Iter1, Batch1} = emqx_ds:next(DB, Iter0, 1),
|
||||||
|
?assertNotEqual(end_of_stream, Iter1),
|
||||||
|
?assertEqual([Msg0], [Msg || {_Key, Msg} <- Batch1]),
|
||||||
|
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
ok = emqx_ds:drop_generation(DB, GenId0),
|
||||||
|
|
||||||
|
Now = emqx_message:timestamp_now(),
|
||||||
|
Msgs1 = [
|
||||||
|
message(<<"foo/bar">>, <<"3">>, Now + 100),
|
||||||
|
message(<<"foo/baz">>, <<"4">>, Now + 101)
|
||||||
|
],
|
||||||
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
|
||||||
|
|
||||||
|
?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter1, 1)),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_drop_generation_update_iterator(_Config) ->
|
||||||
|
%% This checks the behavior of `emqx_ds:update_iterator' after the generation
|
||||||
|
%% underlying the iterator has been dropped.
|
||||||
|
|
||||||
|
DB = ?FUNCTION_NAME,
|
||||||
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
|
[GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
|
||||||
|
|
||||||
|
TopicFilter = emqx_topic:words(<<"foo/+">>),
|
||||||
|
StartTime = 0,
|
||||||
|
Msgs0 = [
|
||||||
|
message(<<"foo/bar">>, <<"1">>, 0),
|
||||||
|
message(<<"foo/baz">>, <<"2">>, 1)
|
||||||
|
],
|
||||||
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
|
||||||
|
|
||||||
|
[{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
||||||
|
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
|
||||||
|
{ok, Iter1, _Batch1} = emqx_ds:next(DB, Iter0, 1),
|
||||||
|
{ok, _Iter2, [{Key2, _Msg}]} = emqx_ds:next(DB, Iter1, 1),
|
||||||
|
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
ok = emqx_ds:drop_generation(DB, GenId0),
|
||||||
|
|
||||||
|
?assertEqual({error, end_of_stream}, emqx_ds:update_iterator(DB, Iter1, Key2)),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_make_iterator_stale_stream(_Config) ->
|
||||||
|
%% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
|
||||||
|
%% the stream has been dropped.
|
||||||
|
|
||||||
|
DB = ?FUNCTION_NAME,
|
||||||
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
|
[GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
|
||||||
|
|
||||||
|
TopicFilter = emqx_topic:words(<<"foo/+">>),
|
||||||
|
StartTime = 0,
|
||||||
|
Msgs0 = [
|
||||||
|
message(<<"foo/bar">>, <<"1">>, 0),
|
||||||
|
message(<<"foo/baz">>, <<"2">>, 1)
|
||||||
|
],
|
||||||
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
|
||||||
|
|
||||||
|
[{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
||||||
|
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
ok = emqx_ds:drop_generation(DB, GenId0),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
{error, end_of_stream},
|
||||||
|
emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_get_streams_concurrently_with_drop_generation(_Config) ->
|
||||||
|
%% This checks that we can get all streams while a generation is dropped
|
||||||
|
%% mid-iteration.
|
||||||
|
|
||||||
|
DB = ?FUNCTION_NAME,
|
||||||
|
?check_trace(
|
||||||
|
#{timetrap => 5_000},
|
||||||
|
begin
|
||||||
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
||||||
|
|
||||||
|
[GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
ok = emqx_ds:add_generation(DB),
|
||||||
|
|
||||||
|
%% All streams
|
||||||
|
TopicFilter = emqx_topic:words(<<"foo/+">>),
|
||||||
|
StartTime = 0,
|
||||||
|
?assertMatch([_, _, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
|
||||||
|
|
||||||
|
?force_ordering(
|
||||||
|
#{?snk_kind := dropped_gen},
|
||||||
|
#{?snk_kind := get_streams_get_gen}
|
||||||
|
),
|
||||||
|
|
||||||
|
spawn_link(fun() ->
|
||||||
|
{ok, _} = ?block_until(#{?snk_kind := get_streams_all_gens}),
|
||||||
|
ok = emqx_ds:drop_generation(DB, GenId0),
|
||||||
|
?tp(dropped_gen, #{})
|
||||||
|
end),
|
||||||
|
|
||||||
|
?assertMatch([_, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
update_data_set() ->
|
update_data_set() ->
|
||||||
[
|
[
|
||||||
[
|
[
|
||||||
|
@ -295,7 +539,7 @@ iterate(DB, It0, BatchSize, Acc) ->
|
||||||
{ok, It, Msgs} ->
|
{ok, It, Msgs} ->
|
||||||
iterate(DB, It, BatchSize, Acc ++ Msgs);
|
iterate(DB, It, BatchSize, Acc ++ Msgs);
|
||||||
{ok, end_of_stream} ->
|
{ok, end_of_stream} ->
|
||||||
{ok, It0, Acc};
|
{ok, end_of_stream, Acc};
|
||||||
Ret ->
|
Ret ->
|
||||||
Ret
|
Ret
|
||||||
end.
|
end.
|
||||||
|
|
Loading…
Reference in New Issue