From 66d043becd0374075bdc9564784f3c527e534e51 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Dec 2023 18:08:08 -0300 Subject: [PATCH] feat(ds): introduce `update_iterator` callback --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_durable_storage/src/emqx_ds.erl | 10 +- .../src/emqx_ds_replication_layer.erl | 38 ++++++ .../src/emqx_ds_storage_bitfield_lts.erl | 24 +++- .../src/emqx_ds_storage_layer.erl | 31 ++++- .../src/emqx_ds_storage_reference.erl | 21 +++- .../src/proto/emqx_ds_proto_v1.erl | 8 +- .../src/proto/emqx_ds_proto_v2.erl | 118 ++++++++++++++++++ .../test/emqx_ds_SUITE.erl | 25 ++++ 9 files changed, 271 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index e2c726fe5..208c34e30 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -19,6 +19,7 @@ {emqx_delayed,1}. {emqx_delayed,2}. {emqx_ds,1}. +{emqx_ds,2}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. {emqx_exhook,1}. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index d9e4e4b5a..192066089 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -28,7 +28,7 @@ -export([store_batch/2, store_batch/3]). %% Message replay API: --export([get_streams/3, make_iterator/4, next/3]). +-export([get_streams/3, make_iterator/4, update_iterator/3, next/3]). %% Misc. API: -export([]). @@ -128,6 +128,9 @@ -callback make_iterator(db(), ds_specific_stream(), topic_filter(), time()) -> make_iterator_result(ds_specific_iterator()). +-callback update_iterator(db(), ds_specific_iterator(), message_key()) -> + make_iterator_result(ds_specific_iterator()). + -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator). %%================================================================================ @@ -215,6 +218,11 @@ get_streams(DB, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) -> ?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime). +-spec update_iterator(db(), iterator(), message_key()) -> + make_iterator_result(). +update_iterator(DB, OldIter, DSKey) -> + ?module(DB):update_iterator(DB, OldIter, DSKey). + -spec next(db(), iterator(), pos_integer()) -> next_result(). next(DB, Iter, BatchSize) -> ?module(DB):next(DB, Iter, BatchSize). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 7a26b696d..5c24552f9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -27,6 +27,7 @@ store_batch/3, get_streams/3, make_iterator/4, + update_iterator/3, next/3 ]). @@ -36,6 +37,7 @@ do_store_batch_v1/4, do_get_streams_v1/4, do_make_iterator_v1/5, + do_update_iterator_v2/4, do_next_v1/4 ]). @@ -170,6 +172,30 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> Err end. +-spec update_iterator( + emqx_ds:db(), + iterator(), + emqx_ds:message_key() +) -> + emqx_ds:make_iterator_result(iterator()). +update_iterator(DB, OldIter, DSKey) -> + #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, + Node = node_of_shard(DB, Shard), + case + emqx_ds_proto_v2:update_iterator( + Node, + DB, + Shard, + StorageIter, + DSKey + ) + of + {ok, Iter} -> + {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; + Err = {error, _} -> + Err + end. + -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(DB, Iter0, BatchSize) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, @@ -236,6 +262,18 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) -> do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). +-spec do_update_iterator_v2( + emqx_ds:db(), + emqx_ds_storage_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + emqx_ds:message_key() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> + emqx_ds_storage_layer:update_iterator( + {DB, Shard}, OldIter, DSKey + ). + -spec do_next_v1( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index b4422083a..4c59a5f62 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -24,7 +24,15 @@ -export([]). %% behavior callbacks: --export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). +-export([ + create/4, + open/5, + store_batch/4, + get_streams/4, + make_iterator/5, + update_iterator/4, + next/4 +]). %% internal exports: -export([format_key/2]). @@ -236,6 +244,20 @@ make_iterator( ?last_seen_key => <<>> }}. +-spec update_iterator( + emqx_ds_storage_layer:shard_id(), + s(), + iterator(), + emqx_ds:message_key() +) -> {ok, iterator()}. +update_iterator( + _Shard, + _Data, + #{?tag := ?IT} = OldIter, + DSKey +) -> + {ok, OldIter#{?last_seen_key => DSKey}}. + next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 54530f428..929485c53 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -18,7 +18,15 @@ -behaviour(gen_server). %% Replication layer API: --export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/4, next/3]). +-export([ + open_shard/2, + drop_shard/1, + store_batch/3, + get_streams/3, + make_iterator/4, + update_iterator/3, + next/3 +]). %% gen_server -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -192,6 +200,27 @@ make_iterator( Err end. +-spec update_iterator( + shard_id(), iterator(), emqx_ds:message_key() +) -> + emqx_ds:make_iterator_result(iterator()). +update_iterator( + Shard, + #{?tag := ?IT, ?generation := GenId, ?enc := OldIter}, + DSKey +) -> + #{module := Mod, data := GenData} = generation_get(Shard, GenId), + case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of + {ok, Iter} -> + {ok, #{ + ?tag => ?IT, + ?generation => GenId, + ?enc => Iter + }}; + {error, _} = Err -> + Err + end. + -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 37719a38f..da7ac79f6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -27,7 +27,15 @@ -export([]). %% behavior callbacks: --export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). +-export([ + create/4, + open/5, + store_batch/4, + get_streams/4, + make_iterator/5, + update_iterator/4, + next/4 +]). %% internal exports: -export([]). @@ -97,6 +105,17 @@ make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) -> start_time = StartTime }}. +update_iterator(_Shard, _Data, OldIter, DSKey) -> + #it{ + topic_filter = TopicFilter, + start_time = StartTime + } = OldIter, + {ok, #it{ + topic_filter = TopicFilter, + start_time = StartTime, + last_seen_message_key = DSKey + }}. + next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 243ce230e..cc851fc55 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,13 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]). +-export([ + drop_db/2, + store_batch/5, + get_streams/5, + make_iterator/6, + next/5 +]). %% behavior callbacks: -export([introduced_in/0]). diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl new file mode 100644 index 000000000..73596fffc --- /dev/null +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl @@ -0,0 +1,118 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_proto_v2). + +-behavior(emqx_bpapi). + +-include_lib("emqx_utils/include/bpapi.hrl"). +%% API: +-export([ + drop_db/2, + store_batch/5, + get_streams/5, + make_iterator/6, + next/5, + + %% introduced in v2 + update_iterator/5 +]). + +%% behavior callbacks: +-export([introduced_in/0]). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. +drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). + +-spec get_streams( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + [{integer(), emqx_ds_storage_layer:stream()}]. +get_streams(Node, DB, Shard, TopicFilter, Time) -> + erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). + +-spec make_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [ + DB, Shard, Stream, TopicFilter, StartTime + ]). + +-spec next( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + pos_integer() +) -> + {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]} + | {ok, end_of_stream} + | {error, _}. +next(Node, DB, Shard, Iter, BatchSize) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). + +-spec store_batch( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer:batch(), + emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +store_batch(Node, DB, Shard, Batch, Options) -> + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [ + DB, Shard, Batch, Options + ]). + +%%-------------------------------------------------------------------------------- +%% Introduced in V2 +%%-------------------------------------------------------------------------------- + +-spec update_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + emqx_ds:message_key() +) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. +update_iterator(Node, DB, Shard, OldIter, DSKey) -> + erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ + DB, Shard, OldIter, DSKey + ]). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +introduced_in() -> + "5.5.0". diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index c5af38def..f44000eeb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -130,6 +130,31 @@ t_04_restart(_Config) -> {ok, Iter, Batch} = iterate(DB, Iter0, 1), ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). +%% Check that we can create iterators directly from DS keys. +t_05_update_iterator(_Config) -> + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + TopicFilter = ['#'], + StartTime = 0, + Msgs = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo">>, <<"2">>, 1), + message(<<"bar/bar">>, <<"3">>, 2) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), + Res0 = emqx_ds:next(DB, Iter0, 1), + ?assertMatch({ok, _OldIter, [{_Key0, _Msg0}]}, Res0), + {ok, OldIter, [{Key0, Msg0}]} = Res0, + Res1 = emqx_ds:update_iterator(DB, OldIter, Key0), + ?assertMatch({ok, _Iter1}, Res1), + {ok, Iter1} = Res1, + {ok, FinalIter, Batch} = iterate(DB, Iter1, 1), + AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]], + ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}), + ok. + message(Topic, Payload, PublishedAt) -> #message{ topic = Topic,