diff --git a/apps/emqx_durable_storage/src/emqx_ds_conf.erl b/apps/emqx_durable_storage/src/emqx_ds_conf.erl index e748c359e..db8b14b45 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_conf.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_conf.erl @@ -12,7 +12,7 @@ -export([default_iteration_options/0]). -type backend_config() :: - {emqx_ds_message_storage, emqx_ds_message_storage:options()} + {emqx_ds_message_storage_bitmask, emqx_ds_message_storage_bitmask:options()} | {module(), _Options}. -export_type([backend_config/0]). @@ -30,23 +30,23 @@ shard_config(Shard) -> maps:get(Shard, Shards, DefaultShardConfig). -spec shard_iteration_options(emqx_ds:shard()) -> - emqx_ds_message_storage:iteration_options(). + emqx_ds_message_storage_bitmask:iteration_options(). shard_iteration_options(Shard) -> case shard_config(Shard) of - {emqx_ds_message_storage, Config} -> + {emqx_ds_message_storage_bitmask, Config} -> maps:get(iteration, Config, default_iteration_options()); {_Module, _} -> default_iteration_options() end. --spec default_iteration_options() -> emqx_ds_message_storage:iteration_options(). +-spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options(). default_iteration_options() -> - {emqx_ds_message_storage, Config} = default_shard_config(), + {emqx_ds_message_storage_bitmask, Config} = default_shard_config(), maps:get(iteration, Config). -spec default_shard_config() -> backend_config(). default_shard_config() -> - {emqx_ds_message_storage, #{ + {emqx_ds_message_storage_bitmask, #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 8, 32, 16], epoch => 5, diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl similarity index 99% rename from apps/emqx_durable_storage/src/emqx_ds_message_storage.erl rename to apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 9ebb23726..7adcb8566 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_message_storage). +-module(emqx_ds_message_storage_bitmask). %%================================================================================ %% @doc Description of the schema diff --git a/apps/emqx_durable_storage/test/emqx_ds_message_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl similarity index 98% rename from apps/emqx_durable_storage/test/emqx_ds_message_storage_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl index cbffcc4a1..599bd6c7b 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_message_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl @@ -1,14 +1,14 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_message_storage_SUITE). +-module(emqx_ds_message_storage_bitmask_SUITE). -compile(export_all). -compile(nowarn_export_all). -include_lib("stdlib/include/assert.hrl"). --import(emqx_ds_message_storage, [ +-import(emqx_ds_message_storage_bitmask, [ make_keymapper/1, keymapper_info/1, compute_topic_bitmask/2, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index 9fd93ecfe..054964373 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -12,7 +12,7 @@ -define(SHARD, shard(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, - {emqx_ds_message_storage, #{ + {emqx_ds_message_storage_bitmask, #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 32, 16], epoch => 5, @@ -23,7 +23,7 @@ ). -define(COMPACT_CONFIG, - {emqx_ds_message_storage, #{ + {emqx_ds_message_storage_bitmask, #{ timestamp_bits => 16, topic_bits_per_level => [16, 16], epoch => 10 diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl similarity index 96% rename from apps/emqx_durable_storage/test/props/emqx_ds_message_storage_shim.erl rename to apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index 7f6cf8e64..59668ca01 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_message_storage_shim). +-module(emqx_ds_message_storage_bitmask_shim). -export([open/0]). -export([close/1]). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index 08ae5d21d..7452906b8 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -22,7 +22,7 @@ prop_bitstring_computes() -> Keymapper, keymapper(), ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin - BS = emqx_ds_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), + BS = emqx_ds_message_storage_bitmask:compute_bitstring(Topic, Timestamp, Keymapper), is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper))) end) ). @@ -30,7 +30,7 @@ prop_bitstring_computes() -> prop_topic_bitmask_computes() -> Keymapper = make_keymapper(16, [8, 12, 16], 100), ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_ds_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), + Mask = emqx_ds_message_storage_bitmask:compute_topic_bitmask(TopicFilter, Keymapper), % topic bits + timestamp LSBs is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) end). @@ -40,14 +40,14 @@ prop_next_seek_monotonic() -> {TopicFilter, StartTime, Keymapper}, {topic_filter(), pos_integer(), keymapper()}, begin - Filter = emqx_ds_message_storage:make_keyspace_filter( + Filter = emqx_ds_message_storage_bitmask:make_keyspace_filter( {TopicFilter, StartTime}, Keymapper ), ?FORALL( Bitstring, bitstr(get_keymapper_bitsize(Keymapper)), - emqx_ds_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring + emqx_ds_message_storage_bitmask:compute_next_seek(Bitstring, Filter) >= Bitstring ) end ). @@ -56,8 +56,8 @@ prop_next_seek_eq_initial_seek() -> ?FORALL( Filter, keyspace_filter(), - emqx_ds_message_storage:compute_initial_seek(Filter) =:= - emqx_ds_message_storage:compute_next_seek(0, Filter) + emqx_ds_message_storage_bitmask:compute_initial_seek(Filter) =:= + emqx_ds_message_storage_bitmask:compute_next_seek(0, Filter) ). prop_iterate_messages() -> @@ -72,7 +72,7 @@ prop_iterate_messages() -> ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), {DB, Handle} = open_db(Filepath, Options), - Shim = emqx_ds_message_storage_shim:open(), + Shim = emqx_ds_message_storage_bitmask_shim:open(), ok = store_db(DB, Stream), ok = store_shim(Shim, Stream), ?FORALL( @@ -92,7 +92,7 @@ prop_iterate_messages() -> Messages = iterate_db(DB, Iteration), Reference = iterate_shim(Shim, Iteration), ok = close_db(Handle), - ok = emqx_ds_message_storage_shim:close(Shim), + ok = emqx_ds_message_storage_bitmask_shim:close(Shim), ?WHENFAIL( begin io:format(user, " *** Filepath = ~s~n", [Filepath]), @@ -182,7 +182,7 @@ prop_iterate_eq_iterate_with_refresh() -> % PublishedAt = ChunkNum, % MessageID, PublishedAt, Topic % ]), -% ok = emqx_ds_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), +% ok = emqx_ds_message_storage_bitmask:store(DB, MessageID, PublishedAt, Topic, Payload), % store_message_stream(DB, payload_gen:next(Rest)); % store_message_stream(_Zone, []) -> % ok. @@ -191,7 +191,7 @@ store_db(DB, Messages) -> lists:foreach( fun({Topic, Payload = {MessageID, Timestamp, _}}) -> Bin = term_to_binary(Payload), - emqx_ds_message_storage:store(DB, MessageID, Timestamp, Topic, Bin) + emqx_ds_message_storage_bitmask:store(DB, MessageID, Timestamp, Topic, Bin) end, Messages ). @@ -200,7 +200,7 @@ iterate_db(DB, Iteration) -> iterate_db(make_iterator(DB, Iteration)). iterate_db(It) -> - case emqx_ds_message_storage:next(It) of + case emqx_ds_message_storage_bitmask:next(It) of {value, Payload, ItNext} -> [binary_to_term(Payload) | iterate_db(ItNext)]; none -> @@ -208,15 +208,15 @@ iterate_db(It) -> end. make_iterator(DB, Replay) -> - {ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay), + {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay), It. make_iterator(DB, Replay, Options) -> - {ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay, Options), + {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay, Options), It. run_iterator_commands([iterate | Rest], It, Ctx) -> - case emqx_ds_message_storage:next(It) of + case emqx_ds_message_storage_bitmask:next(It) of {value, Payload, ItNext} -> [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)]; none -> @@ -227,8 +227,8 @@ run_iterator_commands([{preserve, restore} | Rest], It, Ctx) -> db := DB, replay := Replay } = Ctx, - Serial = emqx_ds_message_storage:preserve_iterator(It), - {ok, ItNext} = emqx_ds_message_storage:restore_iterator(DB, Replay, Serial), + Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It), + {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Replay, Serial), run_iterator_commands(Rest, ItNext, Ctx); run_iterator_commands([], It, _Ctx) -> iterate_db(It). @@ -237,7 +237,7 @@ store_shim(Shim, Messages) -> lists:foreach( fun({Topic, Payload = {MessageID, Timestamp, _}}) -> Bin = term_to_binary(Payload), - emqx_ds_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) + emqx_ds_message_storage_bitmask_shim:store(Shim, MessageID, Timestamp, Topic, Bin) end, Messages ). @@ -245,7 +245,7 @@ store_shim(Shim, Messages) -> iterate_shim(Shim, Iteration) -> lists:map( fun binary_to_term/1, - emqx_ds_message_storage_shim:iterate(Shim, Iteration) + emqx_ds_message_storage_bitmask_shim:iterate(Shim, Iteration) ). %%-------------------------------------------------------------------- @@ -254,8 +254,8 @@ iterate_shim(Shim, Iteration) -> open_db(Filepath, Options) -> {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), - {Schema, CFRefs} = emqx_ds_message_storage:create_new(Handle, ?GEN_ID, Options), - DB = emqx_ds_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), + {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options), + DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), {DB, Handle}. close_db(Handle) -> @@ -379,7 +379,7 @@ keyspace_filter() -> ?LET( {TopicFilter, StartTime, Keymapper}, {topic_filter(), pos_integer(), keymapper()}, - emqx_ds_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) + emqx_ds_message_storage_bitmask:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ). messages(Topic) -> @@ -426,14 +426,14 @@ flat(T) -> %%-------------------------------------------------------------------- make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> - emqx_ds_message_storage:make_keymapper(#{ + emqx_ds_message_storage_bitmask:make_keymapper(#{ timestamp_bits => TimestampBits, topic_bits_per_level => TopicBits, epoch => MaxEpoch }). get_keymapper_bitsize(Keymapper) -> - maps:get(bitsize, emqx_ds_message_storage:keymapper_info(Keymapper)). + maps:get(bitsize, emqx_ds_message_storage_bitmask:keymapper_info(Keymapper)). -spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). interleave(Seqs, Rng) ->