feat(ds): Add metadata storage and supervision tree

This commit is contained in:
ieQu1 2023-01-01 14:34:25 +01:00
parent 5e30a5d3dd
commit b5bb77dd58
10 changed files with 623 additions and 102 deletions

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_replay).
%% API:
-export([]).
-export_type([topic/0, time/0]).
%%================================================================================
%% Type declarations
%%================================================================================
%% parsed
-type topic() :: list(binary()).
%% TODO granularity?
-type time() :: integer().
%%================================================================================
%% API funcions
%%================================================================================
%%================================================================================
%% behavior callbacks
%%================================================================================
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,4 +16,7 @@
-module(emqx_replay_app).
-export([]).
-export([start/2]).
start(_Type, _Args) ->
emqx_replay_sup:start_link().

View File

@ -0,0 +1,278 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_replay_local_store).
-behavior(gen_server).
%% API:
-export([start_link/1]).
-export([make_iterator/3, store/5, next/1]).
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([cf_refs/0, gen_id/0, db_write_options/0]).
-compile({inline, [meta_lookup/2]}).
%%================================================================================
%% Type declarations
%%================================================================================
%% see rocksdb:db_options()
-type options() :: proplists:proplist().
-type db_write_options() :: proplists:proplist().
-type cf_refs() :: [{_CFName :: string(), _CFRef :: reference()}].
-record(generation, {
%% Module that handles data for the generation
module :: module(),
%% Module-specific attributes
data :: term()
% time_range :: {emqx_replay:time(), emqx_replay:time()}
}).
-record(s, {
zone :: emqx_types:zone(),
db :: rocksdb:db_handle(),
column_families :: cf_refs()
}).
-record(it, {
module :: module(),
data :: term()
}).
-type gen_id() :: 0..16#ffff.
-opaque iterator() :: #it{}.
%% Contents of the default column family:
%%
%% [{<<"genNN">>, #generation{}}, ...,
%% {<<"current">>, GenID}]
-define(DEFAULT_CF_OPTS, []).
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link(emqx_types:zone()) -> {ok, pid()}.
start_link(Zone) ->
gen_server:start_link(?MODULE, [Zone], []).
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) ->
{ok, _TODO} | {error, _TODO}.
make_iterator(Zone, TopicFilter, StartTime) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#generation{module = Mod, data = Data} = meta_lookup(Zone, 0),
case Mod:make_iterator(Data, TopicFilter, StartTime) of
{ok, It} ->
{ok, #it{
module = Mod,
data = It
}};
Err ->
Err
end.
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
ok | {error, _TODO}.
store(Zone, GUID, Time, Topic, Msg) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#generation{module = Mod, data = Data} = meta_lookup(Zone, 0),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(#it{module = Mod, data = It0}) ->
case Mod:next(It0) of
{value, Val, It} ->
{value, Val, #it{module = Mod, data = It}};
Other ->
Other
end.
%%================================================================================
%% behavior callbacks
%%================================================================================
init([Zone]) ->
process_flag(trap_exit, true),
{ok, DBHandle, CFRefs} = open_db(Zone),
S0 = #s{
zone = Zone,
db = DBHandle,
column_families = CFRefs
},
S = ensure_current_generation(S0),
read_metadata(S),
{ok, S}.
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, #s{db = DB, zone = Zone}) ->
meta_erase(Zone),
ok = rocksdb:close(DB).
%%================================================================================
%% Internal functions
%%================================================================================
-spec read_metadata(#s{}) -> #s{}.
read_metadata(S) ->
%% TODO: just a mockup to make the existing tests pass
read_metadata(0, S).
-spec read_metadata(gen_id(), #s{}) -> #s{}.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId),
DB = Mod:open(DBHandle, GenId, CFs, Data),
meta_put(Zone, GenId, Gen#generation{data = DB}).
-spec ensure_current_generation(#s{}) -> #s{}.
ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
case schema_get_current(DBHandle) of
undefined ->
GenId = 0,
ok = schema_put_current(DBHandle, GenId),
create_new_generation_schema(GenId, S);
_GenId ->
S
end.
-spec create_new_generation_schema(gen_id(), #s{}) -> #s{}.
create_new_generation_schema(
GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}
) ->
{Module, Options} = new_generation_config(Zone),
{NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options),
NewGen = #generation{
module = Module,
data = NewGenData
},
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, NewGen),
S#s{column_families = NewCFs ++ CFs}.
-spec new_generation_config(emqx_types:zone()) ->
{module(), term()}.
new_generation_config(Zone) ->
%% TODO: make a proper HOCON schema and all...
Zones = application:get_env(emqx_replay, zone_config, #{}),
DefaultConf =
#{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16],
max_tau => 5
},
maps:get(Zone, Zones, {emqx_replay_message_storage, DefaultConf}).
-spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
open_db(Zone) ->
Filename = atom_to_list(Zone),
DBOptions = application:get_env(emqx_replay, db_options, []),
ColumnFamiles =
case rocksdb:list_column_families(Filename, DBOptions) of
{ok, ColumnFamiles0} ->
[{I, []} || I <- ColumnFamiles0];
% DB is not present. First start
{error, {db_open, _}} ->
[{"default", ?DEFAULT_CF_OPTS}]
end,
case rocksdb:open(Filename, [{create_if_missing, true} | DBOptions], ColumnFamiles) of
{ok, Handle, CFRefs} ->
{CFNames, _} = lists:unzip(ColumnFamiles),
{ok, Handle, lists:zip(CFNames, CFRefs)};
Error ->
Error
end.
%% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>).
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}.
schema_get_gen(DBHandle, GenId) ->
{ok, Bin} = rocksdb:get(DBHandle, gen_rocksdb_key(GenId), ?SCHEMA_READ_OPTS),
binary_to_term(Bin).
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
rocksdb:put(DBHandle, gen_rocksdb_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
schema_get_current(DBHandle) ->
case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
{ok, Bin} ->
binary_to_integer(Bin);
not_found ->
undefined
end.
-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
schema_put_current(DBHandle, GenId) ->
rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).
-spec gen_rocksdb_key(integer()) -> string().
gen_rocksdb_key(N) ->
<<"gen", N:32>>.
-undef(CURRENT_GEN).
-undef(SCHEMA_WRITE_OPTS).
-undef(SCHEMA_READ_OPTS).
%% Functions for dealing with the runtime zone metadata:
-define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}).
-spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}.
meta_lookup(Zone, GenId) ->
persistent_term:get(?PERSISTENT_TERM(Zone, GenId)).
-spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok.
meta_put(Zone, GenId, Gen) ->
persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen).
-spec meta_erase(emqx_types:zone()) -> ok.
meta_erase(Zone) ->
[
persistent_term:erase(K)
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Zone
],
ok.
-undef(PERSISTENT_TERM).
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
%% store_cfs(DBHandle, CFRefs) ->
%% lists:foreach(
%% fun({CFName, CFRef}) ->
%% persistent_term:put({self(), CFName}, {DBHandle, CFRef})
%% end,
%% CFRefs).

View File

@ -0,0 +1,74 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_replay_local_store_sup).
-behavior(supervisor).
%% API:
-export([start_link/0, start_zone/1, stop_zone/1]).
%% behavior callbacks:
-export([init/1]).
%%================================================================================
%% Type declarations
%%================================================================================
-define(SUP, ?MODULE).
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_zone(emqx_types:zone()) -> supervisor:startchild_ret().
start_zone(Zone) ->
supervisor:start_child(?SUP, zone_child_spec(Zone)).
-spec stop_zone(emqx_types:zone()) -> ok | {error, _}.
stop_zone(Zone) ->
ok = supervisor:terminate_child(?SUP, Zone),
ok = supervisor:delete_child(?SUP, Zone).
%%================================================================================
%% behavior callbacks
%%================================================================================
init([]) ->
Children = [],
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
{ok, {SupFlags, Children}}.
%%================================================================================
%% Internal functions
%%================================================================================
-spec zone_child_spec(emqx_types:zone()) -> supervisor:child_spec().
zone_child_spec(Zone) ->
#{
id => Zone,
start => {emqx_replay_local_store, start_link, [Zone]},
shutdown => 5_000,
restart => permanent,
type => worker
}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,80 @@
-module(emqx_replay_message_storage).
%%================================================================================
%% @doc Description of the schema
%%
%% Let us assume that `T' is a topic and `t' is time. These are the two
%% dimensions used to index messages. They can be viewed as
%% "coordinates" of an MQTT message in a 2D space.
%%
%% Oftentimes, when wildcard subscription is used, keys must be
%% scanned in both dimensions simultaneously.
%%
%% Rocksdb allows to iterate over sorted keys very fast. This means we
%% need to map our two-dimentional keys to a single index that is
%% sorted in a way that helps to iterate over both time and topic
%% without having to do a lot of random seeks.
%%
%% == Mapping of 2D keys to rocksdb keys ==
%%
%% We use "zigzag" pattern to store messages, where rocksdb key is
%% composed like like this:
%%
%% |ttttt|TTTTTTTTT|tttt|
%% ^ ^ ^
%% | | |
%% +-------+ | +---------+
%% | | |
%% most significant topic hash least significant
%% bits of timestamp bits of timestamp
%%
%% Topic hash is level-aware: each topic level is hashed separately
%% and the resulting hashes are bitwise-concatentated. This allows us
%% to map topics to fixed-length bitstrings while keeping some degree
%% of information about the hierarchy.
%%
%% Next important concept is what we call "tau-interval". It is time
%% interval determined by the number of least significant bits of the
%% timestamp found at the tail of the rocksdb key.
%%
%% The resulting index is a space-filling curve that looks like
%% this in the topic-time 2D space:
%%
%% T ^ ---->------ |---->------ |---->------
%% | --/ / --/ / --/
%% | -<-/ | -<-/ | -<-/
%% | -/ | -/ | -/
%% | ---->------ | ---->------ | ---->------
%% | --/ / --/ / --/
%% | ---/ | ---/ | ---/
%% | -/ ^ -/ ^ -/
%% | ---->------ | ---->------ | ---->------
%% | --/ / --/ / --/
%% | -<-/ | -<-/ | -<-/
%% | -/ | -/ | -/
%% | ---->------| ---->------| ---------->
%% |
%% -+------------+-----------------------------> t
%% tau
%%
%% This structure allows to quickly seek to a the first message that
%% was recorded in a certain tau-interval in a certain topic or a
%% group of topics matching filter like `foo/bar/+/+' or `foo/bar/#`.
%%
%% Due to its structure, for each pair of rocksdb keys K1 and K2, such
%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) >
%% timestamp(K2).
%% That is, replay doesn't reorder messages published in each
%% individual topic.
%%
%% This property doesn't hold between different topics, but it's not deemed
%% a problem right now.
%%
%%================================================================================
%% API:
-export([open/2, close/1]).
-export([create_new/3, open/4]).
-export([make_keymapper/1]).
-export([store/5]).
@ -55,29 +127,11 @@
%% and _rest of levels_ (if any) get 16 bits.
-type bits_per_level() :: [bits(), ...].
%% see rocksdb:db_options()
-type db_options() :: proplists:proplist().
%% see rocksdb:cf_options()
-type db_cf_options() :: proplists:proplist().
%% see rocksdb:write_options()
-type db_write_options() :: proplists:proplist().
%% see rocksdb:read_options()
-type db_read_options() :: proplists:proplist().
-type options() :: #{
%% Keymapper.
keymapper := keymapper(),
%% Name and options to use to open specific column family.
column_family => {_Name :: string(), db_cf_options()},
%% Options to use when opening the DB.
open_options => db_options(),
%% Options to use when writing a message to the DB.
write_options => db_write_options(),
%% Options to use when iterating over messages in the DB.
read_options => db_read_options()
cf_options => emqx_replay_local_store:db_cf_options()
}.
-define(DEFAULT_COLUMN_FAMILY, {"default", []}).
@ -90,12 +144,18 @@
-define(DEFAULT_WRITE_OPTIONS, [{sync, true}]).
-define(DEFAULT_READ_OPTIONS, []).
%% Persistent configuration of the generation, it is used to create db
%% record when the database is reopened
-record(schema, {keymapper :: keymapper()}).
-type schema() :: #schema{}.
-record(db, {
handle :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle(),
keymapper :: keymapper(),
write_options = [{sync, true}] :: db_write_options(),
read_options = [] :: db_write_options()
write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(),
read_options = [] :: emqx_replay_local_store:db_write_options()
}).
-record(it, {
@ -132,40 +192,33 @@
%% API funcions
%%================================================================================
-spec open(file:filename_all(), options()) ->
{ok, db()} | {error, _TODO}.
open(Filename, Options) ->
CFDescriptors =
case maps:get(column_family, Options, undefined) of
CF = {_Name, _} ->
% TODO
% > When opening a DB in a read-write mode, you need to specify all
% > Column Families that currently exist in a DB. If that's not the case,
% > DB::Open call will return Status::InvalidArgument().
% This probably means that we need the _manager_ (the thing which knows
% about all the column families there is) to hold the responsibility to
% open the database and hold all the handles.
[CF, ?DEFAULT_COLUMN_FAMILY];
undefined ->
[?DEFAULT_COLUMN_FAMILY]
end,
DBOptions = maps:get(open_options, Options, ?DEFAULT_OPEN_OPTIONS),
case rocksdb:open(Filename, DBOptions, CFDescriptors) of
{ok, Handle, [CFHandle | _]} ->
{ok, #db{
handle = Handle,
cf = CFHandle,
keymapper = maps:get(keymapper, Options),
write_options = maps:get(write_options, Options, ?DEFAULT_WRITE_OPTIONS),
read_options = maps:get(read_options, Options, ?DEFAULT_READ_OPTIONS)
}};
Error ->
Error
end.
%% Create a new column family for the generation and a serializable representation of the schema
-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:generation_id(), options()) ->
{schema(), emqx_replay_local_store:cf_refs()}.
create_new(DBHandle, GenId, Options) ->
CFName = data_cf(GenId),
CFOptions = maps:get(cf_options, Options, []),
{ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions),
Schema = #schema{keymapper = make_keymapper(Options)},
{Schema, [{CFName, CFHandle}]}.
-spec close(db()) -> ok | {error, _}.
close(#db{handle = DB}) ->
rocksdb:close(DB).
%% Reopen the database
-spec open(
rocksdb:db_handle(),
emqx_replay_local_store:generation_id(),
[{_CFName :: string(), _CFHandle :: reference()}],
schema()
) ->
db().
open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
CFHandle = proplists:get_value(data_cf(GenId), CFs),
% assert
true = is_reference(CFHandle),
#db{
handle = DBHandle,
cf = CFHandle,
keymapper = Keymapper
}.
-spec make_keymapper(Options) -> keymapper() when
Options :: #{
@ -461,6 +514,11 @@ zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
substring(I, Offset, Size) ->
(I bsr Offset) band ones(Size).
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_replay_local_store:gen_id()) -> string().
data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

View File

@ -0,0 +1,64 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_replay_sup).
-behavior(supervisor).
%% API:
-export([start_link/0]).
%% behavior callbacks:
-export([init/1]).
%%================================================================================
%% Type declarations
%%================================================================================
-define(SUP, ?MODULE).
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
%%================================================================================
%% behavior callbacks
%%================================================================================
init([]) ->
Children = [zone_sup()],
SupFlags = #{
strategy => one_for_all,
intensity => 0,
period => 1
},
{ok, {SupFlags, Children}}.
%%================================================================================
%% Internal functions
%%================================================================================
zone_sup() ->
#{
id => local_store_zone_sup,
start => {emqx_replay_local_store_sup, start_link, []},
restart => permanent,
type => supervisor,
shutdown => infinity
}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -22,24 +22,29 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("proper/include/proper.hrl").
-define(ZONE, zone(?FUNCTION_NAME)).
%% Smoke test for opening and reopening the database
t_open(Config) ->
ok = emqx_replay_local_store_sup:stop_zone(?ZONE),
{ok, _} = emqx_replay_local_store_sup:start_zone(?ZONE).
%% Smoke test of store function
t_store(Config) ->
DB = ?config(handle, Config),
MessageID = emqx_guid:gen(),
PublishedAt = 1000,
Topic = [<<"foo">>, <<"bar">>],
Payload = <<"message">>,
?assertMatch(ok, emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload)).
?assertMatch(ok, emqx_replay_local_store:store(?ZONE, MessageID, PublishedAt, Topic, Payload)).
%% Smoke test for iteration through a concrete topic
t_iterate(Config) ->
DB = ?config(handle, Config),
%% Prepare data:
Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
Timestamps = lists:seq(1, 10),
[
emqx_replay_message_storage:store(
DB,
emqx_replay_local_store:store(
?ZONE,
emqx_guid:gen(),
PublishedAt,
Topic,
@ -50,7 +55,7 @@ t_iterate(Config) ->
%% Iterate through individual topics:
[
begin
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Topic, 0),
{ok, It} = emqx_replay_local_store:make_iterator(?ZONE, Topic, 0),
Values = iterate(It),
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end
@ -60,68 +65,67 @@ t_iterate(Config) ->
%% Smoke test for iteration with wildcard topic filter
t_iterate_wildcard(Config) ->
DB = ?config(handle, Config),
%% Prepare data:
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 10),
_ = [
store(DB, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 10 + 1)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 10 + 1)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 5)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 5)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
),
?assertEqual(
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+/bar", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+/bar", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "+/bar/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "+/bar/#", 0)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/+/+", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/+/+", 0)])
),
ok.
store(DB, PublishedAt, Topic, Payload) ->
store(Zone, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_message_storage:store(DB, ID, PublishedAt, parse_topic(Topic), Payload).
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, parse_topic(TopicFilter), StartTime),
{ok, It} = emqx_replay_local_store:make_iterator(DB, parse_topic(TopicFilter), StartTime),
iterate(It).
iterate(It) ->
case emqx_replay_message_storage:next(It) of
case emqx_replay_local_store:next(It) of
{value, Payload, ItNext} ->
[Payload | iterate(ItNext)];
none ->
@ -166,7 +170,6 @@ t_prop_hash_bitmask_computes(_) ->
).
t_prop_iterate_stored_messages(Config) ->
DB = ?config(handle, Config),
?assertEqual(
true,
proper:quickcheck(
@ -175,7 +178,7 @@ t_prop_iterate_stored_messages(Config) ->
messages(),
begin
Stream = payload_gen:interleave_streams(Streams),
ok = store_message_stream(DB, Stream),
ok = store_message_stream(?ZONE, Stream),
% TODO actually verify some property
true
end
@ -183,12 +186,12 @@ t_prop_iterate_stored_messages(Config) ->
)
).
store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
MessageID = <<ChunkNum:32>>,
PublishedAt = rand:uniform(ChunkNum),
ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload),
store_message_stream(DB, payload_gen:next(Rest));
store_message_stream(_DB, []) ->
ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload),
store_message_stream(Zone, payload_gen:next(Rest));
store_message_stream(_Zone, []) ->
ok.
messages() ->
@ -249,18 +252,12 @@ topic_level(Entropy) ->
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TC, Config) ->
Filename = filename:join(?MODULE_STRING, atom_to_list(TC)),
ok = filelib:ensure_dir(Filename),
{ok, DB} = emqx_replay_message_storage:open(Filename, #{
column_family => {atom_to_list(TC), []},
keymapper => emqx_replay_message_storage:make_keymapper(#{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
max_tau => 5
})
}),
[{handle, DB} | Config].
{ok, _} = application:ensure_all_started(emqx_replay),
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
Config.
end_per_testcase(_TC, Config) ->
DB = ?config(handle, Config),
catch emqx_replay_message_storage:close(DB).
ok = application:stop(emqx_replay).
zone(TC) ->
list_to_atom(?MODULE_STRING ++ atom_to_list(TC)).

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir
#! /usr/bin/env elixir
defmodule CheckElixirApplications do
alias EMQXUmbrella.MixProject

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir
#! /usr/bin/env elixir
# ensure we have a fresh rebar.lock

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir
#! /usr/bin/env elixir
defmodule CheckElixirEMQXMachineBootDiscrepancies do
alias EMQXUmbrella.MixProject