From 1159f99432373a1a1f8ac15590f8ab722ee5c186 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 May 2023 15:46:29 +0200 Subject: [PATCH] refactor(ds): emqx_replay -> emqx_ds --- apps/emqx_durable_storage/README.md | 2 +- .../{emqx_replay.app.src => emqx_ds.app.src} | 4 +- .../{emqx_replay_app.erl => emqx_ds_app.erl} | 4 +- ...{emqx_replay_conf.erl => emqx_ds_conf.erl} | 22 +++---- ...ocal_store.erl => emqx_ds_local_store.erl} | 58 ++++++++-------- ...re_sup.erl => emqx_ds_local_store_sup.erl} | 10 +-- ...torage.erl => emqx_ds_message_storage.erl} | 36 +++++----- .../{emqx_replay.erl => emqx_ds_replay.erl} | 2 +- .../{emqx_replay_sup.erl => emqx_ds_sup.erl} | 4 +- ...UITE.erl => emqx_ds_local_store_SUITE.erl} | 66 +++++++++---------- ....erl => emqx_ds_message_storage_SUITE.erl} | 4 +- ...m.erl => emqx_ds_message_storage_shim.erl} | 4 +- .../props/prop_replay_message_storage.erl | 46 ++++++------- 13 files changed, 131 insertions(+), 131 deletions(-) rename apps/emqx_durable_storage/src/{emqx_replay.app.src => emqx_ds.app.src} (81%) rename apps/emqx_durable_storage/src/{emqx_replay_app.erl => emqx_ds_app.erl} (81%) rename apps/emqx_durable_storage/src/{emqx_replay_conf.erl => emqx_ds_conf.erl} (71%) rename apps/emqx_durable_storage/src/{emqx_replay_local_store.erl => emqx_ds_local_store.erl} (88%) rename apps/emqx_durable_storage/src/{emqx_replay_local_store_sup.erl => emqx_ds_local_store_sup.erl} (85%) rename apps/emqx_durable_storage/src/{emqx_replay_message_storage.erl => emqx_ds_message_storage.erl} (96%) rename apps/emqx_durable_storage/src/{emqx_replay.erl => emqx_ds_replay.erl} (98%) rename apps/emqx_durable_storage/src/{emqx_replay_sup.erl => emqx_ds_sup.erl} (94%) rename apps/emqx_durable_storage/test/{emqx_replay_local_store_SUITE.erl => emqx_ds_local_store_SUITE.erl} (76%) rename apps/emqx_durable_storage/test/{emqx_replay_message_storage_SUITE.erl => emqx_ds_message_storage_SUITE.erl} (98%) rename apps/emqx_durable_storage/test/props/{emqx_replay_message_storage_shim.erl => emqx_ds_message_storage_shim.erl} (92%) diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index 50e539cdc..7de43bee0 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -1,6 +1,6 @@ # EMQX Replay -`emqx_replay` is a durable storage for MQTT messages within EMQX. +`emqx_ds` is a durable storage for MQTT messages within EMQX. It implements the following scenarios: - Persisting messages published by clients - diff --git a/apps/emqx_durable_storage/src/emqx_replay.app.src b/apps/emqx_durable_storage/src/emqx_ds.app.src similarity index 81% rename from apps/emqx_durable_storage/src/emqx_replay.app.src rename to apps/emqx_durable_storage/src/emqx_ds.app.src index 9c00a78ca..5a45c08d6 100644 --- a/apps/emqx_durable_storage/src/emqx_replay.app.src +++ b/apps/emqx_durable_storage/src/emqx_ds.app.src @@ -1,11 +1,11 @@ %% -*- mode: erlang -*- -{application, emqx_replay, [ +{application, emqx_ds, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! {vsn, "0.1.0"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc]}, - {mod, {emqx_replay_app, []}}, + {mod, {emqx_ds_app, []}}, {env, []} ]}. diff --git a/apps/emqx_durable_storage/src/emqx_replay_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl similarity index 81% rename from apps/emqx_durable_storage/src/emqx_replay_app.erl rename to apps/emqx_durable_storage/src/emqx_ds_app.erl index 17de0f28c..858855b6f 100644 --- a/apps/emqx_durable_storage/src/emqx_replay_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -2,9 +2,9 @@ %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_app). +-module(emqx_ds_app). -export([start/2]). start(_Type, _Args) -> - emqx_replay_sup:start_link(). + emqx_ds_sup:start_link(). diff --git a/apps/emqx_durable_storage/src/emqx_replay_conf.erl b/apps/emqx_durable_storage/src/emqx_ds_conf.erl similarity index 71% rename from apps/emqx_durable_storage/src/emqx_replay_conf.erl rename to apps/emqx_durable_storage/src/emqx_ds_conf.erl index 99405dfda..f5761e8a0 100644 --- a/apps/emqx_durable_storage/src/emqx_replay_conf.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_conf.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_conf). +-module(emqx_ds_conf). %% TODO: make a proper HOCON schema and all... @@ -12,7 +12,7 @@ -export([default_iteration_options/0]). -type backend_config() :: - {emqx_replay_message_storage, emqx_replay_message_storage:options()} + {emqx_ds_message_storage, emqx_ds_message_storage:options()} | {module(), _Options}. -export_type([backend_config/0]). @@ -21,32 +21,32 @@ %% API funcions %%================================================================================ --define(APP, emqx_replay). +-define(APP, emqx_ds). --spec shard_config(emqx_replay:shard()) -> backend_config(). +-spec shard_config(emqx_ds:shard()) -> backend_config(). shard_config(Shard) -> DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()), Shards = application:get_env(?APP, shard_config, #{}), maps:get(Shard, Shards, DefaultShardConfig). --spec shard_iteration_options(emqx_replay:shard()) -> - emqx_replay_message_storage:iteration_options(). +-spec shard_iteration_options(emqx_ds:shard()) -> + emqx_ds_message_storage:iteration_options(). shard_iteration_options(Shard) -> case shard_config(Shard) of - {emqx_replay_message_storage, Config} -> + {emqx_ds_message_storage, Config} -> maps:get(iteration, Config, default_iteration_options()); {_Module, _} -> default_iteration_options() end. --spec default_iteration_options() -> emqx_replay_message_storage:iteration_options(). +-spec default_iteration_options() -> emqx_ds_message_storage:iteration_options(). default_iteration_options() -> - {emqx_replay_message_storage, Config} = default_shard_config(), + {emqx_ds_message_storage, Config} = default_shard_config(), maps:get(iteration, Config). -spec default_shard_config() -> backend_config(). default_shard_config() -> - {emqx_replay_message_storage, #{ + {emqx_ds_message_storage, #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 8, 32, 16], epoch => 5, @@ -55,6 +55,6 @@ default_shard_config() -> } }}. --spec db_options() -> emqx_replay_local_store:db_options(). +-spec db_options() -> emqx_ds_local_store:db_options(). db_options() -> application:get_env(?APP, db_options, []). diff --git a/apps/emqx_durable_storage/src/emqx_replay_local_store.erl b/apps/emqx_durable_storage/src/emqx_ds_local_store.erl similarity index 88% rename from apps/emqx_durable_storage/src/emqx_replay_local_store.erl rename to apps/emqx_durable_storage/src/emqx_ds_local_store.erl index c8041297b..45845d714 100644 --- a/apps/emqx_durable_storage/src/emqx_replay_local_store.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_local_store.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_local_store). +-module(emqx_ds_local_store). -behaviour(gen_server). @@ -43,20 +43,20 @@ %% When should this generation become active? %% This generation should only contain messages timestamped no earlier than that. %% The very first generation will have `since` equal 0. - since := emqx_replay:time() + since := emqx_ds:time() }. -record(s, { - shard :: emqx_replay:shard(), + shard :: emqx_ds:shard(), db :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle(), cf_generations :: cf_refs() }). -record(it, { - shard :: emqx_replay:shard(), + shard :: emqx_ds:shard(), gen :: gen_id(), - replay :: emqx_replay:replay(), + replay :: emqx_ds:replay(), module :: module(), data :: term() }). @@ -91,16 +91,16 @@ -callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> {_Schema, cf_refs()}. --callback open(emqx_replay:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> +-callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> term(). --callback store(_Schema, binary(), emqx_replay:time(), emqx_replay:topic(), binary()) -> +-callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. --callback make_iterator(_Schema, emqx_replay:replay()) -> +-callback make_iterator(_Schema, emqx_ds:replay()) -> {ok, _It} | {error, _}. --callback restore_iterator(_Schema, emqx_replay:replay(), binary()) -> {ok, _It} | {error, _}. +-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. -callback preserve_iterator(_Schema, _It) -> term(). @@ -110,24 +110,24 @@ %% API funcions %%================================================================================ --spec start_link(emqx_replay:shard()) -> {ok, pid()}. +-spec start_link(emqx_ds:shard()) -> {ok, pid()}. start_link(Shard) -> gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []). --spec create_generation(emqx_replay:shard(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> +-spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> {ok, gen_id()} | {error, nonmonotonic}. create_generation(Shard, Since, Config = {_Module, _Options}) -> gen_server:call(?REF(Shard), {create_generation, Since, Config}). -spec store( - emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary() + emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary() ) -> ok | {error, _}. store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_replay:shard(), emqx_replay:replay()) -> +-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(Shard, Replay = {_, StartTime}) -> {GenId, Gen} = meta_lookup_gen(Shard, StartTime), @@ -155,12 +155,12 @@ next(It = #it{module = Mod, data = ItData}) -> end end. --spec preserve_iterator(iterator(), emqx_replay:replay_id()) -> +-spec preserve_iterator(iterator(), emqx_ds:replay_id()) -> ok | {error, _TODO}. preserve_iterator(It = #it{}, ReplayID) -> iterator_put_state(ReplayID, It). --spec restore_iterator(emqx_replay:shard(), emqx_replay:replay_id()) -> +-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(Shard, ReplayID) -> case iterator_get_state(Shard, ReplayID) of @@ -172,7 +172,7 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec discard_iterator(emqx_replay:shard(), emqx_replay:replay_id()) -> +-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> ok | {error, _TODO}. discard_iterator(Shard, ReplayID) -> iterator_delete(Shard, ReplayID). @@ -229,14 +229,14 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - Config = emqx_replay_conf:shard_config(Shard), + Config = emqx_ds_conf:shard_config(Shard), {ok, _, NS} = create_new_gen(0, Config, S), NS; _GenId -> S end. --spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> +-spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> {ok, gen_id(), state()} | {error, nonmonotonic}. create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> GenId = get_next_id(meta_get_current(Shard)), @@ -253,7 +253,7 @@ create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> Error end. --spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> +-spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> {ok, generation(), state()}. create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) -> % TODO: Backend implementation should ensure idempotency. @@ -265,13 +265,13 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_replay:shard()) -> {ok, state()} | {error, _TODO}. +-spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}. open_db(Shard) -> Filename = binary_to_list(Shard), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} - | emqx_replay_conf:db_options() + | emqx_ds_conf:db_options() ], ExistingCFs = case rocksdb:list_column_families(Filename, DBOptions) of @@ -425,7 +425,7 @@ schema_gen_key(N) -> -define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}). --spec meta_register_gen(emqx_replay:shard(), gen_id(), generation()) -> ok. +-spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok. meta_register_gen(Shard, GenId, Gen) -> Gs = case GenId > 0 of @@ -435,7 +435,7 @@ meta_register_gen(Shard, GenId, Gen) -> ok = meta_put(Shard, GenId, [Gen | Gs]), ok = meta_put(Shard, current, GenId). --spec meta_lookup_gen(emqx_replay:shard(), emqx_replay:time()) -> {gen_id(), generation()}. +-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}. meta_lookup_gen(Shard, Time) -> % TODO % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning @@ -449,30 +449,30 @@ find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> find_gen(Time, GenId, [_Gen | Rest]) -> find_gen(Time, GenId - 1, Rest). --spec meta_get_gen(emqx_replay:shard(), gen_id()) -> generation() | undefined. +-spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined. meta_get_gen(Shard, GenId) -> case meta_lookup(Shard, GenId, []) of [Gen | _Older] -> Gen; [] -> undefined end. --spec meta_get_current(emqx_replay:shard()) -> gen_id() | undefined. +-spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined. meta_get_current(Shard) -> meta_lookup(Shard, current, undefined). --spec meta_lookup(emqx_replay:shard(), _K) -> _V. +-spec meta_lookup(emqx_ds:shard(), _K) -> _V. meta_lookup(Shard, K) -> persistent_term:get(?PERSISTENT_TERM(Shard, K)). --spec meta_lookup(emqx_replay:shard(), _K, Default) -> _V | Default. +-spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default. meta_lookup(Shard, K, Default) -> persistent_term:get(?PERSISTENT_TERM(Shard, K), Default). --spec meta_put(emqx_replay:shard(), _K, _V) -> ok. +-spec meta_put(emqx_ds:shard(), _K, _V) -> ok. meta_put(Shard, K, V) -> persistent_term:put(?PERSISTENT_TERM(Shard, K), V). --spec meta_erase(emqx_replay:shard()) -> ok. +-spec meta_erase(emqx_ds:shard()) -> ok. meta_erase(Shard) -> [ persistent_term:erase(K) diff --git a/apps/emqx_durable_storage/src/emqx_replay_local_store_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_local_store_sup.erl similarity index 85% rename from apps/emqx_durable_storage/src/emqx_replay_local_store_sup.erl rename to apps/emqx_durable_storage/src/emqx_ds_local_store_sup.erl index 13f20c6bc..aad50992d 100644 --- a/apps/emqx_durable_storage/src/emqx_replay_local_store_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_local_store_sup.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_local_store_sup). +-module(emqx_ds_local_store_sup). -behaviour(supervisor). @@ -25,11 +25,11 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_replay:shard()) -> supervisor:startchild_ret(). +-spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret(). start_shard(Shard) -> supervisor:start_child(?SUP, shard_child_spec(Shard)). --spec stop_shard(emqx_replay:shard()) -> ok | {error, _}. +-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> ok = supervisor:terminate_child(?SUP, Shard), ok = supervisor:delete_child(?SUP, Shard). @@ -51,11 +51,11 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_replay:shard()) -> supervisor:child_spec(). +-spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec(). shard_child_spec(Shard) -> #{ id => Shard, - start => {emqx_replay_local_store, start_link, [Shard]}, + start => {emqx_ds_local_store, start_link, [Shard]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/src/emqx_replay_message_storage.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl similarity index 96% rename from apps/emqx_durable_storage/src/emqx_replay_message_storage.erl rename to apps/emqx_durable_storage/src/emqx_ds_message_storage.erl index f4d6c8e66..a9427b1f6 100644 --- a/apps/emqx_durable_storage/src/emqx_replay_message_storage.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_message_storage). +-module(emqx_ds_message_storage). %%================================================================================ %% @doc Description of the schema @@ -128,8 +128,8 @@ %% Type declarations %%================================================================================ --type topic() :: emqx_replay:topic(). --type time() :: emqx_replay:time(). +-type topic() :: emqx_ds:topic(). +-type time() :: emqx_ds:time(). %% Number of bits -type bits() :: non_neg_integer(). @@ -152,7 +152,7 @@ iteration => iteration_options(), - cf_options => emqx_replay_local_store:db_cf_options() + cf_options => emqx_ds_local_store:db_cf_options() }. -type iteration_options() :: #{ @@ -170,12 +170,12 @@ -opaque schema() :: #schema{}. -record(db, { - shard :: emqx_replay:shard(), + shard :: emqx_ds:shard(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), - write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), - read_options = [] :: emqx_replay_local_store:db_write_options() + write_options = [{sync, true}] :: emqx_ds_local_store:db_write_options(), + read_options = [] :: emqx_ds_local_store:db_write_options() }). -record(it, { @@ -221,8 +221,8 @@ %%================================================================================ %% Create a new column family for the generation and a serializable representation of the schema --spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) -> - {schema(), emqx_replay_local_store:cf_refs()}. +-spec create_new(rocksdb:db_handle(), emqx_ds_local_store:gen_id(), options()) -> + {schema(), emqx_ds_local_store:cf_refs()}. create_new(DBHandle, GenId, Options) -> CFName = data_cf(GenId), CFOptions = maps:get(cf_options, Options, []), @@ -232,10 +232,10 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( - emqx_replay:shard(), + emqx_ds:shard(), rocksdb:db_handle(), - emqx_replay_local_store:gen_id(), - emqx_replay_local_store:cf_refs(), + emqx_ds_local_store:gen_id(), + emqx_ds_local_store:cf_refs(), schema() ) -> db(). @@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_replay:replay()) -> +-spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> - Options = emqx_replay_conf:shard_iteration_options(DB#db.shard), + Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), make_iterator(DB, Replay, Options). --spec make_iterator(db(), emqx_replay:replay(), iteration_options()) -> +-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. @@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) -> }, term_to_binary(State). --spec restore_iterator(db(), emqx_replay:replay(), binary()) -> +-spec restore_iterator(db(), emqx_ds:replay(), binary()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> State = binary_to_term(Serial), @@ -419,7 +419,7 @@ hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec make_keyspace_filter(emqx_replay:replay(), keymapper()) -> keyspace_filter(). +-spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter(). make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), @@ -710,7 +710,7 @@ substring(I, Offset, Size) -> (I bsr Offset) band ones(Size). %% @doc Generate a column family ID for the MQTT messages --spec data_cf(emqx_replay_local_store:gen_id()) -> [char()]. +-spec data_cf(emqx_ds_local_store:gen_id()) -> [char()]. data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). diff --git a/apps/emqx_durable_storage/src/emqx_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl similarity index 98% rename from apps/emqx_durable_storage/src/emqx_replay.erl rename to apps/emqx_durable_storage/src/emqx_ds_replay.erl index f1ea7beac..db49c368d 100644 --- a/apps/emqx_durable_storage/src/emqx_replay.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replay.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay). +-module(emqx_ds_replay). %% API: -export([]). diff --git a/apps/emqx_durable_storage/src/emqx_replay_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl similarity index 94% rename from apps/emqx_durable_storage/src/emqx_replay_sup.erl rename to apps/emqx_durable_storage/src/emqx_ds_sup.erl index 945a71180..ebd022632 100644 --- a/apps/emqx_durable_storage/src/emqx_replay_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_sup). +-module(emqx_ds_sup). -behaviour(supervisor). @@ -45,7 +45,7 @@ init([]) -> shard_sup() -> #{ id => local_store_shard_sup, - start => {emqx_replay_local_store_sup, start_link, []}, + start => {emqx_ds_local_store_sup, start_link, []}, restart => permanent, type => supervisor, shutdown => infinity diff --git a/apps/emqx_durable_storage/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_local_store_SUITE.erl similarity index 76% rename from apps/emqx_durable_storage/test/emqx_replay_local_store_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_local_store_SUITE.erl index da6fef09d..d59c4571e 100644 --- a/apps/emqx_durable_storage/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_local_store_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_local_store_SUITE). +-module(emqx_ds_local_store_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -12,7 +12,7 @@ -define(SHARD, shard(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, - {emqx_replay_message_storage, #{ + {emqx_ds_message_storage, #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 32, 16], epoch => 5, @@ -23,7 +23,7 @@ ). -define(COMPACT_CONFIG, - {emqx_replay_message_storage, #{ + {emqx_ds_message_storage, #{ timestamp_bits => 16, topic_bits_per_level => [16, 16], epoch => 10 @@ -32,8 +32,8 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> - ok = emqx_replay_local_store_sup:stop_shard(?SHARD), - {ok, _} = emqx_replay_local_store_sup:start_shard(?SHARD). + ok = emqx_ds_local_store_sup:stop_shard(?SHARD), + {ok, _} = emqx_ds_local_store_sup:start_shard(?SHARD). %% Smoke test of store function t_store(_Config) -> @@ -41,7 +41,7 @@ t_store(_Config) -> PublishedAt = 1000, Topic = [<<"foo">>, <<"bar">>], Payload = <<"message">>, - ?assertMatch(ok, emqx_replay_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). + ?assertMatch(ok, emqx_ds_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> @@ -49,7 +49,7 @@ t_iterate(_Config) -> Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], Timestamps = lists:seq(1, 10), [ - emqx_replay_local_store:store( + emqx_ds_local_store:store( ?SHARD, emqx_guid:gen(), PublishedAt, @@ -61,7 +61,7 @@ t_iterate(_Config) -> %% Iterate through individual topics: [ begin - {ok, It} = emqx_replay_local_store:make_iterator(?SHARD, {Topic, 0}), + {ok, It} = emqx_ds_local_store:make_iterator(?SHARD, {Topic, 0}), Values = iterate(It), ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end @@ -136,16 +136,16 @@ t_iterate_long_tail_wildcard(_Config) -> ). t_create_gen(_Config) -> - {ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), ?assertEqual( {error, nonmonotonic}, - emqx_replay_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) + emqx_ds_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) ), ?assertEqual( {error, nonmonotonic}, - emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) + emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) ), - {ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ @@ -154,9 +154,9 @@ t_create_gen(_Config) -> ]. t_iterate_multigen(_Config) -> - {ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 100), _ = [ @@ -180,9 +180,9 @@ t_iterate_multigen(_Config) -> t_iterate_multigen_preserve_restore(_Config) -> ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Timestamps = lists:seq(1, 100), TopicFilter = "foo/#", @@ -194,12 +194,12 @@ t_iterate_multigen_preserve_restore(_Config) -> It0 = iterator(?SHARD, TopicFilter, 0), {It1, Res10} = iterate(It0, 10), % preserve mid-generation - ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID), - {ok, It2} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID), + ok = emqx_ds_local_store:preserve_iterator(It1, ReplayID), + {ok, It2} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID), {It3, Res100} = iterate(It2, 88), % preserve on the generation boundary - ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID), - {ok, It4} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID), + ok = emqx_ds_local_store:preserve_iterator(It3, ReplayID), + {ok, It4} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID), {It5, Res200} = iterate(It4, 1000), ?assertEqual(none, It5), ?assertEqual( @@ -208,22 +208,22 @@ t_iterate_multigen_preserve_restore(_Config) -> ), ?assertEqual( ok, - emqx_replay_local_store:discard_iterator(?SHARD, ReplayID) + emqx_ds_local_store:discard_iterator(?SHARD, ReplayID) ), ?assertEqual( {error, not_found}, - emqx_replay_local_store:restore_iterator(?SHARD, ReplayID) + emqx_ds_local_store:restore_iterator(?SHARD, ReplayID) ). store(Shard, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), - emqx_replay_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). + emqx_ds_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). iterate(DB, TopicFilter, StartTime) -> iterate(iterator(DB, TopicFilter, StartTime)). iterate(It) -> - case emqx_replay_local_store:next(It) of + case emqx_ds_local_store:next(It) of {value, Payload, ItNext} -> [Payload | iterate(ItNext)]; none -> @@ -233,7 +233,7 @@ iterate(It) -> iterate(It, 0) -> {It, []}; iterate(It, N) -> - case emqx_replay_local_store:next(It) of + case emqx_ds_local_store:next(It) of {value, Payload, ItNext} -> {ItFinal, Ps} = iterate(ItNext, N - 1), {ItFinal, [Payload | Ps]}; @@ -242,7 +242,7 @@ iterate(It, N) -> end. iterator(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), + {ok, It} = emqx_ds_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), It. parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> @@ -255,22 +255,22 @@ parse_topic(Topic) -> all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(emqx_replay), + {ok, _} = application:ensure_all_started(emqx_ds), Config. end_per_suite(_Config) -> - ok = application:stop(emqx_replay). + ok = application:stop(emqx_ds). init_per_testcase(TC, Config) -> ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_replay_local_store_sup:start_shard(shard(TC)), + {ok, _} = emqx_ds_local_store_sup:start_shard(shard(TC)), Config. end_per_testcase(TC, _Config) -> - ok = emqx_replay_local_store_sup:stop_shard(shard(TC)). + ok = emqx_ds_local_store_sup:stop_shard(shard(TC)). shard(TC) -> list_to_binary(lists:concat([?MODULE, "_", TC])). set_shard_config(Shard, Config) -> - ok = application:set_env(emqx_replay, shard_config, #{Shard => Config}). + ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}). diff --git a/apps/emqx_durable_storage/test/emqx_replay_message_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_message_storage_SUITE.erl similarity index 98% rename from apps/emqx_durable_storage/test/emqx_replay_message_storage_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_message_storage_SUITE.erl index a26579299..cbffcc4a1 100644 --- a/apps/emqx_durable_storage/test/emqx_replay_message_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_message_storage_SUITE.erl @@ -1,14 +1,14 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_message_storage_SUITE). +-module(emqx_ds_message_storage_SUITE). -compile(export_all). -compile(nowarn_export_all). -include_lib("stdlib/include/assert.hrl"). --import(emqx_replay_message_storage, [ +-import(emqx_ds_message_storage, [ make_keymapper/1, keymapper_info/1, compute_topic_bitmask/2, diff --git a/apps/emqx_durable_storage/test/props/emqx_replay_message_storage_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_shim.erl similarity index 92% rename from apps/emqx_durable_storage/test/props/emqx_replay_message_storage_shim.erl rename to apps/emqx_durable_storage/test/props/emqx_ds_message_storage_shim.erl index f8e5c33d9..7f6cf8e64 100644 --- a/apps/emqx_durable_storage/test/props/emqx_replay_message_storage_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_shim.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_replay_message_storage_shim). +-module(emqx_ds_message_storage_shim). -export([open/0]). -export([close/1]). @@ -29,7 +29,7 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) -> true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), ok. --spec iterate(t(), emqx_replay:replay()) -> +-spec iterate(t(), emqx_ds:replay()) -> [binary()]. iterate(Tab, {TopicFilter, StartTime}) -> ets:foldr( diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index 7713a66a6..08ae5d21d 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -22,7 +22,7 @@ prop_bitstring_computes() -> Keymapper, keymapper(), ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin - BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), + BS = emqx_ds_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper))) end) ). @@ -30,7 +30,7 @@ prop_bitstring_computes() -> prop_topic_bitmask_computes() -> Keymapper = make_keymapper(16, [8, 12, 16], 100), ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), + Mask = emqx_ds_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), % topic bits + timestamp LSBs is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) end). @@ -40,14 +40,14 @@ prop_next_seek_monotonic() -> {TopicFilter, StartTime, Keymapper}, {topic_filter(), pos_integer(), keymapper()}, begin - Filter = emqx_replay_message_storage:make_keyspace_filter( + Filter = emqx_ds_message_storage:make_keyspace_filter( {TopicFilter, StartTime}, Keymapper ), ?FORALL( Bitstring, bitstr(get_keymapper_bitsize(Keymapper)), - emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring + emqx_ds_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring ) end ). @@ -56,8 +56,8 @@ prop_next_seek_eq_initial_seek() -> ?FORALL( Filter, keyspace_filter(), - emqx_replay_message_storage:compute_initial_seek(Filter) =:= - emqx_replay_message_storage:compute_next_seek(0, Filter) + emqx_ds_message_storage:compute_initial_seek(Filter) =:= + emqx_ds_message_storage:compute_next_seek(0, Filter) ). prop_iterate_messages() -> @@ -72,7 +72,7 @@ prop_iterate_messages() -> ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), {DB, Handle} = open_db(Filepath, Options), - Shim = emqx_replay_message_storage_shim:open(), + Shim = emqx_ds_message_storage_shim:open(), ok = store_db(DB, Stream), ok = store_shim(Shim, Stream), ?FORALL( @@ -92,7 +92,7 @@ prop_iterate_messages() -> Messages = iterate_db(DB, Iteration), Reference = iterate_shim(Shim, Iteration), ok = close_db(Handle), - ok = emqx_replay_message_storage_shim:close(Shim), + ok = emqx_ds_message_storage_shim:close(Shim), ?WHENFAIL( begin io:format(user, " *** Filepath = ~s~n", [Filepath]), @@ -182,7 +182,7 @@ prop_iterate_eq_iterate_with_refresh() -> % PublishedAt = ChunkNum, % MessageID, PublishedAt, Topic % ]), -% ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), +% ok = emqx_ds_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), % store_message_stream(DB, payload_gen:next(Rest)); % store_message_stream(_Zone, []) -> % ok. @@ -191,7 +191,7 @@ store_db(DB, Messages) -> lists:foreach( fun({Topic, Payload = {MessageID, Timestamp, _}}) -> Bin = term_to_binary(Payload), - emqx_replay_message_storage:store(DB, MessageID, Timestamp, Topic, Bin) + emqx_ds_message_storage:store(DB, MessageID, Timestamp, Topic, Bin) end, Messages ). @@ -200,7 +200,7 @@ iterate_db(DB, Iteration) -> iterate_db(make_iterator(DB, Iteration)). iterate_db(It) -> - case emqx_replay_message_storage:next(It) of + case emqx_ds_message_storage:next(It) of {value, Payload, ItNext} -> [binary_to_term(Payload) | iterate_db(ItNext)]; none -> @@ -208,15 +208,15 @@ iterate_db(It) -> end. make_iterator(DB, Replay) -> - {ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay), + {ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay), It. make_iterator(DB, Replay, Options) -> - {ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay, Options), + {ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay, Options), It. run_iterator_commands([iterate | Rest], It, Ctx) -> - case emqx_replay_message_storage:next(It) of + case emqx_ds_message_storage:next(It) of {value, Payload, ItNext} -> [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)]; none -> @@ -227,8 +227,8 @@ run_iterator_commands([{preserve, restore} | Rest], It, Ctx) -> db := DB, replay := Replay } = Ctx, - Serial = emqx_replay_message_storage:preserve_iterator(It), - {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Replay, Serial), + Serial = emqx_ds_message_storage:preserve_iterator(It), + {ok, ItNext} = emqx_ds_message_storage:restore_iterator(DB, Replay, Serial), run_iterator_commands(Rest, ItNext, Ctx); run_iterator_commands([], It, _Ctx) -> iterate_db(It). @@ -237,7 +237,7 @@ store_shim(Shim, Messages) -> lists:foreach( fun({Topic, Payload = {MessageID, Timestamp, _}}) -> Bin = term_to_binary(Payload), - emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) + emqx_ds_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) end, Messages ). @@ -245,7 +245,7 @@ store_shim(Shim, Messages) -> iterate_shim(Shim, Iteration) -> lists:map( fun binary_to_term/1, - emqx_replay_message_storage_shim:iterate(Shim, Iteration) + emqx_ds_message_storage_shim:iterate(Shim, Iteration) ). %%-------------------------------------------------------------------- @@ -254,8 +254,8 @@ iterate_shim(Shim, Iteration) -> open_db(Filepath, Options) -> {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), - {Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options), - DB = emqx_replay_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), + {Schema, CFRefs} = emqx_ds_message_storage:create_new(Handle, ?GEN_ID, Options), + DB = emqx_ds_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), {DB, Handle}. close_db(Handle) -> @@ -379,7 +379,7 @@ keyspace_filter() -> ?LET( {TopicFilter, StartTime, Keymapper}, {topic_filter(), pos_integer(), keymapper()}, - emqx_replay_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) + emqx_ds_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ). messages(Topic) -> @@ -426,14 +426,14 @@ flat(T) -> %%-------------------------------------------------------------------- make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> - emqx_replay_message_storage:make_keymapper(#{ + emqx_ds_message_storage:make_keymapper(#{ timestamp_bits => TimestampBits, topic_bits_per_level => TopicBits, epoch => MaxEpoch }). get_keymapper_bitsize(Keymapper) -> - maps:get(bitsize, emqx_replay_message_storage:keymapper_info(Keymapper)). + maps:get(bitsize, emqx_ds_message_storage:keymapper_info(Keymapper)). -spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). interleave(Seqs, Rng) ->