Merge pull request #12475 from ieQu1/dev/lean-stream
Use a more compact data structure to represent streams
This commit is contained in:
commit
c7888ad1f1
|
@ -23,6 +23,7 @@
|
||||||
{emqx_ds,1}.
|
{emqx_ds,1}.
|
||||||
{emqx_ds,2}.
|
{emqx_ds,2}.
|
||||||
{emqx_ds,3}.
|
{emqx_ds,3}.
|
||||||
|
{emqx_ds,4}.
|
||||||
{emqx_eviction_agent,1}.
|
{emqx_eviction_agent,1}.
|
||||||
{emqx_eviction_agent,2}.
|
{emqx_eviction_agent,2}.
|
||||||
{emqx_exhook,1}.
|
{emqx_exhook,1}.
|
||||||
|
|
|
@ -169,7 +169,7 @@ del_subscription(SubId, S0) ->
|
||||||
|
|
||||||
ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
||||||
%% TODO: hash collisions
|
%% TODO: hash collisions
|
||||||
Key = {SubId, erlang:phash2(Stream)},
|
Key = {SubId, Stream},
|
||||||
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
||||||
undefined ->
|
undefined ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
|
|
|
@ -43,7 +43,9 @@
|
||||||
do_drop_db_v1/1,
|
do_drop_db_v1/1,
|
||||||
do_store_batch_v1/4,
|
do_store_batch_v1/4,
|
||||||
do_get_streams_v1/4,
|
do_get_streams_v1/4,
|
||||||
|
do_get_streams_v2/4,
|
||||||
do_make_iterator_v1/5,
|
do_make_iterator_v1/5,
|
||||||
|
do_make_iterator_v2/5,
|
||||||
do_update_iterator_v2/4,
|
do_update_iterator_v2/4,
|
||||||
do_next_v1/4,
|
do_next_v1/4,
|
||||||
do_add_generation_v2/1,
|
do_add_generation_v2/1,
|
||||||
|
@ -51,7 +53,9 @@
|
||||||
do_drop_generation_v3/3
|
do_drop_generation_v3/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
|
-export_type([
|
||||||
|
shard_id/0, builtin_db_opts/0, stream_v1/0, stream/0, iterator/0, message_id/0, batch/0
|
||||||
|
]).
|
||||||
|
|
||||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||||
-include("emqx_ds_replication_layer.hrl").
|
-include("emqx_ds_replication_layer.hrl").
|
||||||
|
@ -72,17 +76,19 @@
|
||||||
|
|
||||||
%% This enapsulates the stream entity from the replication level.
|
%% This enapsulates the stream entity from the replication level.
|
||||||
%%
|
%%
|
||||||
%% TODO: currently the stream is hardwired to only support the
|
%% TODO: this type is obsolete and is kept only for compatibility with
|
||||||
%% internal rocksdb storage. In the future we want to add another
|
%% v3 BPAPI. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6)
|
||||||
%% implementations for emqx_ds, so this type has to take this into
|
-opaque stream_v1() ::
|
||||||
%% account.
|
|
||||||
-opaque stream() ::
|
|
||||||
#{
|
#{
|
||||||
?tag := ?STREAM,
|
?tag := ?STREAM,
|
||||||
?shard := emqx_ds_replication_layer:shard_id(),
|
?shard := emqx_ds_replication_layer:shard_id(),
|
||||||
?enc := emqx_ds_storage_layer:stream()
|
?enc := emqx_ds_storage_layer:stream_v1()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-define(stream_v2(SHARD, INNER), [2, SHARD | INNER]).
|
||||||
|
|
||||||
|
-opaque stream() :: nonempty_maybe_improper_list().
|
||||||
|
|
||||||
-opaque iterator() ::
|
-opaque iterator() ::
|
||||||
#{
|
#{
|
||||||
?tag := ?IT,
|
?tag := ?IT,
|
||||||
|
@ -121,7 +127,7 @@ open_db(DB, CreateOpts) ->
|
||||||
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
|
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
|
||||||
add_generation(DB) ->
|
add_generation(DB) ->
|
||||||
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
|
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
|
||||||
_ = emqx_ds_proto_v3:add_generation(Nodes, DB),
|
_ = emqx_ds_proto_v4:add_generation(Nodes, DB),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
||||||
|
@ -140,7 +146,7 @@ list_generations_with_lifetimes(DB) ->
|
||||||
AccInner#{{Shard, GenId} => Data}
|
AccInner#{{Shard, GenId} => Data}
|
||||||
end,
|
end,
|
||||||
GensAcc,
|
GensAcc,
|
||||||
emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard)
|
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
|
@ -152,12 +158,12 @@ drop_generation(DB, {Shard, GenId}) ->
|
||||||
%% TODO: drop generation in all nodes in the replica set, not only in the leader,
|
%% TODO: drop generation in all nodes in the replica set, not only in the leader,
|
||||||
%% after we have proper replication in place.
|
%% after we have proper replication in place.
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId).
|
emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId).
|
||||||
|
|
||||||
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
||||||
drop_db(DB) ->
|
drop_db(DB) ->
|
||||||
Nodes = list_nodes(),
|
Nodes = list_nodes(),
|
||||||
_ = emqx_ds_proto_v3:drop_db(Nodes, DB),
|
_ = emqx_ds_proto_v4:drop_db(Nodes, DB),
|
||||||
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
||||||
emqx_ds_builtin_sup:stop_db(DB),
|
emqx_ds_builtin_sup:stop_db(DB),
|
||||||
ok.
|
ok.
|
||||||
|
@ -174,16 +180,12 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Shard) ->
|
fun(Shard) ->
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime),
|
Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime),
|
||||||
lists:map(
|
lists:map(
|
||||||
fun({RankY, Stream}) ->
|
fun({RankY, StorageLayerStream}) ->
|
||||||
RankX = Shard,
|
RankX = Shard,
|
||||||
Rank = {RankX, RankY},
|
Rank = {RankX, RankY},
|
||||||
{Rank, #{
|
{Rank, ?stream_v2(Shard, StorageLayerStream)}
|
||||||
?tag => ?STREAM,
|
|
||||||
?shard => Shard,
|
|
||||||
?enc => Stream
|
|
||||||
}}
|
|
||||||
end,
|
end,
|
||||||
Streams
|
Streams
|
||||||
)
|
)
|
||||||
|
@ -194,9 +196,9 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
-spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
-spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||||
emqx_ds:make_iterator_result(iterator()).
|
emqx_ds:make_iterator_result(iterator()).
|
||||||
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
||||||
#{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
|
?stream_v2(Shard, StorageStream) = Stream,
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
|
case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
||||||
Err = {error, _} ->
|
Err = {error, _} ->
|
||||||
|
@ -213,7 +215,7 @@ update_iterator(DB, OldIter, DSKey) ->
|
||||||
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
case
|
case
|
||||||
emqx_ds_proto_v3:update_iterator(
|
emqx_ds_proto_v4:update_iterator(
|
||||||
Node,
|
Node,
|
||||||
DB,
|
DB,
|
||||||
Shard,
|
Shard,
|
||||||
|
@ -239,7 +241,7 @@ next(DB, Iter0, BatchSize) ->
|
||||||
%%
|
%%
|
||||||
%% This kind of trickery should be probably done here in the
|
%% This kind of trickery should be probably done here in the
|
||||||
%% replication layer. Or, perhaps, in the logic layer.
|
%% replication layer. Or, perhaps, in the logic layer.
|
||||||
case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of
|
case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of
|
||||||
{ok, StorageIter, Batch} ->
|
{ok, StorageIter, Batch} ->
|
||||||
Iter = Iter0#{?enc := StorageIter},
|
Iter = Iter0#{?enc := StorageIter},
|
||||||
{ok, Iter, Batch};
|
{ok, Iter, Batch};
|
||||||
|
@ -311,14 +313,35 @@ do_drop_db_v1(DB) ->
|
||||||
do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
|
do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
|
||||||
emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
|
emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
|
||||||
|
|
||||||
|
%% Remove me in EMQX 5.6
|
||||||
|
-dialyzer({nowarn_function, do_get_streams_v1/4}).
|
||||||
-spec do_get_streams_v1(
|
-spec do_get_streams_v1(
|
||||||
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
[{integer(), emqx_ds_storage_layer:stream_v1()}].
|
||||||
|
do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
|
||||||
|
error(obsolete_api).
|
||||||
|
|
||||||
|
-spec do_get_streams_v2(
|
||||||
|
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
[{integer(), emqx_ds_storage_layer:stream()}].
|
||||||
do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
|
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
|
||||||
emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
|
emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, do_make_iterator_v1/5}).
|
||||||
-spec do_make_iterator_v1(
|
-spec do_make_iterator_v1(
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:stream_v1(),
|
||||||
|
emqx_ds:topic_filter(),
|
||||||
|
emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||||
|
do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
|
||||||
|
error(obsolete_api).
|
||||||
|
|
||||||
|
-spec do_make_iterator_v2(
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_replication_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
emqx_ds_storage_layer:stream(),
|
emqx_ds_storage_layer:stream(),
|
||||||
|
@ -326,7 +349,7 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||||
do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) ->
|
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
|
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
|
||||||
|
|
||||||
-spec do_update_iterator_v2(
|
-spec do_update_iterator_v2(
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -89,11 +89,7 @@
|
||||||
|
|
||||||
-type s() :: #s{}.
|
-type s() :: #s{}.
|
||||||
|
|
||||||
-type stream() ::
|
-type stream() :: emqx_ds_lts:msg_storage_key().
|
||||||
#{
|
|
||||||
?tag := ?STREAM,
|
|
||||||
?storage_key := emqx_ds_lts:msg_storage_key()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type iterator() ::
|
-type iterator() ::
|
||||||
#{
|
#{
|
||||||
|
@ -251,8 +247,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) -> [stream()].
|
) -> [stream()].
|
||||||
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
|
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
|
||||||
Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
|
emqx_ds_lts:match_topics(Trie, TopicFilter).
|
||||||
[#{?tag => ?STREAM, ?storage_key => I} || I <- Indexes].
|
|
||||||
|
|
||||||
-spec make_iterator(
|
-spec make_iterator(
|
||||||
emqx_ds_storage_layer:shard_id(),
|
emqx_ds_storage_layer:shard_id(),
|
||||||
|
@ -262,7 +257,7 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) -> {ok, iterator()}.
|
) -> {ok, iterator()}.
|
||||||
make_iterator(
|
make_iterator(
|
||||||
_Shard, _Data, #{?tag := ?STREAM, ?storage_key := StorageKey}, TopicFilter, StartTime
|
_Shard, _Data, StorageKey, TopicFilter, StartTime
|
||||||
) ->
|
) ->
|
||||||
%% Note: it's a good idea to keep the iterator structure lean,
|
%% Note: it's a good idea to keep the iterator structure lean,
|
||||||
%% since it can be stored on a remote node that could update its
|
%% since it can be stored on a remote node that could update its
|
||||||
|
|
|
@ -43,6 +43,7 @@
|
||||||
generation/0,
|
generation/0,
|
||||||
cf_refs/0,
|
cf_refs/0,
|
||||||
stream/0,
|
stream/0,
|
||||||
|
stream_v1/0,
|
||||||
iterator/0,
|
iterator/0,
|
||||||
shard_id/0,
|
shard_id/0,
|
||||||
options/0,
|
options/0,
|
||||||
|
@ -54,6 +55,8 @@
|
||||||
|
|
||||||
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
||||||
|
|
||||||
|
-define(stream_v2(GENERATION, INNER), [GENERATION | INNER]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -80,14 +83,17 @@
|
||||||
|
|
||||||
-type gen_id() :: 0..16#ffff.
|
-type gen_id() :: 0..16#ffff.
|
||||||
|
|
||||||
%% Note: this might be stored permanently on a remote node.
|
%% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6
|
||||||
-opaque stream() ::
|
-opaque stream_v1() ::
|
||||||
#{
|
#{
|
||||||
?tag := ?STREAM,
|
?tag := ?STREAM,
|
||||||
?generation := gen_id(),
|
?generation := gen_id(),
|
||||||
?enc := term()
|
?enc := term()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% Note: this might be stored permanently on a remote node.
|
||||||
|
-opaque stream() :: nonempty_maybe_improper_list(gen_id(), term()).
|
||||||
|
|
||||||
%% Note: this might be stred permanently on a remote node.
|
%% Note: this might be stred permanently on a remote node.
|
||||||
-opaque iterator() ::
|
-opaque iterator() ::
|
||||||
#{
|
#{
|
||||||
|
@ -221,12 +227,8 @@ get_streams(Shard, TopicFilter, StartTime) ->
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
{ok, #{module := Mod, data := GenData}} ->
|
||||||
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
||||||
[
|
[
|
||||||
{GenId, #{
|
{GenId, ?stream_v2(GenId, InnerStream)}
|
||||||
?tag => ?STREAM,
|
|| InnerStream <- Streams
|
||||||
?generation => GenId,
|
|
||||||
?enc => Stream
|
|
||||||
}}
|
|
||||||
|| Stream <- Streams
|
|
||||||
];
|
];
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
%% race condition: generation was dropped before getting its streams?
|
%% race condition: generation was dropped before getting its streams?
|
||||||
|
@ -239,7 +241,7 @@ get_streams(Shard, TopicFilter, StartTime) ->
|
||||||
-spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
-spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||||
emqx_ds:make_iterator_result(iterator()).
|
emqx_ds:make_iterator_result(iterator()).
|
||||||
make_iterator(
|
make_iterator(
|
||||||
Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
|
Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime
|
||||||
) ->
|
) ->
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get_safe(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
{ok, #{module := Mod, data := GenData}} ->
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -28,7 +28,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([introduced_in/0]).
|
-export([introduced_in/0, deprecated_since/0]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
|
@ -45,7 +45,7 @@ drop_db(Node, DB) ->
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
[{integer(), emqx_ds_storage_layer:stream_v1()}].
|
||||||
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
node(),
|
node(),
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_replication_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
emqx_ds_storage_layer:stream(),
|
emqx_ds_storage_layer:stream_v1(),
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
|
@ -95,3 +95,6 @@ store_batch(Node, DB, Shard, Batch, Options) ->
|
||||||
|
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.4.0".
|
"5.4.0".
|
||||||
|
|
||||||
|
deprecated_since() ->
|
||||||
|
"5.5.0".
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -32,7 +32,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([introduced_in/0]).
|
-export([introduced_in/0, deprecated_since/0]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
|
@ -50,7 +50,7 @@ drop_db(Node, DB) ->
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
[{integer(), emqx_ds_storage_layer:stream_v1()}].
|
||||||
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
node(),
|
node(),
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_replication_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
emqx_ds_storage_layer:stream(),
|
emqx_ds_storage_layer:stream_v1(),
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
|
@ -122,3 +122,6 @@ add_generation(Node, DB) ->
|
||||||
|
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.5.0".
|
"5.5.0".
|
||||||
|
|
||||||
|
deprecated_since() ->
|
||||||
|
"5.5.0".
|
||||||
|
|
|
@ -34,7 +34,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([introduced_in/0]).
|
-export([introduced_in/0, deprecated_since/0]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
|
@ -52,7 +52,7 @@ drop_db(Node, DB) ->
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
[{integer(), emqx_ds_storage_layer:stream_v1()}].
|
||||||
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
node(),
|
node(),
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_replication_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
emqx_ds_storage_layer:stream(),
|
emqx_ds_storage_layer:stream_v1(),
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
|
@ -144,4 +144,7 @@ drop_generation(Node, DB, Shard, GenId) ->
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.6.0".
|
"5.5.0".
|
||||||
|
|
||||||
|
deprecated_since() ->
|
||||||
|
"5.5.1".
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_proto_v4).
|
||||||
|
|
||||||
|
-behavior(emqx_bpapi).
|
||||||
|
|
||||||
|
-include_lib("emqx_utils/include/bpapi.hrl").
|
||||||
|
%% API:
|
||||||
|
-export([
|
||||||
|
drop_db/2,
|
||||||
|
store_batch/5,
|
||||||
|
get_streams/5,
|
||||||
|
make_iterator/6,
|
||||||
|
next/5,
|
||||||
|
update_iterator/5,
|
||||||
|
add_generation/2,
|
||||||
|
|
||||||
|
%% introduced in v3
|
||||||
|
list_generations_with_lifetimes/3,
|
||||||
|
drop_generation/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% behavior callbacks:
|
||||||
|
-export([introduced_in/0]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API funcions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec drop_db([node()], emqx_ds:db()) ->
|
||||||
|
[{ok, ok} | {error, _}].
|
||||||
|
drop_db(Node, DB) ->
|
||||||
|
erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
|
||||||
|
|
||||||
|
-spec get_streams(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds:topic_filter(),
|
||||||
|
emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
[{integer(), emqx_ds_storage_layer:stream()}].
|
||||||
|
get_streams(Node, DB, Shard, TopicFilter, Time) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v2, [DB, Shard, TopicFilter, Time]).
|
||||||
|
|
||||||
|
-spec make_iterator(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:stream(),
|
||||||
|
emqx_ds:topic_filter(),
|
||||||
|
emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||||
|
make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [
|
||||||
|
DB, Shard, Stream, TopicFilter, StartTime
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec next(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:iterator(),
|
||||||
|
pos_integer()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
|
||||||
|
| {ok, end_of_stream}
|
||||||
|
| {error, _}.
|
||||||
|
next(Node, DB, Shard, Iter, BatchSize) ->
|
||||||
|
emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
|
||||||
|
|
||||||
|
-spec store_batch(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_replication_layer:batch(),
|
||||||
|
emqx_ds:message_store_opts()
|
||||||
|
) ->
|
||||||
|
emqx_ds:store_batch_result().
|
||||||
|
store_batch(Node, DB, Shard, Batch, Options) ->
|
||||||
|
emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
|
||||||
|
DB, Shard, Batch, Options
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec update_iterator(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:iterator(),
|
||||||
|
emqx_ds:message_key()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||||
|
update_iterator(Node, DB, Shard, OldIter, DSKey) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
|
||||||
|
DB, Shard, OldIter, DSKey
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec add_generation([node()], emqx_ds:db()) ->
|
||||||
|
[{ok, ok} | {error, _}].
|
||||||
|
add_generation(Node, DB) ->
|
||||||
|
erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------------------
|
||||||
|
%% Introduced in V3
|
||||||
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec list_generations_with_lifetimes(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id()
|
||||||
|
) ->
|
||||||
|
#{
|
||||||
|
emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()
|
||||||
|
}.
|
||||||
|
list_generations_with_lifetimes(Node, DB, Shard) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]).
|
||||||
|
|
||||||
|
-spec drop_generation(
|
||||||
|
node(),
|
||||||
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds_storage_layer:gen_id()
|
||||||
|
) ->
|
||||||
|
ok | {error, _}.
|
||||||
|
drop_generation(Node, DB, Shard, GenId) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% behavior callbacks
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.5.1".
|
Loading…
Reference in New Issue