refactor(ds): message_storage -> message_storage_bitmask

This commit is contained in:
ieQu1 2023-05-17 16:32:50 +02:00
parent a4219db163
commit a343cdb1d5
6 changed files with 35 additions and 35 deletions

View File

@ -12,7 +12,7 @@
-export([default_iteration_options/0]).
-type backend_config() ::
{emqx_ds_message_storage, emqx_ds_message_storage:options()}
{emqx_ds_message_storage_bitmask, emqx_ds_message_storage_bitmask:options()}
| {module(), _Options}.
-export_type([backend_config/0]).
@ -30,23 +30,23 @@ shard_config(Shard) ->
maps:get(Shard, Shards, DefaultShardConfig).
-spec shard_iteration_options(emqx_ds:shard()) ->
emqx_ds_message_storage:iteration_options().
emqx_ds_message_storage_bitmask:iteration_options().
shard_iteration_options(Shard) ->
case shard_config(Shard) of
{emqx_ds_message_storage, Config} ->
{emqx_ds_message_storage_bitmask, Config} ->
maps:get(iteration, Config, default_iteration_options());
{_Module, _} ->
default_iteration_options()
end.
-spec default_iteration_options() -> emqx_ds_message_storage:iteration_options().
-spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options().
default_iteration_options() ->
{emqx_ds_message_storage, Config} = default_shard_config(),
{emqx_ds_message_storage_bitmask, Config} = default_shard_config(),
maps:get(iteration, Config).
-spec default_shard_config() -> backend_config().
default_shard_config() ->
{emqx_ds_message_storage, #{
{emqx_ds_message_storage_bitmask, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16],
epoch => 5,

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_message_storage).
-module(emqx_ds_message_storage_bitmask).
%%================================================================================
%% @doc Description of the schema

View File

@ -1,14 +1,14 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_message_storage_SUITE).
-module(emqx_ds_message_storage_bitmask_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("stdlib/include/assert.hrl").
-import(emqx_ds_message_storage, [
-import(emqx_ds_message_storage_bitmask, [
make_keymapper/1,
keymapper_info/1,
compute_topic_bitmask/2,

View File

@ -12,7 +12,7 @@
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG,
{emqx_ds_message_storage, #{
{emqx_ds_message_storage_bitmask, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
@ -23,7 +23,7 @@
).
-define(COMPACT_CONFIG,
{emqx_ds_message_storage, #{
{emqx_ds_message_storage_bitmask, #{
timestamp_bits => 16,
topic_bits_per_level => [16, 16],
epoch => 10

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_message_storage_shim).
-module(emqx_ds_message_storage_bitmask_shim).
-export([open/0]).
-export([close/1]).

View File

@ -22,7 +22,7 @@ prop_bitstring_computes() ->
Keymapper,
keymapper(),
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
BS = emqx_ds_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
BS = emqx_ds_message_storage_bitmask:compute_bitstring(Topic, Timestamp, Keymapper),
is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper)))
end)
).
@ -30,7 +30,7 @@ prop_bitstring_computes() ->
prop_topic_bitmask_computes() ->
Keymapper = make_keymapper(16, [8, 12, 16], 100),
?FORALL(TopicFilter, topic_filter(), begin
Mask = emqx_ds_message_storage:compute_topic_bitmask(TopicFilter, Keymapper),
Mask = emqx_ds_message_storage_bitmask:compute_topic_bitmask(TopicFilter, Keymapper),
% topic bits + timestamp LSBs
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
end).
@ -40,14 +40,14 @@ prop_next_seek_monotonic() ->
{TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()},
begin
Filter = emqx_ds_message_storage:make_keyspace_filter(
Filter = emqx_ds_message_storage_bitmask:make_keyspace_filter(
{TopicFilter, StartTime},
Keymapper
),
?FORALL(
Bitstring,
bitstr(get_keymapper_bitsize(Keymapper)),
emqx_ds_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring
emqx_ds_message_storage_bitmask:compute_next_seek(Bitstring, Filter) >= Bitstring
)
end
).
@ -56,8 +56,8 @@ prop_next_seek_eq_initial_seek() ->
?FORALL(
Filter,
keyspace_filter(),
emqx_ds_message_storage:compute_initial_seek(Filter) =:=
emqx_ds_message_storage:compute_next_seek(0, Filter)
emqx_ds_message_storage_bitmask:compute_initial_seek(Filter) =:=
emqx_ds_message_storage_bitmask:compute_next_seek(0, Filter)
).
prop_iterate_messages() ->
@ -72,7 +72,7 @@ prop_iterate_messages() ->
?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
{DB, Handle} = open_db(Filepath, Options),
Shim = emqx_ds_message_storage_shim:open(),
Shim = emqx_ds_message_storage_bitmask_shim:open(),
ok = store_db(DB, Stream),
ok = store_shim(Shim, Stream),
?FORALL(
@ -92,7 +92,7 @@ prop_iterate_messages() ->
Messages = iterate_db(DB, Iteration),
Reference = iterate_shim(Shim, Iteration),
ok = close_db(Handle),
ok = emqx_ds_message_storage_shim:close(Shim),
ok = emqx_ds_message_storage_bitmask_shim:close(Shim),
?WHENFAIL(
begin
io:format(user, " *** Filepath = ~s~n", [Filepath]),
@ -182,7 +182,7 @@ prop_iterate_eq_iterate_with_refresh() ->
% PublishedAt = ChunkNum,
% MessageID, PublishedAt, Topic
% ]),
% ok = emqx_ds_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload),
% ok = emqx_ds_message_storage_bitmask:store(DB, MessageID, PublishedAt, Topic, Payload),
% store_message_stream(DB, payload_gen:next(Rest));
% store_message_stream(_Zone, []) ->
% ok.
@ -191,7 +191,7 @@ store_db(DB, Messages) ->
lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload),
emqx_ds_message_storage:store(DB, MessageID, Timestamp, Topic, Bin)
emqx_ds_message_storage_bitmask:store(DB, MessageID, Timestamp, Topic, Bin)
end,
Messages
).
@ -200,7 +200,7 @@ iterate_db(DB, Iteration) ->
iterate_db(make_iterator(DB, Iteration)).
iterate_db(It) ->
case emqx_ds_message_storage:next(It) of
case emqx_ds_message_storage_bitmask:next(It) of
{value, Payload, ItNext} ->
[binary_to_term(Payload) | iterate_db(ItNext)];
none ->
@ -208,15 +208,15 @@ iterate_db(It) ->
end.
make_iterator(DB, Replay) ->
{ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay),
{ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay),
It.
make_iterator(DB, Replay, Options) ->
{ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay, Options),
{ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay, Options),
It.
run_iterator_commands([iterate | Rest], It, Ctx) ->
case emqx_ds_message_storage:next(It) of
case emqx_ds_message_storage_bitmask:next(It) of
{value, Payload, ItNext} ->
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)];
none ->
@ -227,8 +227,8 @@ run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
db := DB,
replay := Replay
} = Ctx,
Serial = emqx_ds_message_storage:preserve_iterator(It),
{ok, ItNext} = emqx_ds_message_storage:restore_iterator(DB, Replay, Serial),
Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It),
{ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Replay, Serial),
run_iterator_commands(Rest, ItNext, Ctx);
run_iterator_commands([], It, _Ctx) ->
iterate_db(It).
@ -237,7 +237,7 @@ store_shim(Shim, Messages) ->
lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload),
emqx_ds_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
emqx_ds_message_storage_bitmask_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
end,
Messages
).
@ -245,7 +245,7 @@ store_shim(Shim, Messages) ->
iterate_shim(Shim, Iteration) ->
lists:map(
fun binary_to_term/1,
emqx_ds_message_storage_shim:iterate(Shim, Iteration)
emqx_ds_message_storage_bitmask_shim:iterate(Shim, Iteration)
).
%%--------------------------------------------------------------------
@ -254,8 +254,8 @@ iterate_shim(Shim, Iteration) ->
open_db(Filepath, Options) ->
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
{Schema, CFRefs} = emqx_ds_message_storage:create_new(Handle, ?GEN_ID, Options),
DB = emqx_ds_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
{Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options),
DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
{DB, Handle}.
close_db(Handle) ->
@ -379,7 +379,7 @@ keyspace_filter() ->
?LET(
{TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()},
emqx_ds_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
emqx_ds_message_storage_bitmask:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
).
messages(Topic) ->
@ -426,14 +426,14 @@ flat(T) ->
%%--------------------------------------------------------------------
make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
emqx_ds_message_storage:make_keymapper(#{
emqx_ds_message_storage_bitmask:make_keymapper(#{
timestamp_bits => TimestampBits,
topic_bits_per_level => TopicBits,
epoch => MaxEpoch
}).
get_keymapper_bitsize(Keymapper) ->
maps:get(bitsize, emqx_ds_message_storage:keymapper_info(Keymapper)).
maps:get(bitsize, emqx_ds_message_storage_bitmask:keymapper_info(Keymapper)).
-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
interleave(Seqs, Rng) ->