Merge pull request #11906 from thalesmg/ds-avoid-records-m-20231108

chore(ds): avoid using records in persistence / rpc
Thales Macedo Garitezi 2023-11-09 14:25:27 -03:00 committed by GitHub
commit ee87f90b0d
8 changed files with 148 additions and 79 deletions
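The rationale, in short: an Erlang record is compiled to a plain tuple whose layout is frozen at build time, so record-shaped terms that are persisted (e.g. in Mnesia or RocksDB) or sent over RPC stop matching as soon as the record definition changes. A map with small integer keys costs about the same on the wire, but readers pattern-match only the keys they know, so newer writers can add keys without breaking older readers. A minimal sketch of the contrast (illustrative only, not code from this commit; all names are made up):

    -module(record_vs_map_demo).
    -export([mk_stream/2, shard_of/1]).

    %% Integer keys, mirroring the convention introduced by this commit:
    -define(tag, 1).
    -define(shard, 2).
    -define(enc, 3).

    %% A record is just a tagged tuple: #stream_rec{shard = S, enc = E}
    %% compiles to {stream_rec, S, E}, and a persisted copy of that tuple
    %% no longer matches once a field is added to the definition.
    mk_stream(Shard, Enc) ->
        #{?tag => stream, ?shard => Shard, ?enc => Enc}.

    %% Map matching asks only for the keys it needs, so a term written by
    %% newer code that carries extra keys still matches here.
    shard_of(#{?tag := stream, ?shard := Shard}) ->
        Shard.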

==== File 1 (session replay: inflight fetch and iterator storage) ====

@@ -136,7 +136,7 @@ fetch(_SessionId, Inflight, _Streams = [], _N, Acc) ->
     {lists:reverse(Acc), Inflight};
 fetch(_SessionId, Inflight, _Streams, 0, Acc) ->
     {lists:reverse(Acc), Inflight};
-fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishes0) ->
+fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
     #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0,
     ItBegin = get_last_iterator(SessionId, Stream, Ranges0),
     {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N),
@@ -166,6 +166,7 @@ fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishes0) ->
         fetch(SessionId, Inflight1, Streams, N, Publishes)
     end.
 
+-spec update_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream(), emqx_ds:iterator()) -> ok.
 update_iterator(SessionId, Stream, Iterator) ->
     mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}).
@@ -177,13 +178,20 @@ get_last_iterator(SessionId, Stream, Ranges) ->
             Next
     end.
 
+-spec get_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream()) -> emqx_ds:iterator().
 get_iterator(SessionId, Stream) ->
     Id = {SessionId, Stream},
     [#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id),
     It.
 
+-spec get_streams(emqx_persistent_session_ds:id()) -> [emqx_ds:stream()].
 get_streams(SessionId) ->
-    mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId).
+    lists:map(
+        fun(#ds_stream{stream = Stream}) ->
+            Stream
+        end,
+        mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId)
+    ).
 
 %% Packet ID as defined by MQTT protocol is a 16-bit integer in range
 %% 1..FFFF. This function translates internal session sequence number

==== File 2 (persistent session type declarations) ====

@@ -97,8 +97,6 @@
         props := map()
     }.
 
-%% -type session() :: #session{}.
-
 -type timestamp() :: emqx_utils_calendar:epoch_millisecond().
 -type topic() :: emqx_types:topic().
 -type clientinfo() :: emqx_types:clientinfo().

==== File 3 (session DS header: #ds_stream / #ds_iter records) ====

@@ -36,6 +36,7 @@
     stream :: emqx_ds:stream(),
     rank :: emqx_ds:stream_rank()
 }).
+-type ds_stream() :: #ds_stream{}.
 
 -record(ds_iter, {
     id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()},

==== File 4: emqx_ds_replication_layer ====

@@ -43,30 +43,41 @@
 %% Type declarations
 %%================================================================================
 
+%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
+%% records over the wire.
+
+%% tags:
+-define(stream, stream).
+-define(it, it).
+
+%% keys:
+-define(tag, 1).
+-define(shard, 2).
+-define(enc, 3).
+
 -type db() :: emqx_ds:db().
 
 -type shard_id() :: {db(), atom()}.
 
-%% This record enapsulates the stream entity from the replication
-%% level.
+%% This encapsulates the stream entity from the replication level.
 %%
 %% TODO: currently the stream is hardwired to only support the
 %% internal rocksdb storage. In the future we want to add another
 %% implementations for emqx_ds, so this type has to take this into
 %% account.
--record(stream, {
-    shard :: emqx_ds_replication_layer:shard_id(),
-    enc :: emqx_ds_storage_layer:stream()
-}).
-
--opaque stream() :: #stream{}.
-
--record(iterator, {
-    shard :: emqx_ds_replication_layer:shard_id(),
-    enc :: enqx_ds_storage_layer:iterator()
-}).
-
--opaque iterator() :: #iterator{}.
+-opaque stream() ::
+    #{
+        ?tag := ?stream,
+        ?shard := emqx_ds_replication_layer:shard_id(),
+        ?enc := emqx_ds_storage_layer:stream()
+    }.
+
+-opaque iterator() ::
+    #{
+        ?tag := ?it,
+        ?shard := emqx_ds_replication_layer:shard_id(),
+        ?enc := emqx_ds_storage_layer:iterator()
+    }.
 
 -type message_id() :: emqx_ds_storage_layer:message_id().
@@ -124,9 +135,10 @@ get_streams(DB, TopicFilter, StartTime) ->
         fun({RankY, Stream}) ->
             RankX = Shard,
             Rank = {RankX, RankY},
-            {Rank, #stream{
-                shard = Shard,
-                enc = Stream
+            {Rank, #{
+                ?tag => ?stream,
+                ?shard => Shard,
+                ?enc => Stream
             }}
         end,
         Streams
@@ -138,18 +150,18 @@ get_streams(DB, TopicFilter, StartTime) ->
 -spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(Stream, TopicFilter, StartTime) ->
-    #stream{shard = Shard, enc = StorageStream} = Stream,
+    #{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream,
     Node = node_of_shard(Shard),
     case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
-            {ok, #iterator{shard = Shard, enc = Iter}};
+            {ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}};
         Err = {error, _} ->
             Err
     end.
 
 -spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
 next(Iter0, BatchSize) ->
-    #iterator{shard = Shard, enc = StorageIter0} = Iter0,
+    #{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0,
     Node = node_of_shard(Shard),
     %% TODO: iterator can contain information that is useful for
     %% reconstructing messages sent over the network. For example,
@@ -161,7 +173,7 @@ next(Iter0, BatchSize) ->
     %% replication layer. Or, perhaps, in the logic layer.
     case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
-            Iter = #iterator{shard = Shard, enc = StorageIter},
+            Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
         Other ->
             Other
@@ -184,14 +196,14 @@ do_drop_shard_v1(Shard) ->
     emqx_ds_storage_layer:drop_shard(Shard).
 
 -spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
-    [{integer(), _Stream}].
+    [{integer(), emqx_ds_storage_layer:stream()}].
 do_get_streams_v1(Shard, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
 
 -spec do_make_iterator_v1(
     shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
-    {ok, iterator()} | {error, _}.
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
 do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
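A note on the map syntax used throughout the new code in this file: construction sites use `=>` (insert the key), while match and update sites use `:=` (the key must already exist). The `Iter0#{?enc := StorageIter}` update in next/2 therefore replaces the payload without being able to change the iterator's shape. A small sketch of the semantics (illustrative, not from the commit):

    -module(map_update_demo).
    -export([demo/0]).

    -define(tag, 1).
    -define(enc, 3).

    demo() ->
        It0 = #{?tag => it, ?enc => enc0},
        %% A `:=` update succeeds only for keys that are already present:
        It1 = It0#{?enc := enc1},
        %% It0#{shard := s} would raise a {badkey, shard} error, which is a
        %% cheap sanity check on terms that arrived over the wire.
        It1.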

==== File 5: emqx_ds_storage_bitfield_lts ====

@@ -38,6 +38,20 @@
 %% Type declarations
 %%================================================================================
 
+%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
+%% records over the wire.
+
+%% tags:
+-define(stream, stream).
+-define(it, it).
+
+%% keys:
+-define(tag, 1).
+-define(topic_filter, 2).
+-define(start_time, 3).
+-define(storage_key, 4).
+-define(last_seen_key, 5).
+
 -type options() ::
     #{
         bits_per_wildcard_level => pos_integer(),
@@ -65,18 +79,20 @@
 -type s() :: #s{}.
 
--record(stream, {
-    storage_key :: emqx_ds_lts:msg_storage_key()
-}).
-
--record(it, {
-    topic_filter :: emqx_ds:topic_filter(),
-    start_time :: emqx_ds:time(),
-    storage_key :: emqx_ds_lts:msg_storage_key(),
-    last_seen_key = <<>> :: binary()
-}).
-
--type iterator() :: #it{}.
+-type stream() ::
+    #{
+        ?tag := ?stream,
+        ?storage_key := emqx_ds_lts:msg_storage_key()
+    }.
+
+-type iterator() ::
+    #{
+        ?tag := ?it,
+        ?topic_filter := emqx_ds:topic_filter(),
+        ?start_time := emqx_ds:time(),
+        ?storage_key := emqx_ds_lts:msg_storage_key(),
+        ?last_seen_key := binary()
+    }.
 
 -define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
@@ -170,18 +186,35 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
         Messages
     ).
 
+-spec get_streams(
+    emqx_ds_replication_layer:shard_id(),
+    s(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) -> [stream()].
 get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
-    [#stream{storage_key = I} || I <- Indexes].
+    [#{?tag => ?stream, ?storage_key => I} || I <- Indexes].
 
-make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) ->
+-spec make_iterator(
+    emqx_ds_replication_layer:shard_id(),
+    s(),
+    stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) -> {ok, iterator()}.
+make_iterator(
+    _Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime
+) ->
     %% Note: it's a good idea to keep the iterator structure lean,
     %% since it can be stored on a remote node that could update its
     %% code independently from us.
-    {ok, #it{
-        topic_filter = TopicFilter,
-        start_time = StartTime,
-        storage_key = StorageKey
+    {ok, #{
+        ?tag => ?it,
+        ?topic_filter => TopicFilter,
+        ?start_time => StartTime,
+        ?storage_key => StorageKey,
+        ?last_seen_key => <<>>
    }}.
 
 next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
@@ -192,16 +225,19 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
     next_until(Schema, It, SafeCutoffTime, BatchSize).
 
-next_until(_Schema, It, SafeCutoffTime, _BatchSize) when It#it.start_time >= SafeCutoffTime ->
+next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
+    StartTime >= SafeCutoffTime
+->
     %% We're in the middle of the current epoch, so we can't yet iterate over it.
     %% It would be unsafe otherwise: messages can be stored in the current epoch
     %% concurrently with iterating over it. They can end up earlier (in the iteration
     %% order) due to the nature of keymapping, potentially causing us to miss them.
     {ok, It, []};
 next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) ->
-    #it{
-        start_time = StartTime,
-        storage_key = {TopicIndex, Varying}
+    #{
+        ?tag := ?it,
+        ?start_time := StartTime,
+        ?storage_key := {TopicIndex, Varying}
     } = It,
     %% Make filter:
     Inequations = [
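A worked example of the cutoff computation above: `(Now bsr TSOffset) bsl TSOffset` zeroes the low TSOffset bits of the clock, i.e. it rounds down to the start of the current epoch, and the first next_until/4 clause refuses to read past that point while the epoch can still receive concurrent writes. Sketch with made-up values (a ts_offset of 8 is an assumption for illustration, not the configured default):

    -module(cutoff_demo).
    -export([safe_cutoff/2]).

    %% Mirrors the expression from the hunk above.
    %% safe_cutoff(16#1234, 8) =:= 16#1200, so keys in 16#1200..16#12FF,
    %% i.e. the epoch still being written to, are not replayed yet.
    safe_cutoff(Now, TSOffset) ->
        (Now bsr TSOffset) bsl TSOffset.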
@@ -250,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
 next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
     inc_counter(),
-    #it{last_seen_key = Key0} = It0,
+    #{?tag := ?it, ?last_seen_key := Key0} = It0,
     case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
         overflow ->
             {ok, It0, lists:reverse(Acc0)};
@@ -268,7 +304,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
     end.
 
 traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
-    It = It0#it{last_seen_key = Key},
+    It = It0#{?last_seen_key := Key},
     case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
         true ->
             Msg = deserialize(Val),
@@ -310,7 +346,7 @@ check_message(
     overflow;
 check_message(
     _Cutoff,
-    #it{start_time = StartTime, topic_filter = TopicFilter},
+    #{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter},
     #message{timestamp = Timestamp, topic = Topic}
 ) when Timestamp >= StartTime ->
     emqx_topic:match(emqx_topic:words(Topic), TopicFilter);
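One subtlety in this file: the old #it record declared a field default, `last_seen_key = <<>>`, but maps have no field defaults, which is why the new make_iterator/5 must spell out `?last_seen_key => <<>>` explicitly. A sketch of the difference (illustrative names):

    -module(defaults_demo).
    -export([mk_rec/0, mk_map/0]).

    -record(it_demo, {last_seen_key = <<>> :: binary()}).

    mk_rec() ->
        %% the record default fills in <<>> for us
        #it_demo{}.

    mk_map() ->
        %% maps have no defaults: every key must be written out
        #{last_seen_key => <<>>}.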

==== File 6: emqx_ds_storage_layer ====

@@ -34,6 +34,18 @@
 %% Type declarations
 %%================================================================================
 
+%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
+%% records over the wire.
+
+%% tags:
+-define(stream, stream).
+-define(it, it).
+
+%% keys:
+-define(tag, 1).
+-define(generation, 2).
+-define(enc, 3).
+
 -type prototype() ::
     {emqx_ds_storage_reference, emqx_ds_storage_reference:options()}
     | {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}.
@@ -44,23 +56,21 @@
 -type gen_id() :: 0..16#ffff.
 
-%% Note: this record might be stored permanently on a remote node.
--record(stream, {
-    generation :: gen_id(),
-    enc :: _EncapsulatedData,
-    misc = #{} :: map()
-}).
-
--opaque stream() :: #stream{}.
-
-%% Note: this record might be stored permanently on a remote node.
--record(it, {
-    generation :: gen_id(),
-    enc :: _EncapsulatedData,
-    misc = #{} :: map()
-}).
-
--opaque iterator() :: #it{}.
+%% Note: this might be stored permanently on a remote node.
+-opaque stream() ::
+    #{
+        ?tag := ?stream,
+        ?generation := gen_id(),
+        ?enc := term()
+    }.
+
+%% Note: this might be stored permanently on a remote node.
+-opaque iterator() ::
+    #{
+        ?tag := ?it,
+        ?generation := gen_id(),
+        ?enc := term()
+    }.
 
 %%%% Generation:
@@ -154,9 +164,10 @@ get_streams(Shard, TopicFilter, StartTime) ->
         #{module := Mod, data := GenData} = generation_get(Shard, GenId),
         Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
         [
-            {GenId, #stream{
-                generation = GenId,
-                enc = Stream
+            {GenId, #{
+                ?tag => ?stream,
+                ?generation => GenId,
+                ?enc => Stream
             }}
          || Stream <- Streams
        ]
@@ -166,13 +177,16 @@ get_streams(Shard, TopicFilter, StartTime) ->
 -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
-make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, StartTime) ->
+make_iterator(
+    Shard, #{?tag := ?stream, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
+) ->
     #{module := Mod, data := GenData} = generation_get(Shard, GenId),
     case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
         {ok, Iter} ->
-            {ok, #it{
-                generation = GenId,
-                enc = Iter
+            {ok, #{
+                ?tag => ?it,
+                ?generation => GenId,
+                ?enc => Iter
            }};
         {error, _} = Err ->
             Err
@@ -180,7 +194,7 @@ make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, StartTime) ->
 
 -spec next(shard_id(), iterator(), pos_integer()) ->
     emqx_ds:next_result(iterator()).
-next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
+next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
     #{module := Mod, data := GenData} = generation_get(Shard, GenId),
     Current = generation_current(Shard),
     case Mod:next(Shard, GenData, GenIter0, BatchSize) of
@@ -190,7 +204,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
             %% the stream has been fully replayed.
             {ok, end_of_stream};
         {ok, GenIter, Batch} ->
-            {ok, Iter#it{enc = GenIter}, Batch};
+            {ok, Iter#{?enc := GenIter}, Batch};
         Error = {error, _} ->
             Error
     end.
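Putting the layers together: the replication-layer iterator carries the storage-layer iterator in its ?enc field, which in turn carries whatever the generation module (such as the bitfield-LTS one above) produced. Since no level is a record anymore, the whole nested term can be persisted or shipped between nodes as-is. A sketch of the resulting shape, with atoms and literals standing in for the opaque payloads (illustrative, not captured from a live system):

    -module(nested_shape_demo).
    -export([iterator_shape/0]).

    %% Keys are the integers defined in the respective modules (?tag = 1, etc.).
    iterator_shape() ->
        #{
            1 => it,                %% replication layer: ?tag
            2 => {my_db, shard_a},  %% ?shard (a {db(), atom()} pair)
            3 => #{                 %% ?enc: the storage-layer iterator
                1 => it,            %% storage layer: ?tag
                2 => 42,            %% ?generation
                3 => #{             %% ?enc: the bitfield-LTS iterator
                    1 => it,        %% ?tag
                    2 => ['#'],     %% ?topic_filter
                    3 => 0,         %% ?start_time
                    4 => {1, []},   %% ?storage_key
                    5 => <<>>       %% ?last_seen_key
                }
            }
        }.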

==== File 7: emqx_ds_proto_v1 ====

@@ -41,7 +41,7 @@ drop_shard(Node, Shard) ->
 
 -spec get_streams(
     node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
-    [{integer(), emqx_ds_replication_layer:stream()}].
+    [{integer(), emqx_ds_storage_layer:stream()}].
 get_streams(Node, Shard, TopicFilter, Time) ->
     erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
 
@@ -52,7 +52,7 @@ get_streams(Node, Shard, TopicFilter, Time) ->
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
-    {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
 make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
     erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
         Shard, Stream, TopicFilter, StartTime