diff --git a/apps/emqx_replay/src/emqx_replay.erl b/apps/emqx_replay/src/emqx_replay.erl new file mode 100644 index 000000000..ee83e35d9 --- /dev/null +++ b/apps/emqx_replay/src/emqx_replay.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_replay). + +%% API: +-export([]). + +-export_type([topic/0, time/0]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%% parsed +-type topic() :: list(binary()). + +%% TODO granularity? +-type time() :: integer(). + +%%================================================================================ +%% API funcions +%%================================================================================ + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_replay/src/emqx_replay_app.erl b/apps/emqx_replay/src/emqx_replay_app.erl index 090299150..bf6fd0b55 100644 --- a/apps/emqx_replay/src/emqx_replay_app.erl +++ b/apps/emqx_replay/src/emqx_replay_app.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,4 +16,7 @@ -module(emqx_replay_app). --export([]). +-export([start/2]). + +start(_Type, _Args) -> + emqx_replay_sup:start_link(). diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl new file mode 100644 index 000000000..0c1eb4171 --- /dev/null +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -0,0 +1,278 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_replay_local_store). + +-behavior(gen_server). + +%% API: +-export([start_link/1]). + +-export([make_iterator/3, store/5, next/1]). + +%% behavior callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-export_type([cf_refs/0, gen_id/0, db_write_options/0]). + +-compile({inline, [meta_lookup/2]}). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%% see rocksdb:db_options() +-type options() :: proplists:proplist(). + +-type db_write_options() :: proplists:proplist(). + +-type cf_refs() :: [{_CFName :: string(), _CFRef :: reference()}]. + +-record(generation, { + %% Module that handles data for the generation + module :: module(), + %% Module-specific attributes + data :: term() + % time_range :: {emqx_replay:time(), emqx_replay:time()} +}). + +-record(s, { + zone :: emqx_types:zone(), + db :: rocksdb:db_handle(), + column_families :: cf_refs() +}). + +-record(it, { + module :: module(), + data :: term() +}). + +-type gen_id() :: 0..16#ffff. + +-opaque iterator() :: #it{}. + +%% Contents of the default column family: +%% +%% [{<<"genNN">>, #generation{}}, ..., +%% {<<"current">>, GenID}] + +-define(DEFAULT_CF_OPTS, []). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec start_link(emqx_types:zone()) -> {ok, pid()}. +start_link(Zone) -> + gen_server:start_link(?MODULE, [Zone], []). + +-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) -> + {ok, _TODO} | {error, _TODO}. +make_iterator(Zone, TopicFilter, StartTime) -> + %% TODO: this is not supposed to work like this. Just a mock-up + #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), + case Mod:make_iterator(Data, TopicFilter, StartTime) of + {ok, It} -> + {ok, #it{ + module = Mod, + data = It + }}; + Err -> + Err + end. + +-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> + ok | {error, _TODO}. +store(Zone, GUID, Time, Topic, Msg) -> + %% TODO: this is not supposed to work like this. Just a mock-up + #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), + Mod:store(Data, GUID, Time, Topic, Msg). + +-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. +next(#it{module = Mod, data = It0}) -> + case Mod:next(It0) of + {value, Val, It} -> + {value, Val, #it{module = Mod, data = It}}; + Other -> + Other + end. + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +init([Zone]) -> + process_flag(trap_exit, true), + {ok, DBHandle, CFRefs} = open_db(Zone), + S0 = #s{ + zone = Zone, + db = DBHandle, + column_families = CFRefs + }, + S = ensure_current_generation(S0), + read_metadata(S), + {ok, S}. + +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #s{db = DB, zone = Zone}) -> + meta_erase(Zone), + ok = rocksdb:close(DB). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-spec read_metadata(#s{}) -> #s{}. +read_metadata(S) -> + %% TODO: just a mockup to make the existing tests pass + read_metadata(0, S). + +-spec read_metadata(gen_id(), #s{}) -> #s{}. +read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> + Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), + DB = Mod:open(DBHandle, GenId, CFs, Data), + meta_put(Zone, GenId, Gen#generation{data = DB}). + +-spec ensure_current_generation(#s{}) -> #s{}. +ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> + case schema_get_current(DBHandle) of + undefined -> + GenId = 0, + ok = schema_put_current(DBHandle, GenId), + create_new_generation_schema(GenId, S); + _GenId -> + S + end. + +-spec create_new_generation_schema(gen_id(), #s{}) -> #s{}. +create_new_generation_schema( + GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs} +) -> + {Module, Options} = new_generation_config(Zone), + {NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options), + NewGen = #generation{ + module = Module, + data = NewGenData + }, + %% TODO: Transaction? Column family creation can't be transactional, anyway. + ok = schema_put_gen(DBHandle, GenId, NewGen), + S#s{column_families = NewCFs ++ CFs}. + +-spec new_generation_config(emqx_types:zone()) -> + {module(), term()}. +new_generation_config(Zone) -> + %% TODO: make a proper HOCON schema and all... + Zones = application:get_env(emqx_replay, zone_config, #{}), + DefaultConf = + #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 8, 32, 16], + max_tau => 5 + }, + maps:get(Zone, Zones, {emqx_replay_message_storage, DefaultConf}). + +-spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. +open_db(Zone) -> + Filename = atom_to_list(Zone), + DBOptions = application:get_env(emqx_replay, db_options, []), + ColumnFamiles = + case rocksdb:list_column_families(Filename, DBOptions) of + {ok, ColumnFamiles0} -> + [{I, []} || I <- ColumnFamiles0]; + % DB is not present. First start + {error, {db_open, _}} -> + [{"default", ?DEFAULT_CF_OPTS}] + end, + case rocksdb:open(Filename, [{create_if_missing, true} | DBOptions], ColumnFamiles) of + {ok, Handle, CFRefs} -> + {CFNames, _} = lists:unzip(ColumnFamiles), + {ok, Handle, lists:zip(CFNames, CFRefs)}; + Error -> + Error + end. + +%% Functions for dealing with the metadata stored persistently in rocksdb + +-define(CURRENT_GEN, <<"current">>). +-define(SCHEMA_WRITE_OPTS, []). +-define(SCHEMA_READ_OPTS, []). + +-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}. +schema_get_gen(DBHandle, GenId) -> + {ok, Bin} = rocksdb:get(DBHandle, gen_rocksdb_key(GenId), ?SCHEMA_READ_OPTS), + binary_to_term(Bin). + +-spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}. +schema_put_gen(DBHandle, GenId, Gen) -> + rocksdb:put(DBHandle, gen_rocksdb_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). + +-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined. +schema_get_current(DBHandle) -> + case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of + {ok, Bin} -> + binary_to_integer(Bin); + not_found -> + undefined + end. + +-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}. +schema_put_current(DBHandle, GenId) -> + rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS). + +-spec gen_rocksdb_key(integer()) -> string(). +gen_rocksdb_key(N) -> + <<"gen", N:32>>. + +-undef(CURRENT_GEN). +-undef(SCHEMA_WRITE_OPTS). +-undef(SCHEMA_READ_OPTS). + +%% Functions for dealing with the runtime zone metadata: + +-define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}). + +-spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}. +meta_lookup(Zone, GenId) -> + persistent_term:get(?PERSISTENT_TERM(Zone, GenId)). + +-spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok. +meta_put(Zone, GenId, Gen) -> + persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen). + +-spec meta_erase(emqx_types:zone()) -> ok. +meta_erase(Zone) -> + [ + persistent_term:erase(K) + || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Zone + ], + ok. + +-undef(PERSISTENT_TERM). + +%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. +%% store_cfs(DBHandle, CFRefs) -> +%% lists:foreach( +%% fun({CFName, CFRef}) -> +%% persistent_term:put({self(), CFName}, {DBHandle, CFRef}) +%% end, +%% CFRefs). diff --git a/apps/emqx_replay/src/emqx_replay_local_store_sup.erl b/apps/emqx_replay/src/emqx_replay_local_store_sup.erl new file mode 100644 index 000000000..fb88ef212 --- /dev/null +++ b/apps/emqx_replay/src/emqx_replay_local_store_sup.erl @@ -0,0 +1,74 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_replay_local_store_sup). + +-behavior(supervisor). + +%% API: +-export([start_link/0, start_zone/1, stop_zone/1]). + +%% behavior callbacks: +-export([init/1]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(SUP, ?MODULE). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUP}, ?MODULE, []). + +-spec start_zone(emqx_types:zone()) -> supervisor:startchild_ret(). +start_zone(Zone) -> + supervisor:start_child(?SUP, zone_child_spec(Zone)). + +-spec stop_zone(emqx_types:zone()) -> ok | {error, _}. +stop_zone(Zone) -> + ok = supervisor:terminate_child(?SUP, Zone), + ok = supervisor:delete_child(?SUP, Zone). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +init([]) -> + Children = [], + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 + }, + {ok, {SupFlags, Children}}. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-spec zone_child_spec(emqx_types:zone()) -> supervisor:child_spec(). +zone_child_spec(Zone) -> + #{ + id => Zone, + start => {emqx_replay_local_store, start_link, [Zone]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 66668b23b..94f297750 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,80 @@ -module(emqx_replay_message_storage). +%%================================================================================ +%% @doc Description of the schema +%% +%% Let us assume that `T' is a topic and `t' is time. These are the two +%% dimensions used to index messages. They can be viewed as +%% "coordinates" of an MQTT message in a 2D space. +%% +%% Oftentimes, when wildcard subscription is used, keys must be +%% scanned in both dimensions simultaneously. +%% +%% Rocksdb allows to iterate over sorted keys very fast. This means we +%% need to map our two-dimentional keys to a single index that is +%% sorted in a way that helps to iterate over both time and topic +%% without having to do a lot of random seeks. +%% +%% == Mapping of 2D keys to rocksdb keys == +%% +%% We use "zigzag" pattern to store messages, where rocksdb key is +%% composed like like this: +%% +%% |ttttt|TTTTTTTTT|tttt| +%% ^ ^ ^ +%% | | | +%% +-------+ | +---------+ +%% | | | +%% most significant topic hash least significant +%% bits of timestamp bits of timestamp +%% +%% Topic hash is level-aware: each topic level is hashed separately +%% and the resulting hashes are bitwise-concatentated. This allows us +%% to map topics to fixed-length bitstrings while keeping some degree +%% of information about the hierarchy. +%% +%% Next important concept is what we call "tau-interval". It is time +%% interval determined by the number of least significant bits of the +%% timestamp found at the tail of the rocksdb key. +%% +%% The resulting index is a space-filling curve that looks like +%% this in the topic-time 2D space: +%% +%% T ^ ---->------ |---->------ |---->------ +%% | --/ / --/ / --/ +%% | -<-/ | -<-/ | -<-/ +%% | -/ | -/ | -/ +%% | ---->------ | ---->------ | ---->------ +%% | --/ / --/ / --/ +%% | ---/ | ---/ | ---/ +%% | -/ ^ -/ ^ -/ +%% | ---->------ | ---->------ | ---->------ +%% | --/ / --/ / --/ +%% | -<-/ | -<-/ | -<-/ +%% | -/ | -/ | -/ +%% | ---->------| ---->------| ----------> +%% | +%% -+------------+-----------------------------> t +%% tau +%% +%% This structure allows to quickly seek to a the first message that +%% was recorded in a certain tau-interval in a certain topic or a +%% group of topics matching filter like `foo/bar/+/+' or `foo/bar/#`. +%% +%% Due to its structure, for each pair of rocksdb keys K1 and K2, such +%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) > +%% timestamp(K2). +%% That is, replay doesn't reorder messages published in each +%% individual topic. +%% +%% This property doesn't hold between different topics, but it's not deemed +%% a problem right now. +%% +%%================================================================================ + %% API: --export([open/2, close/1]). +-export([create_new/3, open/4]). -export([make_keymapper/1]). -export([store/5]). @@ -55,29 +127,11 @@ %% and _rest of levels_ (if any) get 16 bits. -type bits_per_level() :: [bits(), ...]. -%% see rocksdb:db_options() --type db_options() :: proplists:proplist(). - -%% see rocksdb:cf_options() --type db_cf_options() :: proplists:proplist(). - -%% see rocksdb:write_options() --type db_write_options() :: proplists:proplist(). - -%% see rocksdb:read_options() --type db_read_options() :: proplists:proplist(). - -type options() :: #{ %% Keymapper. keymapper := keymapper(), %% Name and options to use to open specific column family. - column_family => {_Name :: string(), db_cf_options()}, - %% Options to use when opening the DB. - open_options => db_options(), - %% Options to use when writing a message to the DB. - write_options => db_write_options(), - %% Options to use when iterating over messages in the DB. - read_options => db_read_options() + cf_options => emqx_replay_local_store:db_cf_options() }. -define(DEFAULT_COLUMN_FAMILY, {"default", []}). @@ -90,12 +144,18 @@ -define(DEFAULT_WRITE_OPTIONS, [{sync, true}]). -define(DEFAULT_READ_OPTIONS, []). +%% Persistent configuration of the generation, it is used to create db +%% record when the database is reopened +-record(schema, {keymapper :: keymapper()}). + +-type schema() :: #schema{}. + -record(db, { handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), - write_options = [{sync, true}] :: db_write_options(), - read_options = [] :: db_write_options() + write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), + read_options = [] :: emqx_replay_local_store:db_write_options() }). -record(it, { @@ -132,40 +192,33 @@ %% API funcions %%================================================================================ --spec open(file:filename_all(), options()) -> - {ok, db()} | {error, _TODO}. -open(Filename, Options) -> - CFDescriptors = - case maps:get(column_family, Options, undefined) of - CF = {_Name, _} -> - % TODO - % > When opening a DB in a read-write mode, you need to specify all - % > Column Families that currently exist in a DB. If that's not the case, - % > DB::Open call will return Status::InvalidArgument(). - % This probably means that we need the _manager_ (the thing which knows - % about all the column families there is) to hold the responsibility to - % open the database and hold all the handles. - [CF, ?DEFAULT_COLUMN_FAMILY]; - undefined -> - [?DEFAULT_COLUMN_FAMILY] - end, - DBOptions = maps:get(open_options, Options, ?DEFAULT_OPEN_OPTIONS), - case rocksdb:open(Filename, DBOptions, CFDescriptors) of - {ok, Handle, [CFHandle | _]} -> - {ok, #db{ - handle = Handle, - cf = CFHandle, - keymapper = maps:get(keymapper, Options), - write_options = maps:get(write_options, Options, ?DEFAULT_WRITE_OPTIONS), - read_options = maps:get(read_options, Options, ?DEFAULT_READ_OPTIONS) - }}; - Error -> - Error - end. +%% Create a new column family for the generation and a serializable representation of the schema +-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:generation_id(), options()) -> + {schema(), emqx_replay_local_store:cf_refs()}. +create_new(DBHandle, GenId, Options) -> + CFName = data_cf(GenId), + CFOptions = maps:get(cf_options, Options, []), + {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions), + Schema = #schema{keymapper = make_keymapper(Options)}, + {Schema, [{CFName, CFHandle}]}. --spec close(db()) -> ok | {error, _}. -close(#db{handle = DB}) -> - rocksdb:close(DB). +%% Reopen the database +-spec open( + rocksdb:db_handle(), + emqx_replay_local_store:generation_id(), + [{_CFName :: string(), _CFHandle :: reference()}], + schema() +) -> + db(). +open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> + CFHandle = proplists:get_value(data_cf(GenId), CFs), + % assert + true = is_reference(CFHandle), + #db{ + handle = DBHandle, + cf = CFHandle, + keymapper = Keymapper + }. -spec make_keymapper(Options) -> keymapper() when Options :: #{ @@ -461,6 +514,11 @@ zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) -> substring(I, Offset, Size) -> (I bsr Offset) band ones(Size). +%% @doc Generate a column family ID for the MQTT messages +-spec data_cf(emqx_replay_local_store:gen_id()) -> string(). +data_cf(GenId) -> + ?MODULE_STRING ++ integer_to_list(GenId). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_replay/src/emqx_replay_sup.erl b/apps/emqx_replay/src/emqx_replay_sup.erl new file mode 100644 index 000000000..a5da13c7a --- /dev/null +++ b/apps/emqx_replay/src/emqx_replay_sup.erl @@ -0,0 +1,64 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_replay_sup). + +-behavior(supervisor). + +%% API: +-export([start_link/0]). + +%% behavior callbacks: +-export([init/1]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(SUP, ?MODULE). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUP}, ?MODULE, []). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +init([]) -> + Children = [zone_sup()], + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, Children}}. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +zone_sup() -> + #{ + id => local_store_zone_sup, + start => {emqx_replay_local_store_sup, start_link, []}, + restart => permanent, + type => supervisor, + shutdown => infinity + }. diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index 5608f6008..237823014 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -22,24 +22,29 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("proper/include/proper.hrl"). +-define(ZONE, zone(?FUNCTION_NAME)). + +%% Smoke test for opening and reopening the database +t_open(Config) -> + ok = emqx_replay_local_store_sup:stop_zone(?ZONE), + {ok, _} = emqx_replay_local_store_sup:start_zone(?ZONE). + %% Smoke test of store function t_store(Config) -> - DB = ?config(handle, Config), MessageID = emqx_guid:gen(), PublishedAt = 1000, Topic = [<<"foo">>, <<"bar">>], Payload = <<"message">>, - ?assertMatch(ok, emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload)). + ?assertMatch(ok, emqx_replay_local_store:store(?ZONE, MessageID, PublishedAt, Topic, Payload)). %% Smoke test for iteration through a concrete topic t_iterate(Config) -> - DB = ?config(handle, Config), %% Prepare data: Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], Timestamps = lists:seq(1, 10), [ - emqx_replay_message_storage:store( - DB, + emqx_replay_local_store:store( + ?ZONE, emqx_guid:gen(), PublishedAt, Topic, @@ -50,7 +55,7 @@ t_iterate(Config) -> %% Iterate through individual topics: [ begin - {ok, It} = emqx_replay_message_storage:make_iterator(DB, Topic, 0), + {ok, It} = emqx_replay_local_store:make_iterator(?ZONE, Topic, 0), Values = iterate(It), ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end @@ -60,68 +65,67 @@ t_iterate(Config) -> %% Smoke test for iteration with wildcard topic filter t_iterate_wildcard(Config) -> - DB = ?config(handle, Config), %% Prepare data: Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 10), _ = [ - store(DB, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) || Topic <- Topics, PublishedAt <- Timestamps ], ?assertEqual( lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 0)]) ), ?assertEqual( [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 10 + 1)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 10 + 1)]) ), ?assertEqual( lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 5)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 5)]) ), ?assertEqual( lists:sort([ {Topic, PublishedAt} || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)]) ), ?assertEqual( lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+", 0)]) ), ?assertEqual( [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+/bar", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+/bar", 0)]) ), ?assertEqual( lists:sort([ {Topic, PublishedAt} || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "+/bar/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "+/bar/#", 0)]) ), ?assertEqual( lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 0)]) ), ?assertEqual( [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/+/+", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/+/+", 0)]) ), ok. -store(DB, PublishedAt, Topic, Payload) -> +store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), - emqx_replay_message_storage:store(DB, ID, PublishedAt, parse_topic(Topic), Payload). + emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). iterate(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_replay_message_storage:make_iterator(DB, parse_topic(TopicFilter), StartTime), + {ok, It} = emqx_replay_local_store:make_iterator(DB, parse_topic(TopicFilter), StartTime), iterate(It). iterate(It) -> - case emqx_replay_message_storage:next(It) of + case emqx_replay_local_store:next(It) of {value, Payload, ItNext} -> [Payload | iterate(ItNext)]; none -> @@ -166,7 +170,6 @@ t_prop_hash_bitmask_computes(_) -> ). t_prop_iterate_stored_messages(Config) -> - DB = ?config(handle, Config), ?assertEqual( true, proper:quickcheck( @@ -175,7 +178,7 @@ t_prop_iterate_stored_messages(Config) -> messages(), begin Stream = payload_gen:interleave_streams(Streams), - ok = store_message_stream(DB, Stream), + ok = store_message_stream(?ZONE, Stream), % TODO actually verify some property true end @@ -183,12 +186,12 @@ t_prop_iterate_stored_messages(Config) -> ) ). -store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> +store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> MessageID = <>, PublishedAt = rand:uniform(ChunkNum), - ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), - store_message_stream(DB, payload_gen:next(Rest)); -store_message_stream(_DB, []) -> + ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload), + store_message_stream(Zone, payload_gen:next(Rest)); +store_message_stream(_Zone, []) -> ok. messages() -> @@ -249,18 +252,12 @@ topic_level(Entropy) -> all() -> emqx_common_test_helpers:all(?MODULE). init_per_testcase(TC, Config) -> - Filename = filename:join(?MODULE_STRING, atom_to_list(TC)), - ok = filelib:ensure_dir(Filename), - {ok, DB} = emqx_replay_message_storage:open(Filename, #{ - column_family => {atom_to_list(TC), []}, - keymapper => emqx_replay_message_storage:make_keymapper(#{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 32, 16], - max_tau => 5 - }) - }), - [{handle, DB} | Config]. + {ok, _} = application:ensure_all_started(emqx_replay), + {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), + Config. end_per_testcase(_TC, Config) -> - DB = ?config(handle, Config), - catch emqx_replay_message_storage:close(DB). + ok = application:stop(emqx_replay). + +zone(TC) -> + list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). diff --git a/scripts/check-elixir-applications.exs b/scripts/check-elixir-applications.exs index 42c838199..1e604c69f 100755 --- a/scripts/check-elixir-applications.exs +++ b/scripts/check-elixir-applications.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir defmodule CheckElixirApplications do alias EMQXUmbrella.MixProject diff --git a/scripts/check-elixir-deps-discrepancies.exs b/scripts/check-elixir-deps-discrepancies.exs index 408079d7d..1363219ed 100755 --- a/scripts/check-elixir-deps-discrepancies.exs +++ b/scripts/check-elixir-deps-discrepancies.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir # ensure we have a fresh rebar.lock diff --git a/scripts/check-elixir-emqx-machine-boot-discrepancies.exs b/scripts/check-elixir-emqx-machine-boot-discrepancies.exs index d07e6978f..9ffdc47bf 100755 --- a/scripts/check-elixir-emqx-machine-boot-discrepancies.exs +++ b/scripts/check-elixir-emqx-machine-boot-discrepancies.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir defmodule CheckElixirEMQXMachineBootDiscrepancies do alias EMQXUmbrella.MixProject