diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index b7d472918..8f7105312 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -33,7 +33,7 @@ zone_config(Zone) -> #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 8, 32, 16], - max_tau => 5 + epoch => 5 }, DefaultZoneConfig = application:get_env( ?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf} diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index f2b45d221..484cd7ae6 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -43,13 +43,14 @@ %% | | | %% most significant topic hash least significant %% bits of timestamp bits of timestamp +%% (a.k.a epoch) (a.k.a time offset) %% %% Topic hash is level-aware: each topic level is hashed separately %% and the resulting hashes are bitwise-concatentated. This allows us %% to map topics to fixed-length bitstrings while keeping some degree %% of information about the hierarchy. %% -%% Next important concept is what we call "tau-interval". It is time +%% Next important concept is what we call "epoch". It is time %% interval determined by the number of least significant bits of the %% timestamp found at the tail of the rocksdb key. %% @@ -71,11 +72,11 @@ %% | ---->------| ---->------| ----------> %% | %% -+------------+-----------------------------> t -%% tau +%% epoch %% %% This structure allows to quickly seek to a the first message that -%% was recorded in a certain tau-interval in a certain topic or a -%% group of topics matching filter like `foo/bar/+/+' or `foo/bar/#`. +%% was recorded in a certain epoch in a certain topic or a +%% group of topics matching filter like `foo/bar/#`. %% %% Due to its structure, for each pair of rocksdb keys K1 and K2, such %% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) > @@ -176,7 +177,7 @@ -record(keymapper, { source :: [bitsource(), ...], bitsize :: bits(), - tau :: non_neg_integer() + epoch :: non_neg_integer() }). -type bitsource() :: @@ -229,14 +230,14 @@ open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> %% Number of bits in a key allocated to each level in a message topic. topic_bits_per_level := bits_per_level(), %% Maximum granularity of iteration over time. - max_tau := time() + epoch := time() }. make_keymapper(#{ timestamp_bits := TimestampBits, topic_bits_per_level := BitsPerLevel, - max_tau := MaxTau + epoch := MaxEpoch }) -> - TimestampLSBs = floor(math:log2(MaxTau)), + TimestampLSBs = floor(math:log2(MaxEpoch)), TimestampMSBs = TimestampBits - TimestampLSBs, NLevels = length(BitsPerLevel), {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel), @@ -249,7 +250,7 @@ make_keymapper(#{ #keymapper{ source = Source, bitsize = lists:sum([S || {_, _, S} <- Source]), - tau = 1 bsl TimestampLSBs + epoch = 1 bsl TimestampLSBs }. -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> @@ -540,12 +541,12 @@ make_keymapper_test_() -> {timestamp, 0, 9} ], bitsize = 46, - tau = 512 + epoch = 512 }, make_keymapper(#{ timestamp_bits => 32, topic_bits_per_level => [2, 4, 8], - max_tau => 1000 + epoch => 1000 }) ), ?_assertEqual( @@ -555,12 +556,12 @@ make_keymapper_test_() -> {hash, levels, 16} ], bitsize = 48, - tau = 1 + epoch = 1 }, make_keymapper(#{ timestamp_bits => 32, topic_bits_per_level => [16], - max_tau => 1 + epoch => 1 }) ) ]. diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index 237823014..f604f8d63 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -143,7 +143,7 @@ t_prop_topic_hash_computes(_) -> Keymapper = emqx_replay_message_storage:make_keymapper(#{ timestamp_bits => 32, topic_bits_per_level => [8, 12, 16, 24], - max_tau => 10000 + epoch => 10000 }), ?assert( proper:quickcheck( @@ -158,7 +158,7 @@ t_prop_hash_bitmask_computes(_) -> Keymapper = emqx_replay_message_storage:make_keymapper(#{ timestamp_bits => 16, topic_bits_per_level => [8, 12, 16], - max_tau => 100 + epoch => 100 }), ?assert( proper:quickcheck(