From 088de9476cd267f2d611ed33cbe1dfb628d8c96d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 10 Nov 2023 10:01:41 -0300 Subject: [PATCH] fix(ds): use integer for tag values Follow up to https://github.com/emqx/emqx/pull/11906#discussion_r1389115973 --- .../src/emqx_ds_replication_layer.erl | 16 +++++++------- .../src/emqx_ds_storage_bitfield_lts.erl | 22 +++++++++---------- .../src/emqx_ds_storage_layer.erl | 16 +++++++------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 54a946436..a06af104d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -50,8 +50,8 @@ %% records over the wire. %% tags: --define(stream, stream). --define(it, it). +-define(STREAM, 1). +-define(IT, 2). %% keys: -define(tag, 1). @@ -68,14 +68,14 @@ %% account. -opaque stream() :: #{ - ?tag := ?stream, + ?tag := ?STREAM, ?shard := emqx_ds_replication_layer:shard_id(), ?enc := emqx_ds_storage_layer:stream() }. -opaque iterator() :: #{ - ?tag := ?it, + ?tag := ?IT, ?shard := emqx_ds_replication_layer:shard_id(), ?enc := emqx_ds_storage_layer:iterator() }. @@ -133,7 +133,7 @@ get_streams(DB, TopicFilter, StartTime) -> RankX = Shard, Rank = {RankX, RankY}, {Rank, #{ - ?tag => ?stream, + ?tag => ?STREAM, ?shard => Shard, ?enc => Stream }} @@ -147,18 +147,18 @@ get_streams(DB, TopicFilter, StartTime) -> -spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator(DB, Stream, TopicFilter, StartTime) -> - #{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream, + #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream, Node = node_of_shard(DB, Shard), case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> - {ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}}; + {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> Err end. -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(DB, Iter0, BatchSize) -> - #{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0, + #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, Node = node_of_shard(DB, Shard), %% TODO: iterator can contain information that is useful for %% reconstructing messages sent over the network. For example, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 50b6af5b6..2d4949919 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -42,8 +42,8 @@ %% records over the wire. %% tags: --define(stream, stream). --define(it, it). +-define(STREAM, 1). +-define(IT, 2). %% keys: -define(tag, 1). @@ -81,13 +81,13 @@ -type stream() :: #{ - ?tag := ?stream, + ?tag := ?STREAM, ?storage_key := emqx_ds_lts:msg_storage_key() }. -type iterator() :: #{ - ?tag := ?it, + ?tag := ?IT, ?topic_filter := emqx_ds:topic_filter(), ?start_time := emqx_ds:time(), ?storage_key := emqx_ds_lts:msg_storage_key(), @@ -194,7 +194,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> ) -> [stream()]. get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter), - [#{?tag => ?stream, ?storage_key => I} || I <- Indexes]. + [#{?tag => ?STREAM, ?storage_key => I} || I <- Indexes]. -spec make_iterator( emqx_ds_storage_layer:shard_id(), @@ -204,13 +204,13 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> emqx_ds:time() ) -> {ok, iterator()}. make_iterator( - _Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime + _Shard, _Data, #{?tag := ?STREAM, ?storage_key := StorageKey}, TopicFilter, StartTime ) -> %% Note: it's a good idea to keep the iterator structure lean, %% since it can be stored on a remote node that could update its %% code independently from us. {ok, #{ - ?tag => ?it, + ?tag => ?IT, ?topic_filter => TopicFilter, ?start_time => StartTime, ?storage_key => StorageKey, @@ -225,7 +225,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, next_until(Schema, It, SafeCutoffTime, BatchSize). -next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when +next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when StartTime >= SafeCutoffTime -> %% We're in the middle of the current epoch, so we can't yet iterate over it. @@ -235,7 +235,7 @@ next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTim {ok, It, []}; next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) -> #{ - ?tag := ?it, + ?tag := ?IT, ?start_time := StartTime, ?storage_key := {TopicIndex, Varying} } = It, @@ -286,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> inc_counter(), - #{?tag := ?it, ?last_seen_key := Key0} = It0, + #{?tag := ?IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> {ok, It0, lists:reverse(Acc0)}; @@ -346,7 +346,7 @@ check_message( overflow; check_message( _Cutoff, - #{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter}, + #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, #message{timestamp = Timestamp, topic = Topic} ) when Timestamp >= StartTime -> emqx_topic:match(emqx_topic:words(Topic), TopicFilter); diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 8c2e55510..0fe719dbc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -38,8 +38,8 @@ %% records over the wire. %% tags: --define(stream, stream). --define(it, it). +-define(STREAM, 1). +-define(IT, 2). %% keys: -define(tag, 1). @@ -59,7 +59,7 @@ %% Note: this might be stored permanently on a remote node. -opaque stream() :: #{ - ?tag := ?stream, + ?tag := ?STREAM, ?generation := gen_id(), ?enc := term() }. @@ -67,7 +67,7 @@ %% Note: this might be stored permanently on a remote node. -opaque iterator() :: #{ - ?tag := ?it, + ?tag := ?IT, ?generation := gen_id(), ?enc := term() }. @@ -165,7 +165,7 @@ get_streams(Shard, TopicFilter, StartTime) -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), [ {GenId, #{ - ?tag => ?stream, + ?tag => ?STREAM, ?generation => GenId, ?enc => Stream }} @@ -178,13 +178,13 @@ get_streams(Shard, TopicFilter, StartTime) -> -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator( - Shard, #{?tag := ?stream, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime + Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime ) -> #{module := Mod, data := GenData} = generation_get(Shard, GenId), case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{ - ?tag => ?it, + ?tag => ?IT, ?generation => GenId, ?enc => Iter }}; @@ -194,7 +194,7 @@ make_iterator( -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> +next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> #{module := Mod, data := GenData} = generation_get(Shard, GenId), Current = generation_current(Shard), case Mod:next(Shard, GenData, GenIter0, BatchSize) of