Merge pull request #11927 from thalesmg/ds-int-tags-m-20231110

fix(ds): use integer for tag values
This commit is contained in:
Thales Macedo Garitezi 2023-11-10 13:25:33 -03:00 committed by GitHub
commit 3537d688bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 27 deletions

View File

@ -50,8 +50,8 @@
%% records over the wire. %% records over the wire.
%% tags: %% tags:
-define(stream, stream). -define(STREAM, 1).
-define(it, it). -define(IT, 2).
%% keys: %% keys:
-define(tag, 1). -define(tag, 1).
@ -68,14 +68,14 @@
%% account. %% account.
-opaque stream() :: -opaque stream() ::
#{ #{
?tag := ?stream, ?tag := ?STREAM,
?shard := emqx_ds_replication_layer:shard_id(), ?shard := emqx_ds_replication_layer:shard_id(),
?enc := emqx_ds_storage_layer:stream() ?enc := emqx_ds_storage_layer:stream()
}. }.
-opaque iterator() :: -opaque iterator() ::
#{ #{
?tag := ?it, ?tag := ?IT,
?shard := emqx_ds_replication_layer:shard_id(), ?shard := emqx_ds_replication_layer:shard_id(),
?enc := emqx_ds_storage_layer:iterator() ?enc := emqx_ds_storage_layer:iterator()
}. }.
@ -133,7 +133,7 @@ get_streams(DB, TopicFilter, StartTime) ->
RankX = Shard, RankX = Shard,
Rank = {RankX, RankY}, Rank = {RankX, RankY},
{Rank, #{ {Rank, #{
?tag => ?stream, ?tag => ?STREAM,
?shard => Shard, ?shard => Shard,
?enc => Stream ?enc => Stream
}} }}
@ -147,18 +147,18 @@ get_streams(DB, TopicFilter, StartTime) ->
-spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
make_iterator(DB, Stream, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) ->
#{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream, #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
Node = node_of_shard(DB, Shard), Node = node_of_shard(DB, Shard),
case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} -> Err = {error, _} ->
Err Err
end. end.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(DB, Iter0, BatchSize) -> next(DB, Iter0, BatchSize) ->
#{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0, #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
Node = node_of_shard(DB, Shard), Node = node_of_shard(DB, Shard),
%% TODO: iterator can contain information that is useful for %% TODO: iterator can contain information that is useful for
%% reconstructing messages sent over the network. For example, %% reconstructing messages sent over the network. For example,

View File

@ -42,8 +42,8 @@
%% records over the wire. %% records over the wire.
%% tags: %% tags:
-define(stream, stream). -define(STREAM, 1).
-define(it, it). -define(IT, 2).
%% keys: %% keys:
-define(tag, 1). -define(tag, 1).
@ -81,13 +81,13 @@
-type stream() :: -type stream() ::
#{ #{
?tag := ?stream, ?tag := ?STREAM,
?storage_key := emqx_ds_lts:msg_storage_key() ?storage_key := emqx_ds_lts:msg_storage_key()
}. }.
-type iterator() :: -type iterator() ::
#{ #{
?tag := ?it, ?tag := ?IT,
?topic_filter := emqx_ds:topic_filter(), ?topic_filter := emqx_ds:topic_filter(),
?start_time := emqx_ds:time(), ?start_time := emqx_ds:time(),
?storage_key := emqx_ds_lts:msg_storage_key(), ?storage_key := emqx_ds_lts:msg_storage_key(),
@ -194,7 +194,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
) -> [stream()]. ) -> [stream()].
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter), Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
[#{?tag => ?stream, ?storage_key => I} || I <- Indexes]. [#{?tag => ?STREAM, ?storage_key => I} || I <- Indexes].
-spec make_iterator( -spec make_iterator(
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),
@ -204,13 +204,13 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
emqx_ds:time() emqx_ds:time()
) -> {ok, iterator()}. ) -> {ok, iterator()}.
make_iterator( make_iterator(
_Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime _Shard, _Data, #{?tag := ?STREAM, ?storage_key := StorageKey}, TopicFilter, StartTime
) -> ) ->
%% Note: it's a good idea to keep the iterator structure lean, %% Note: it's a good idea to keep the iterator structure lean,
%% since it can be stored on a remote node that could update its %% since it can be stored on a remote node that could update its
%% code independently from us. %% code independently from us.
{ok, #{ {ok, #{
?tag => ?it, ?tag => ?IT,
?topic_filter => TopicFilter, ?topic_filter => TopicFilter,
?start_time => StartTime, ?start_time => StartTime,
?storage_key => StorageKey, ?storage_key => StorageKey,
@ -225,7 +225,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
next_until(Schema, It, SafeCutoffTime, BatchSize). next_until(Schema, It, SafeCutoffTime, BatchSize).
next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
StartTime >= SafeCutoffTime StartTime >= SafeCutoffTime
-> ->
%% We're in the middle of the current epoch, so we can't yet iterate over it. %% We're in the middle of the current epoch, so we can't yet iterate over it.
@ -235,7 +235,7 @@ next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTim
{ok, It, []}; {ok, It, []};
next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) -> next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) ->
#{ #{
?tag := ?it, ?tag := ?IT,
?start_time := StartTime, ?start_time := StartTime,
?storage_key := {TopicIndex, Varying} ?storage_key := {TopicIndex, Varying}
} = It, } = It,
@ -286,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
{ok, It, lists:reverse(Acc)}; {ok, It, lists:reverse(Acc)};
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
inc_counter(), inc_counter(),
#{?tag := ?it, ?last_seen_key := Key0} = It0, #{?tag := ?IT, ?last_seen_key := Key0} = It0,
case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
overflow -> overflow ->
{ok, It0, lists:reverse(Acc0)}; {ok, It0, lists:reverse(Acc0)};
@ -346,7 +346,7 @@ check_message(
overflow; overflow;
check_message( check_message(
_Cutoff, _Cutoff,
#{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter}, #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
#message{timestamp = Timestamp, topic = Topic} #message{timestamp = Timestamp, topic = Topic}
) when Timestamp >= StartTime -> ) when Timestamp >= StartTime ->
emqx_topic:match(emqx_topic:words(Topic), TopicFilter); emqx_topic:match(emqx_topic:words(Topic), TopicFilter);

View File

@ -38,8 +38,8 @@
%% records over the wire. %% records over the wire.
%% tags: %% tags:
-define(stream, stream). -define(STREAM, 1).
-define(it, it). -define(IT, 2).
%% keys: %% keys:
-define(tag, 1). -define(tag, 1).
@ -59,7 +59,7 @@
%% Note: this might be stored permanently on a remote node. %% Note: this might be stored permanently on a remote node.
-opaque stream() :: -opaque stream() ::
#{ #{
?tag := ?stream, ?tag := ?STREAM,
?generation := gen_id(), ?generation := gen_id(),
?enc := term() ?enc := term()
}. }.
@ -67,7 +67,7 @@
%% Note: this might be stored permanently on a remote node. %% Note: this might be stored permanently on a remote node.
-opaque iterator() :: -opaque iterator() ::
#{ #{
?tag := ?it, ?tag := ?IT,
?generation := gen_id(), ?generation := gen_id(),
?enc := term() ?enc := term()
}. }.
@ -165,7 +165,7 @@ get_streams(Shard, TopicFilter, StartTime) ->
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
[ [
{GenId, #{ {GenId, #{
?tag => ?stream, ?tag => ?STREAM,
?generation => GenId, ?generation => GenId,
?enc => Stream ?enc => Stream
}} }}
@ -178,13 +178,13 @@ get_streams(Shard, TopicFilter, StartTime) ->
-spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
make_iterator( make_iterator(
Shard, #{?tag := ?stream, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
) -> ) ->
#{module := Mod, data := GenData} = generation_get(Shard, GenId), #{module := Mod, data := GenData} = generation_get(Shard, GenId),
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{ {ok, #{
?tag => ?it, ?tag => ?IT,
?generation => GenId, ?generation => GenId,
?enc => Iter ?enc => Iter
}}; }};
@ -194,7 +194,7 @@ make_iterator(
-spec next(shard_id(), iterator(), pos_integer()) -> -spec next(shard_id(), iterator(), pos_integer()) ->
emqx_ds:next_result(iterator()). emqx_ds:next_result(iterator()).
next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
#{module := Mod, data := GenData} = generation_get(Shard, GenId), #{module := Mod, data := GenData} = generation_get(Shard, GenId),
Current = generation_current(Shard), Current = generation_current(Shard),
case Mod:next(Shard, GenData, GenIter0, BatchSize) of case Mod:next(Shard, GenData, GenIter0, BatchSize) of