feat(ds): introduce `update_iterator` callback

This commit is contained in:
Thales Macedo Garitezi 2023-12-06 18:08:08 -03:00
parent 2a6d72878f
commit 66d043becd
9 changed files with 271 additions and 5 deletions

View File

@ -19,6 +19,7 @@
{emqx_delayed,1}. {emqx_delayed,1}.
{emqx_delayed,2}. {emqx_delayed,2}.
{emqx_ds,1}. {emqx_ds,1}.
{emqx_ds,2}.
{emqx_eviction_agent,1}. {emqx_eviction_agent,1}.
{emqx_eviction_agent,2}. {emqx_eviction_agent,2}.
{emqx_exhook,1}. {emqx_exhook,1}.

View File

@ -28,7 +28,7 @@
-export([store_batch/2, store_batch/3]). -export([store_batch/2, store_batch/3]).
%% Message replay API: %% Message replay API:
-export([get_streams/3, make_iterator/4, next/3]). -export([get_streams/3, make_iterator/4, update_iterator/3, next/3]).
%% Misc. API: %% Misc. API:
-export([]). -export([]).
@ -128,6 +128,9 @@
-callback make_iterator(db(), ds_specific_stream(), topic_filter(), time()) -> -callback make_iterator(db(), ds_specific_stream(), topic_filter(), time()) ->
make_iterator_result(ds_specific_iterator()). make_iterator_result(ds_specific_iterator()).
%% Backend callback reached through `update_iterator/3': receives the DB, a
%% backend-specific iterator and a message key, and returns
%% `make_iterator_result(ds_specific_iterator())'.
-callback update_iterator(db(), ds_specific_iterator(), message_key()) ->
make_iterator_result(ds_specific_iterator()).
-callback next(db(), Iterator, pos_integer()) -> next_result(Iterator). -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
%%================================================================================ %%================================================================================
@ -215,6 +218,11 @@ get_streams(DB, TopicFilter, StartTime) ->
make_iterator(DB, Stream, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) ->
?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime). ?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime).
%% Derive an iterator from `OldIter' and the message key `DSKey'.
%% The work is delegated to the `update_iterator/3' callback of the module
%% that `?module(DB)' resolves to; its result is returned unchanged.
-spec update_iterator(db(), iterator(), message_key()) ->
make_iterator_result().
update_iterator(DB, OldIter, DSKey) ->
?module(DB):update_iterator(DB, OldIter, DSKey).
-spec next(db(), iterator(), pos_integer()) -> next_result(). -spec next(db(), iterator(), pos_integer()) -> next_result().
next(DB, Iter, BatchSize) -> next(DB, Iter, BatchSize) ->
?module(DB):next(DB, Iter, BatchSize). ?module(DB):next(DB, Iter, BatchSize).

View File

@ -27,6 +27,7 @@
store_batch/3, store_batch/3,
get_streams/3, get_streams/3,
make_iterator/4, make_iterator/4,
update_iterator/3,
next/3 next/3
]). ]).
@ -36,6 +37,7 @@
do_store_batch_v1/4, do_store_batch_v1/4,
do_get_streams_v1/4, do_get_streams_v1/4,
do_make_iterator_v1/5, do_make_iterator_v1/5,
do_update_iterator_v2/4,
do_next_v1/4 do_next_v1/4
]). ]).
@ -170,6 +172,30 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
Err Err
end. end.
%% Rebuild a replication-layer iterator for the message key `DSKey'.
%%
%% The shard and the wrapped storage iterator are taken from `OldIter'. The
%% request is sent via `emqx_ds_proto_v2:update_iterator/5' to the node that
%% `node_of_shard/2' reports for the shard. On success the returned storage
%% iterator is wrapped again together with the same shard; an `{error, _}'
%% result is handed back unchanged.
-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
    emqx_ds:make_iterator_result(iterator()).
update_iterator(DB, OldIter, DSKey) ->
    #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
    Result = emqx_ds_proto_v2:update_iterator(
        node_of_shard(DB, Shard), DB, Shard, StorageIter, DSKey
    ),
    case Result of
        {error, _} = Err ->
            Err;
        {ok, NewStorageIter} ->
            {ok, #{?tag => ?IT, ?shard => Shard, ?enc => NewStorageIter}}
    end.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(DB, Iter0, BatchSize) -> next(DB, Iter0, BatchSize) ->
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
@ -236,6 +262,18 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) -> do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
%% RPC target of `emqx_ds_proto_v2:update_iterator/5'.
%%
%% `Shard' is the replication-layer shard; it is paired with `DB' to form the
%% `{DB, Shard}' identifier handed to the storage layer. The result of
%% `emqx_ds_storage_layer:update_iterator/3' is returned unchanged.
-spec do_update_iterator_v2(
    emqx_ds:db(),
    emqx_ds_replication_layer:shard_id(),
    emqx_ds_storage_layer:iterator(),
    emqx_ds:message_key()
) ->
    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
    emqx_ds_storage_layer:update_iterator({DB, Shard}, OldIter, DSKey).
-spec do_next_v1( -spec do_next_v1(
emqx_ds:db(), emqx_ds:db(),
emqx_ds_replication_layer:shard_id(), emqx_ds_replication_layer:shard_id(),

View File

@ -24,7 +24,15 @@
-export([]). -export([]).
%% behavior callbacks: %% behavior callbacks:
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). -export([
create/4,
open/5,
store_batch/4,
get_streams/4,
make_iterator/5,
update_iterator/4,
next/4
]).
%% internal exports: %% internal exports:
-export([format_key/2]). -export([format_key/2]).
@ -236,6 +244,20 @@ make_iterator(
?last_seen_key => <<>> ?last_seen_key => <<>>
}}. }}.
%% Return `OldIter' with its `?last_seen_key' entry set to `DSKey'.
%% Neither the shard nor the generation data is consulted, and every other
%% entry of the iterator map is carried over as is.
-spec update_iterator(
    emqx_ds_storage_layer:shard_id(), s(), iterator(), emqx_ds:message_key()
) -> {ok, iterator()}.
update_iterator(_Shard, _Data, OldIter = #{?tag := ?IT}, DSKey) ->
    NewIter = OldIter#{?last_seen_key => DSKey},
    {ok, NewIter}.
next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
%% Compute safe cutoff time. %% Compute safe cutoff time.
%% It's the point in time where the last complete epoch ends, so we need to know %% It's the point in time where the last complete epoch ends, so we need to know

View File

@ -18,7 +18,15 @@
-behaviour(gen_server). -behaviour(gen_server).
%% Replication layer API: %% Replication layer API:
-export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/4, next/3]). -export([
open_shard/2,
drop_shard/1,
store_batch/3,
get_streams/3,
make_iterator/4,
update_iterator/3,
next/3
]).
%% gen_server %% gen_server
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -192,6 +200,27 @@ make_iterator(
Err Err
end. end.
%% Update a storage-layer iterator for the message key `DSKey'.
%%
%% The generation recorded in the iterator is looked up with
%% `generation_get/2', and that generation's callback module updates the
%% wrapped iterator. On success the result is wrapped again under the same
%% generation id; an `{error, _}' result is returned unchanged.
-spec update_iterator(shard_id(), iterator(), emqx_ds:message_key()) ->
    emqx_ds:make_iterator_result(iterator()).
update_iterator(Shard, #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, DSKey) ->
    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
    case Mod:update_iterator(Shard, GenData, GenIter0, DSKey) of
        {error, _} = Err ->
            Err;
        {ok, GenIter} ->
            {ok, #{?tag => ?IT, ?generation => GenId, ?enc => GenIter}}
    end.
-spec next(shard_id(), iterator(), pos_integer()) -> -spec next(shard_id(), iterator(), pos_integer()) ->
emqx_ds:next_result(iterator()). emqx_ds:next_result(iterator()).
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->

View File

@ -27,7 +27,15 @@
-export([]). -export([]).
%% behavior callbacks: %% behavior callbacks:
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). -export([
create/4,
open/5,
store_batch/4,
get_streams/4,
make_iterator/5,
update_iterator/4,
next/4
]).
%% internal exports: %% internal exports:
-export([]). -export([]).
@ -97,6 +105,17 @@ make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) ->
start_time = StartTime start_time = StartTime
}}. }}.
%% Return `OldIter' with `last_seen_message_key' replaced by `DSKey'; this is
%% the field `next/4' reads back from the iterator.
%%
%% Record update syntax is used instead of rebuilding `#it{}' field by field,
%% so every other field of the iterator is carried over untouched, including
%% any field added to the record later. The shard and generation data are not
%% consulted.
update_iterator(_Shard, _Data, OldIter, DSKey) ->
    {ok, OldIter#it{last_seen_message_key = DSKey}}.
next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
#it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
{ok, ITHandle} = rocksdb:iterator(DB, CF, []), {ok, ITHandle} = rocksdb:iterator(DB, CF, []),

View File

@ -19,7 +19,13 @@
-include_lib("emqx_utils/include/bpapi.hrl"). -include_lib("emqx_utils/include/bpapi.hrl").
%% API: %% API:
-export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]). -export([
drop_db/2,
store_batch/5,
get_streams/5,
make_iterator/6,
next/5
]).
%% behavior callbacks: %% behavior callbacks:
-export([introduced_in/0]). -export([introduced_in/0]).

View File

@ -0,0 +1,118 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_proto_v2).
-behavior(emqx_bpapi).
-include_lib("emqx_utils/include/bpapi.hrl").
%% API:
-export([
drop_db/2,
store_batch/5,
get_streams/5,
make_iterator/6,
next/5,
%% introduced in v2
update_iterator/5
]).
%% behavior callbacks:
-export([introduced_in/0]).
%%================================================================================
%% API functions
%%================================================================================
%% Run `emqx_ds_replication_layer:do_drop_db_v1(DB)' on every node in `Nodes'
%% through `erpc:multicall/4' and return the list of per-node results.
-spec drop_db([node()], emqx_ds:db()) ->
    [{ok, ok} | erpc:caught_call_exception()].
drop_db(Nodes, DB) ->
    erpc:multicall(Nodes, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
%% Call `emqx_ds_replication_layer:do_get_streams_v1(DB, Shard, TopicFilter,
%% StartTime)' on `Node' through `erpc:call/4' and return its result.
-spec get_streams(
    node(),
    emqx_ds:db(),
    emqx_ds_replication_layer:shard_id(),
    emqx_ds:topic_filter(),
    emqx_ds:time()
) ->
    [{integer(), emqx_ds_storage_layer:stream()}].
get_streams(Node, DB, Shard, TopicFilter, StartTime) ->
    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [
        DB, Shard, TopicFilter, StartTime
    ]).
%% Call `emqx_ds_replication_layer:do_make_iterator_v1(DB, Shard, Stream,
%% TopicFilter, StartTime)' on `Node' through `erpc:call/4' and return its
%% result.
-spec make_iterator(
    node(),
    emqx_ds:db(),
    emqx_ds_replication_layer:shard_id(),
    emqx_ds_storage_layer:stream(),
    emqx_ds:topic_filter(),
    emqx_ds:time()
) ->
    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
    erpc:call(
        Node,
        emqx_ds_replication_layer,
        do_make_iterator_v1,
        [DB, Shard, Stream, TopicFilter, StartTime]
    ).
%% Call `emqx_ds_replication_layer:do_next_v1(DB, Shard, Iter, BatchSize)' on
%% `Node' and return its result.
%%
%% Unlike the `erpc'-based calls in this module, this goes through
%% `emqx_rpc:call/5' with `Shard' as its first argument (presumably the key
%% that selects the RPC channel; confirm against `emqx_rpc').
-spec next(
    node(),
    emqx_ds:db(),
    emqx_ds_replication_layer:shard_id(),
    emqx_ds_storage_layer:iterator(),
    pos_integer()
) ->
    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]}
    | {ok, end_of_stream}
    | {error, _}.
next(Node, DB, Shard, Iter, BatchSize) ->
    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [
        DB, Shard, Iter, BatchSize
    ]).
%% Call `emqx_ds_replication_layer:do_store_batch_v1(DB, Shard, Batch,
%% Options)' on `Node' and return its result.
%%
%% The call goes through `emqx_rpc:call/5' with `Shard' as its first argument
%% (presumably the key that selects the RPC channel; confirm against
%% `emqx_rpc').
-spec store_batch(
    node(),
    emqx_ds:db(),
    emqx_ds_replication_layer:shard_id(),
    emqx_ds_replication_layer:batch(),
    emqx_ds:message_store_opts()
) ->
    emqx_ds:store_batch_result().
store_batch(Node, DB, Shard, Batch, Options) ->
    emqx_rpc:call(
        Shard,
        Node,
        emqx_ds_replication_layer,
        do_store_batch_v1,
        [DB, Shard, Batch, Options]
    ).
%%--------------------------------------------------------------------------------
%% Introduced in V2
%%--------------------------------------------------------------------------------
%% Call `emqx_ds_replication_layer:do_update_iterator_v2(DB, Shard, OldIter,
%% DSKey)' on `Node' through `erpc:call/4' and return its result.
-spec update_iterator(
    node(),
    emqx_ds:db(),
    emqx_ds_replication_layer:shard_id(),
    emqx_ds_storage_layer:iterator(),
    emqx_ds:message_key()
) ->
    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
update_iterator(Node, DB, Shard, OldIter, DSKey) ->
    erpc:call(
        Node,
        emqx_ds_replication_layer,
        do_update_iterator_v2,
        [DB, Shard, OldIter, DSKey]
    ).
%%================================================================================
%% behavior callbacks
%%================================================================================
%% `emqx_bpapi' callback returning the version string "5.5.0" for this
%% protocol module (presumably the EMQX release that introduced it; confirm
%% against the `emqx_bpapi' behaviour).
introduced_in() ->
"5.5.0".

View File

@ -130,6 +130,31 @@ t_04_restart(_Config) ->
{ok, Iter, Batch} = iterate(DB, Iter0, 1), {ok, Iter, Batch} = iterate(DB, Iter0, 1),
?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
%% Check that we can create iterators directly from DS keys.
%%
%% Three messages are stored and the first one is read with the fresh
%% iterator `Iter0' to learn its key. The iterator under test is then derived
%% from the pristine `Iter0' plus that key, not from the iterator returned by
%% `next/3': the latter already sits at `Key0', so an `update_iterator' that
%% returned its input untouched would go unnoticed. Starting from `Iter0',
%% such a no-op would replay the first message and break the final equality.
t_05_update_iterator(_Config) ->
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
    TopicFilter = ['#'],
    StartTime = 0,
    Msgs = [
        message(<<"foo/bar">>, <<"1">>, 0),
        message(<<"foo">>, <<"2">>, 1),
        message(<<"bar/bar">>, <<"3">>, 2)
    ],
    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
    [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
    Res0 = emqx_ds:next(DB, Iter0, 1),
    ?assertMatch({ok, _OldIter, [{_Key0, _Msg0}]}, Res0),
    {ok, _, [{Key0, Msg0}]} = Res0,
    %% Apply the key to the iterator that has not consumed anything yet.
    Res1 = emqx_ds:update_iterator(DB, Iter0, Key0),
    ?assertMatch({ok, _Iter1}, Res1),
    {ok, Iter1} = Res1,
    {ok, FinalIter, Batch} = iterate(DB, Iter1, 1),
    %% First message read directly, the rest read through the updated iterator.
    AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]],
    ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
    ok.
message(Topic, Payload, PublishedAt) -> message(Topic, Payload, PublishedAt) ->
#message{ #message{
topic = Topic, topic = Topic,