diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index e2c726fe5..208c34e30 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -19,6 +19,7 @@ {emqx_delayed,1}. {emqx_delayed,2}. {emqx_ds,1}. +{emqx_ds,2}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. {emqx_exhook,1}. diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 494730346..1053978dc 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -431,7 +431,7 @@ get_commit_next(comp, #inflight{commits = Commits}) -> publish_fetch(PreprocFun, FirstSeqno, Messages) -> flatmapfoldl( - fun(MessageIn, Acc) -> + fun({_DSKey, MessageIn}, Acc) -> Message = PreprocFun(MessageIn), publish_fetch(Message, Acc) end, @@ -450,7 +450,7 @@ publish_fetch(Messages, Seqno) -> publish_replay(PreprocFun, Commits, FirstSeqno, Messages) -> #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits, flatmapfoldl( - fun(MessageIn, Acc) -> + fun({_DSKey, MessageIn}, Acc) -> Message = PreprocFun(MessageIn), publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) end, diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 6c069b74c..f25f38098 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -412,8 +412,8 @@ consume(It) -> case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of {ok, _NIt, _Msgs = []} -> []; - {ok, NIt, Msgs} -> - Msgs ++ consume(NIt); + {ok, NIt, MsgsAndKeys} -> + [Msg || {_DSKey, Msg} <- MsgsAndKeys] ++ consume(NIt); {ok, end_of_stream} -> [] end. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 6b371cf79..f341a10f4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -28,7 +28,7 @@ -export([store_batch/2, store_batch/3]). %% Message replay API: --export([get_streams/3, make_iterator/4, next/3]). +-export([get_streams/3, make_iterator/4, update_iterator/3, next/3]). %% Misc. API: -export([]). @@ -43,6 +43,7 @@ stream_rank/0, iterator/0, message_id/0, + message_key/0, next_result/1, next_result/0, store_batch_result/0, make_iterator_result/1, make_iterator_result/0, @@ -74,6 +75,8 @@ -type ds_specific_stream() :: term(). +-type message_key() :: binary(). + -type store_batch_result() :: ok | {error, _}. -type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. @@ -81,7 +84,7 @@ -type make_iterator_result() :: make_iterator_result(iterator()). -type next_result(Iterator) :: - {ok, Iterator, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. + {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}. -type next_result() :: next_result(iterator()). @@ -125,6 +128,9 @@ -callback make_iterator(db(), ds_specific_stream(), topic_filter(), time()) -> make_iterator_result(ds_specific_iterator()). +-callback update_iterator(db(), ds_specific_iterator(), message_key()) -> + make_iterator_result(ds_specific_iterator()). + -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator). %%================================================================================ @@ -212,6 +218,11 @@ get_streams(DB, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) -> ?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime). +-spec update_iterator(db(), iterator(), message_key()) -> + make_iterator_result(). +update_iterator(DB, OldIter, DSKey) -> + ?module(DB):update_iterator(DB, OldIter, DSKey). 
+ -spec next(db(), iterator(), pos_integer()) -> next_result(). next(DB, Iter, BatchSize) -> ?module(DB):next(DB, Iter, BatchSize). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 7a26b696d..5c24552f9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -27,6 +27,7 @@ store_batch/3, get_streams/3, make_iterator/4, + update_iterator/3, next/3 ]). @@ -36,6 +37,7 @@ do_store_batch_v1/4, do_get_streams_v1/4, do_make_iterator_v1/5, + do_update_iterator_v2/4, do_next_v1/4 ]). @@ -170,6 +172,30 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> Err end. +-spec update_iterator( + emqx_ds:db(), + iterator(), + emqx_ds:message_key() +) -> + emqx_ds:make_iterator_result(iterator()). +update_iterator(DB, OldIter, DSKey) -> + #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, + Node = node_of_shard(DB, Shard), + case + emqx_ds_proto_v2:update_iterator( + Node, + DB, + Shard, + StorageIter, + DSKey + ) + of + {ok, Iter} -> + {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; + Err = {error, _} -> + Err + end. + -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(DB, Iter0, BatchSize) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, @@ -236,6 +262,18 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) -> do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). +-spec do_update_iterator_v2( + emqx_ds:db(), + emqx_ds_storage_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + emqx_ds:message_key() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> + emqx_ds_storage_layer:update_iterator( + {DB, Shard}, OldIter, DSKey + ). 
+ -spec do_next_v1( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index c2f533673..4c59a5f62 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -24,7 +24,15 @@ -export([]). %% behavior callbacks: --export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). +-export([ + create/4, + open/5, + store_batch/4, + get_streams/4, + make_iterator/5, + update_iterator/4, + next/4 +]). %% internal exports: -export([format_key/2]). @@ -236,6 +244,20 @@ make_iterator( ?last_seen_key => <<>> }}. +-spec update_iterator( + emqx_ds_storage_layer:shard_id(), + s(), + iterator(), + emqx_ds:message_key() +) -> {ok, iterator()}. +update_iterator( + _Shard, + _Data, + #{?tag := ?IT} = OldIter, + DSKey +) -> + {ok, OldIter#{?last_seen_key => DSKey}}. + next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know @@ -329,7 +351,7 @@ traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> Msg = deserialize(Val), case check_message(Cutoff, It, Msg) of true -> - Acc = [Msg | Acc0], + Acc = [{Key, Msg} | Acc0], traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); false -> traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 54530f428..929485c53 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -18,7 +18,15 @@ -behaviour(gen_server). %% Replication layer API: --export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/4, next/3]). 
+-export([ + open_shard/2, + drop_shard/1, + store_batch/3, + get_streams/3, + make_iterator/4, + update_iterator/3, + next/3 +]). %% gen_server -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -192,6 +200,27 @@ make_iterator( Err end. +-spec update_iterator( + shard_id(), iterator(), emqx_ds:message_key() +) -> + emqx_ds:make_iterator_result(iterator()). +update_iterator( + Shard, + #{?tag := ?IT, ?generation := GenId, ?enc := OldIter}, + DSKey +) -> + #{module := Mod, data := GenData} = generation_get(Shard, GenId), + case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of + {ok, Iter} -> + {ok, #{ + ?tag => ?IT, + ?generation => GenId, + ?enc => Iter + }}; + {error, _} = Err -> + Err + end. + -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 6676faf88..da7ac79f6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -27,7 +27,15 @@ -export([]). %% behavior callbacks: --export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). +-export([ + create/4, + open/5, + store_batch/4, + get_streams/4, + make_iterator/5, + update_iterator/4, + next/4 +]). %% internal exports: -export([]). @@ -97,6 +105,17 @@ make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) -> start_time = StartTime }}. +update_iterator(_Shard, _Data, OldIter, DSKey) -> + #it{ + topic_filter = TopicFilter, + start_time = StartTime + } = OldIter, + {ok, #it{ + topic_filter = TopicFilter, + start_time = StartTime, + last_seen_message_key = DSKey + }}. 
+ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), @@ -125,7 +144,7 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of true -> - do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [Msg | Acc]); + do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]); false -> do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc) end; diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 3b7c36082..cc851fc55 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,13 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]). +-export([ + drop_db/2, + store_batch/5, + get_streams/5, + make_iterator/6, + next/5 +]). %% behavior callbacks: -export([introduced_in/0]). @@ -65,7 +71,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:iterator(), pos_integer() ) -> - {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]} + {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]} | {ok, end_of_stream} | {error, _}. 
next(Node, DB, Shard, Iter, BatchSize) -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl new file mode 100644 index 000000000..73596fffc --- /dev/null +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl @@ -0,0 +1,118 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_proto_v2). + +-behavior(emqx_bpapi). + +-include_lib("emqx_utils/include/bpapi.hrl"). +%% API: +-export([ + drop_db/2, + store_batch/5, + get_streams/5, + make_iterator/6, + next/5, + + %% introduced in v2 + update_iterator/5 +]). + +%% behavior callbacks: +-export([introduced_in/0]). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. +drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). + +-spec get_streams( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + [{integer(), emqx_ds_storage_layer:stream()}]. 
+get_streams(Node, DB, Shard, TopicFilter, Time) -> + erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). + +-spec make_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [ + DB, Shard, Stream, TopicFilter, StartTime + ]). + +-spec next( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + pos_integer() +) -> + {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]} + | {ok, end_of_stream} + | {error, _}. +next(Node, DB, Shard, Iter, BatchSize) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). + +-spec store_batch( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer:batch(), + emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +store_batch(Node, DB, Shard, Batch, Options) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [ + DB, Shard, Batch, Options + ]). + +%%-------------------------------------------------------------------------------- +%% Introduced in V2 +%%-------------------------------------------------------------------------------- + +-spec update_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + emqx_ds:message_key() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +update_iterator(Node, DB, Shard, OldIter, DSKey) -> + erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ + DB, Shard, OldIter, DSKey + ]). 
+ +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +introduced_in() -> + "5.5.0". diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 8a46804b0..f44000eeb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -101,7 +101,7 @@ t_03_smoke_iterate(_Config) -> [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), {ok, Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual(Msgs, Batch, {Iter0, Iter}). + ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). %% Verify that iterators survive restart of the application. This is %% an important property, since the lifetime of the iterators is tied @@ -128,7 +128,32 @@ t_04_restart(_Config) -> ok = emqx_ds:open_db(DB, opts()), %% The old iterator should be still operational: {ok, Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual(Msgs, Batch, {Iter0, Iter}). + ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). + +%% Check that we can create iterators directly from DS keys. 
+t_05_update_iterator(_Config) -> + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + TopicFilter = ['#'], + StartTime = 0, + Msgs = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo">>, <<"2">>, 1), + message(<<"bar/bar">>, <<"3">>, 2) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), + Res0 = emqx_ds:next(DB, Iter0, 1), + ?assertMatch({ok, _OldIter, [{_Key0, _Msg0}]}, Res0), + {ok, OldIter, [{Key0, Msg0}]} = Res0, + Res1 = emqx_ds:update_iterator(DB, OldIter, Key0), + ?assertMatch({ok, _Iter1}, Res1), + {ok, Iter1} = Res1, + {ok, FinalIter, Batch} = iterate(DB, Iter1, 1), + AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]], + ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}), + ok. message(Topic, Payload, PublishedAt) -> #message{ diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 7b733406d..fc6049669 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -64,7 +64,8 @@ t_iterate(_Config) -> begin [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), - {ok, NextIt, Messages} = emqx_ds_storage_layer:next(?SHARD, It, 100), + {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), + Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], ?assertEqual( lists:map(fun integer_to_binary/1, Timestamps), payloads(Messages) @@ -249,7 +250,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> {ok, _NextIt, []} -> []; {ok, NextIt, Batch} -> - Batch ++ F(NextIt, N - 1) + [Msg || {_DSKey, Msg} <- Batch] ++ F(NextIt, N - 1) end 
end, MaxIterations = 1000000,