refactor(ds): tau -> epoch
This commit is contained in:
parent
120d4e66ae
commit
adcbf40d27
|
@ -33,7 +33,7 @@ zone_config(Zone) ->
|
||||||
#{
|
#{
|
||||||
timestamp_bits => 64,
|
timestamp_bits => 64,
|
||||||
topic_bits_per_level => [8, 8, 8, 32, 16],
|
topic_bits_per_level => [8, 8, 8, 32, 16],
|
||||||
max_tau => 5
|
epoch => 5
|
||||||
},
|
},
|
||||||
DefaultZoneConfig = application:get_env(
|
DefaultZoneConfig = application:get_env(
|
||||||
?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf}
|
?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf}
|
||||||
|
|
|
@ -43,13 +43,14 @@
|
||||||
%% | | |
|
%% | | |
|
||||||
%% most significant topic hash least significant
|
%% most significant topic hash least significant
|
||||||
%% bits of timestamp bits of timestamp
|
%% bits of timestamp bits of timestamp
|
||||||
|
%% (a.k.a epoch) (a.k.a time offset)
|
||||||
%%
|
%%
|
||||||
%% Topic hash is level-aware: each topic level is hashed separately
|
%% Topic hash is level-aware: each topic level is hashed separately
|
||||||
%% and the resulting hashes are bitwise-concatentated. This allows us
|
%% and the resulting hashes are bitwise-concatentated. This allows us
|
||||||
%% to map topics to fixed-length bitstrings while keeping some degree
|
%% to map topics to fixed-length bitstrings while keeping some degree
|
||||||
%% of information about the hierarchy.
|
%% of information about the hierarchy.
|
||||||
%%
|
%%
|
||||||
%% Next important concept is what we call "tau-interval". It is time
|
%% Next important concept is what we call "epoch". It is time
|
||||||
%% interval determined by the number of least significant bits of the
|
%% interval determined by the number of least significant bits of the
|
||||||
%% timestamp found at the tail of the rocksdb key.
|
%% timestamp found at the tail of the rocksdb key.
|
||||||
%%
|
%%
|
||||||
|
@ -71,11 +72,11 @@
|
||||||
%% | ---->------| ---->------| ---------->
|
%% | ---->------| ---->------| ---------->
|
||||||
%% |
|
%% |
|
||||||
%% -+------------+-----------------------------> t
|
%% -+------------+-----------------------------> t
|
||||||
%% tau
|
%% epoch
|
||||||
%%
|
%%
|
||||||
%% This structure allows to quickly seek to a the first message that
|
%% This structure allows to quickly seek to a the first message that
|
||||||
%% was recorded in a certain tau-interval in a certain topic or a
|
%% was recorded in a certain epoch in a certain topic or a
|
||||||
%% group of topics matching filter like `foo/bar/+/+' or `foo/bar/#`.
|
%% group of topics matching filter like `foo/bar/#`.
|
||||||
%%
|
%%
|
||||||
%% Due to its structure, for each pair of rocksdb keys K1 and K2, such
|
%% Due to its structure, for each pair of rocksdb keys K1 and K2, such
|
||||||
%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) >
|
%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) >
|
||||||
|
@ -176,7 +177,7 @@
|
||||||
-record(keymapper, {
|
-record(keymapper, {
|
||||||
source :: [bitsource(), ...],
|
source :: [bitsource(), ...],
|
||||||
bitsize :: bits(),
|
bitsize :: bits(),
|
||||||
tau :: non_neg_integer()
|
epoch :: non_neg_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type bitsource() ::
|
-type bitsource() ::
|
||||||
|
@ -229,14 +230,14 @@ open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
||||||
%% Number of bits in a key allocated to each level in a message topic.
|
%% Number of bits in a key allocated to each level in a message topic.
|
||||||
topic_bits_per_level := bits_per_level(),
|
topic_bits_per_level := bits_per_level(),
|
||||||
%% Maximum granularity of iteration over time.
|
%% Maximum granularity of iteration over time.
|
||||||
max_tau := time()
|
epoch := time()
|
||||||
}.
|
}.
|
||||||
make_keymapper(#{
|
make_keymapper(#{
|
||||||
timestamp_bits := TimestampBits,
|
timestamp_bits := TimestampBits,
|
||||||
topic_bits_per_level := BitsPerLevel,
|
topic_bits_per_level := BitsPerLevel,
|
||||||
max_tau := MaxTau
|
epoch := MaxEpoch
|
||||||
}) ->
|
}) ->
|
||||||
TimestampLSBs = floor(math:log2(MaxTau)),
|
TimestampLSBs = floor(math:log2(MaxEpoch)),
|
||||||
TimestampMSBs = TimestampBits - TimestampLSBs,
|
TimestampMSBs = TimestampBits - TimestampLSBs,
|
||||||
NLevels = length(BitsPerLevel),
|
NLevels = length(BitsPerLevel),
|
||||||
{LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
|
{LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
|
||||||
|
@ -249,7 +250,7 @@ make_keymapper(#{
|
||||||
#keymapper{
|
#keymapper{
|
||||||
source = Source,
|
source = Source,
|
||||||
bitsize = lists:sum([S || {_, _, S} <- Source]),
|
bitsize = lists:sum([S || {_, _, S} <- Source]),
|
||||||
tau = 1 bsl TimestampLSBs
|
epoch = 1 bsl TimestampLSBs
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
|
-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
|
||||||
|
@ -540,12 +541,12 @@ make_keymapper_test_() ->
|
||||||
{timestamp, 0, 9}
|
{timestamp, 0, 9}
|
||||||
],
|
],
|
||||||
bitsize = 46,
|
bitsize = 46,
|
||||||
tau = 512
|
epoch = 512
|
||||||
},
|
},
|
||||||
make_keymapper(#{
|
make_keymapper(#{
|
||||||
timestamp_bits => 32,
|
timestamp_bits => 32,
|
||||||
topic_bits_per_level => [2, 4, 8],
|
topic_bits_per_level => [2, 4, 8],
|
||||||
max_tau => 1000
|
epoch => 1000
|
||||||
})
|
})
|
||||||
),
|
),
|
||||||
?_assertEqual(
|
?_assertEqual(
|
||||||
|
@ -555,12 +556,12 @@ make_keymapper_test_() ->
|
||||||
{hash, levels, 16}
|
{hash, levels, 16}
|
||||||
],
|
],
|
||||||
bitsize = 48,
|
bitsize = 48,
|
||||||
tau = 1
|
epoch = 1
|
||||||
},
|
},
|
||||||
make_keymapper(#{
|
make_keymapper(#{
|
||||||
timestamp_bits => 32,
|
timestamp_bits => 32,
|
||||||
topic_bits_per_level => [16],
|
topic_bits_per_level => [16],
|
||||||
max_tau => 1
|
epoch => 1
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
].
|
].
|
||||||
|
|
|
@ -143,7 +143,7 @@ t_prop_topic_hash_computes(_) ->
|
||||||
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
||||||
timestamp_bits => 32,
|
timestamp_bits => 32,
|
||||||
topic_bits_per_level => [8, 12, 16, 24],
|
topic_bits_per_level => [8, 12, 16, 24],
|
||||||
max_tau => 10000
|
epoch => 10000
|
||||||
}),
|
}),
|
||||||
?assert(
|
?assert(
|
||||||
proper:quickcheck(
|
proper:quickcheck(
|
||||||
|
@ -158,7 +158,7 @@ t_prop_hash_bitmask_computes(_) ->
|
||||||
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
||||||
timestamp_bits => 16,
|
timestamp_bits => 16,
|
||||||
topic_bits_per_level => [8, 12, 16],
|
topic_bits_per_level => [8, 12, 16],
|
||||||
max_tau => 100
|
epoch => 100
|
||||||
}),
|
}),
|
||||||
?assert(
|
?assert(
|
||||||
proper:quickcheck(
|
proper:quickcheck(
|
||||||
|
|
Loading…
Reference in New Issue