diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 987c19535..9497f04cd 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -23,6 +23,7 @@ {emqx_ds,1}. {emqx_ds,2}. {emqx_ds,3}. +{emqx_ds,4}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. {emqx_exhook,1}. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 7432fe3c7..ed3a93212 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -43,7 +43,9 @@ do_drop_db_v1/1, do_store_batch_v1/4, do_get_streams_v1/4, + do_get_streams_v2/4, do_make_iterator_v1/5, + do_make_iterator_v2/5, do_update_iterator_v2/4, do_next_v1/4, do_add_generation_v2/1, @@ -51,7 +53,9 @@ do_drop_generation_v3/3 ]). --export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). +-export_type([ + shard_id/0, builtin_db_opts/0, stream_v1/0, stream/0, iterator/0, message_id/0, batch/0 +]). -include_lib("emqx_utils/include/emqx_message.hrl"). -include("emqx_ds_replication_layer.hrl"). @@ -72,17 +76,19 @@ %% This enapsulates the stream entity from the replication level. %% -%% TODO: currently the stream is hardwired to only support the -%% internal rocksdb storage. In the future we want to add another -%% implementations for emqx_ds, so this type has to take this into -%% account. --opaque stream() :: +%% TODO: this type is obsolete and is kept only for compatibility with +%% v3 BPAPI. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6) +-opaque stream_v1() :: #{ ?tag := ?STREAM, ?shard := emqx_ds_replication_layer:shard_id(), - ?enc := emqx_ds_storage_layer:stream() + ?enc := emqx_ds_storage_layer:stream_v1() }. +-define(stream_v2(SHARD, INNER), [2, SHARD | INNER]). + +-opaque stream() :: nonempty_maybe_improper_list(). + -opaque iterator() :: #{ ?tag := ?IT, @@ -121,7 +127,7 @@ open_db(DB, CreateOpts) -> -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v3:add_generation(Nodes, DB), + _ = emqx_ds_proto_v4:add_generation(Nodes, DB), ok. -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. @@ -140,7 +146,7 @@ list_generations_with_lifetimes(DB) -> AccInner#{{Shard, GenId} => Data} end, GensAcc, - emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard) + emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard) ) end, #{}, @@ -152,12 +158,12 @@ drop_generation(DB, {Shard, GenId}) -> %% TODO: drop generation in all nodes in the replica set, not only in the leader, %% after we have proper replication in place. Node = node_of_shard(DB, Shard), - emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId). + emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId). -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), - _ = emqx_ds_proto_v3:drop_db(Nodes, DB), + _ = emqx_ds_proto_v4:drop_db(Nodes, DB), _ = emqx_ds_replication_layer_meta:drop_db(DB), emqx_ds_builtin_sup:stop_db(DB), ok. @@ -174,16 +180,12 @@ get_streams(DB, TopicFilter, StartTime) -> lists:flatmap( fun(Shard) -> Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime), lists:map( - fun({RankY, Stream}) -> + fun({RankY, StorageLayerStream}) -> RankX = Shard, Rank = {RankX, RankY}, - {Rank, #{ - ?tag => ?STREAM, - ?shard => Shard, - ?enc => Stream - }} + {Rank, ?stream_v2(Shard, StorageLayerStream)} end, Streams ) @@ -194,9 +196,9 @@ get_streams(DB, TopicFilter, StartTime) -> -spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator(DB, Stream, TopicFilter, StartTime) -> - #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream, + ?stream_v2(Shard, StorageStream) = Stream, Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of + case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> @@ -213,7 +215,7 @@ update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, Node = node_of_shard(DB, Shard), case - emqx_ds_proto_v3:update_iterator( + emqx_ds_proto_v4:update_iterator( Node, DB, Shard, @@ -239,7 +241,7 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of + case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; @@ -311,14 +313,35 @@ do_drop_db_v1(DB) -> do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) -> emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). +%% Remove me in EMQX 5.6 +-dialyzer({nowarn_function, do_get_streams_v1/4}). -spec do_get_streams_v1( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() +) -> + [{integer(), emqx_ds_storage_layer:stream_v1()}]. +do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) -> + error(obsolete_api). + +-spec do_get_streams_v2( + emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() ) -> [{integer(), emqx_ds_storage_layer:stream()}]. -do_get_streams_v1(DB, Shard, TopicFilter, StartTime) -> +do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime). +-dialyzer({nowarn_function, do_make_iterator_v1/5}). -spec do_make_iterator_v1( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:stream_v1(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> + error(obsolete_api). + +-spec do_make_iterator_v2( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:stream(), @@ -326,7 +349,7 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) -> emqx_ds:time() ) -> {ok, emqx_ds_storage_layer:iterator()} | {error, _}. -do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) -> +do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). -spec do_update_iterator_v2( diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d407dab41..7ffdd1e2b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -89,11 +89,7 @@ -type s() :: #s{}. --type stream() :: - #{ - ?tag := ?STREAM, - ?storage_key := emqx_ds_lts:msg_storage_key() - }. +-type stream() :: emqx_ds_lts:msg_storage_key(). -type iterator() :: #{ @@ -251,8 +247,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> emqx_ds:time() ) -> [stream()]. get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> - Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter), - [#{?tag => ?STREAM, ?storage_key => I} || I <- Indexes]. + emqx_ds_lts:match_topics(Trie, TopicFilter). -spec make_iterator( emqx_ds_storage_layer:shard_id(), @@ -262,7 +257,7 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> emqx_ds:time() ) -> {ok, iterator()}. make_iterator( - _Shard, _Data, #{?tag := ?STREAM, ?storage_key := StorageKey}, TopicFilter, StartTime + _Shard, _Data, StorageKey, TopicFilter, StartTime ) -> %% Note: it's a good idea to keep the iterator structure lean, %% since it can be stored on a remote node that could update its diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index f22e7423c..e0bf1fa1b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -43,6 +43,7 @@ generation/0, cf_refs/0, stream/0, + stream_v1/0, iterator/0, shard_id/0, options/0, @@ -54,6 +55,8 @@ -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). +-define(stream_v2(GENERATION, INNER), [GENERATION | INNER]). + %%================================================================================ %% Type declarations %%================================================================================ @@ -80,14 +83,17 @@ -type gen_id() :: 0..16#ffff. -%% Note: this might be stored permanently on a remote node. --opaque stream() :: +%% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6 +-opaque stream_v1() :: #{ ?tag := ?STREAM, ?generation := gen_id(), ?enc := term() }. +%% Note: this might be stored permanently on a remote node. +-opaque stream() :: nonempty_maybe_improper_list(gen_id(), term()). + %% Note: this might be stred permanently on a remote node. -opaque iterator() :: #{ @@ -221,12 +227,8 @@ get_streams(Shard, TopicFilter, StartTime) -> {ok, #{module := Mod, data := GenData}} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), [ - {GenId, #{ - ?tag => ?STREAM, - ?generation => GenId, - ?enc => Stream - }} - || Stream <- Streams + {GenId, ?stream_v2(GenId, InnerStream)} + || InnerStream <- Streams ]; {error, not_found} -> %% race condition: generation was dropped before getting its streams? @@ -239,7 +241,7 @@ get_streams(Shard, TopicFilter, StartTime) -> -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator( - Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime + Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime ) -> case generation_get_safe(Shard, GenId) of {ok, #{module := Mod, data := GenData}} -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 67ed1a3ca..e9a19c8df 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ ]). %% behavior callbacks: --export([introduced_in/0]). +-export([introduced_in/0, deprecated_since/0]). %%================================================================================ %% API funcions @@ -45,7 +45,7 @@ drop_db(Node, DB) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - [{integer(), emqx_ds_storage_layer:stream()}]. + [{integer(), emqx_ds_storage_layer:stream_v1()}]. get_streams(Node, DB, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). @@ -53,7 +53,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) -> node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - emqx_ds_storage_layer:stream(), + emqx_ds_storage_layer:stream_v1(), emqx_ds:topic_filter(), emqx_ds:time() ) -> @@ -95,3 +95,6 @@ store_batch(Node, DB, Shard, Batch, Options) -> introduced_in() -> "5.4.0". + +deprecated_since() -> + "5.5.0". diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl index f771f1a8b..1ab158747 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ ]). %% behavior callbacks: --export([introduced_in/0]). +-export([introduced_in/0, deprecated_since/0]). %%================================================================================ %% API funcions @@ -50,7 +50,7 @@ drop_db(Node, DB) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - [{integer(), emqx_ds_storage_layer:stream()}]. + [{integer(), emqx_ds_storage_layer:stream_v1()}]. get_streams(Node, DB, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). @@ -58,7 +58,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) -> node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - emqx_ds_storage_layer:stream(), + emqx_ds_storage_layer:stream_v1(), emqx_ds:topic_filter(), emqx_ds:time() ) -> @@ -122,3 +122,6 @@ add_generation(Node, DB) -> introduced_in() -> "5.5.0". + +deprecated_since() -> + "5.5.0". diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl index 74a174c4c..40205c548 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl @@ -34,7 +34,7 @@ ]). %% behavior callbacks: --export([introduced_in/0]). +-export([introduced_in/0, deprecated_since/0]). %%================================================================================ %% API funcions @@ -52,7 +52,7 @@ drop_db(Node, DB) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - [{integer(), emqx_ds_storage_layer:stream()}]. + [{integer(), emqx_ds_storage_layer:stream_v1()}]. get_streams(Node, DB, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). @@ -60,7 +60,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) -> node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - emqx_ds_storage_layer:stream(), + emqx_ds_storage_layer:stream_v1(), emqx_ds:topic_filter(), emqx_ds:time() ) -> @@ -144,4 +144,7 @@ drop_generation(Node, DB, Shard, GenId) -> %%================================================================================ introduced_in() -> - "5.6.0". + "5.5.0". + +deprecated_since() -> + "5.5.1". diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl new file mode 100644 index 000000000..fcab12507 --- /dev/null +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl @@ -0,0 +1,147 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_proto_v4). + +-behavior(emqx_bpapi). + +-include_lib("emqx_utils/include/bpapi.hrl"). +%% API: +-export([ + drop_db/2, + store_batch/5, + get_streams/5, + make_iterator/6, + next/5, + update_iterator/5, + add_generation/2, + + %% introduced in v3 + list_generations_with_lifetimes/3, + drop_generation/4 +]). + +%% behavior callbacks: +-export([introduced_in/0]). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | {error, _}]. +drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). + +-spec get_streams( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + [{integer(), emqx_ds_storage_layer:stream()}]. +get_streams(Node, DB, Shard, TopicFilter, Time) -> + erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v2, [DB, Shard, TopicFilter, Time]). + +-spec make_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [ + DB, Shard, Stream, TopicFilter, StartTime + ]). + +-spec next( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + pos_integer() +) -> + {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]} + | {ok, end_of_stream} + | {error, _}. +next(Node, DB, Shard, Iter, BatchSize) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). + +-spec store_batch( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer:batch(), + emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +store_batch(Node, DB, Shard, Batch, Options) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [ + DB, Shard, Batch, Options + ]). + +-spec update_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + emqx_ds:message_key() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +update_iterator(Node, DB, Shard, OldIter, DSKey) -> + erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ + DB, Shard, OldIter, DSKey + ]). + +-spec add_generation([node()], emqx_ds:db()) -> + [{ok, ok} | {error, _}]. +add_generation(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]). + +%%-------------------------------------------------------------------------------- +%% Introduced in V3 +%%-------------------------------------------------------------------------------- + +-spec list_generations_with_lifetimes( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id() +) -> + #{ + emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info() + }. +list_generations_with_lifetimes(Node, DB, Shard) -> + erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]). + +-spec drop_generation( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:gen_id() +) -> + ok | {error, _}. +drop_generation(Node, DB, Shard, GenId) -> + erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +introduced_in() -> + "5.5.1".