Compare commits

...

50 Commits

Author SHA1 Message Date
ieQu1 5aaba1d567
Merge pull request #10703 from ieQu1/replay
Sync master and refactor
2023-05-15 13:58:13 +02:00
ieQu1 330a255c3b chore: Add callback definitions to local_store 2023-05-15 12:05:18 +02:00
ieQu1 e7f773b6ae chore: behavior -> behaviour 2023-05-15 11:39:48 +02:00
ieQu1 62c27677b2 chore: Change license to BSL 2023-05-15 11:21:57 +02:00
ieQu1 f79dd16672 refactor(replay): zone -> shard
Also bump erlang-rocksdb version
2023-05-15 11:21:57 +02:00
ieQu1 b5443c2981 chore: Sync master 2023-05-15 11:19:03 +02:00
Andrew Mayorov 6423083895
Merge pull request #9947 from feat/EMQX-8744/iterators-cf
feat(replay): allow to preserve / restore iterators in the db
2023-04-11 15:20:59 +03:00
Andrew Mayorov cfd23d76d3
test(replay): verify preserve / restore works with stored iterators 2023-04-10 15:27:49 +03:00
Andrew Mayorov 1f033f92b5
feat(replay): allow to preserve / restore iterators in the db
So that we could guarantee replay consistency / availability under
the assumption that nodes may be restarted or even lost occasionally.
2023-04-10 15:27:49 +03:00
Andrew Mayorov c7aeb98466
Merge pull request #9797 from feat/rocksdb-replay-queue/cross-gen-iteration
feat: make iteration fully generation-aware
2023-01-18 16:48:50 +04:00
Andrew Mayorov 3c13dd38f6
feat: make `create_generation` safer against bad input 2023-01-18 11:11:49 +03:00
Andrew Mayorov 418ecbcbbc
feat: make iteration fully generation-aware 2023-01-18 11:11:49 +03:00
Andrew Mayorov e42f009d3f
chore: simplify iteration-related typespecs 2023-01-18 11:11:49 +03:00
Andrew Mayorov 4e36456843
feat: allow to create new storage generations 2023-01-18 11:11:46 +03:00
Andrew Mayorov a0f97ede67
Merge pull request #9740 from feat/rocksdb-replay-queue/iterator-refresh
feat: enable periodic iterator refresh
2023-01-18 12:10:51 +04:00
Andrew Mayorov 16736eca0f
fix: correct typespec 2023-01-16 20:18:23 +03:00
Andrew Mayorov 464db76a52
feat: wire iteration options up to the app config 2023-01-16 20:18:22 +03:00
Andrew Mayorov d950efc9fa
test: split unit tests off into a full-fledged suite 2023-01-12 17:59:55 +03:00
Andrew Mayorov b7566ab7e7
test: provide more general `keymapper_info/1` 2023-01-12 17:59:15 +03:00
Andrew Mayorov 0d495c97c8
chore: rename testsuite to reflect test subject better 2023-01-12 17:12:45 +03:00
Andrew Mayorov d504d415e6
feat: enable periodic iterator refresh
This might be helpful during replays taking multiple tens of seconds so
that underlying iterators won't hold onto in-memory / on-disk data
structures for too long, preventing rocksdb from recycling them.
2023-01-12 16:03:39 +03:00
Andrew Mayorov 5f5cc27697
Merge pull request #9695 from keynslug/feat/rocksdb-replay-queue/serializable-iterators
feat: add an ability to preserve and restore iterators
2023-01-12 11:36:48 +04:00
Andrew Mayorov f338aeb3f2
refactor: use inline functions instead of macros where applicable 2023-01-11 18:45:21 +03:00
Andrew Mayorov d65112eeac
fix: clear bitmask of topic filter tail containing wildcards 2023-01-06 13:58:39 +03:00
Andrew Mayorov 41bfebf9e0
test: proptest that iteration is exhaustive
Compare iteration results against what an extremely simplified model
produces.
2023-01-06 13:58:39 +03:00
Andrew Mayorov 5e633321db
test: scale up number of messages per topic in proptests 2023-01-06 13:58:39 +03:00
Andrew Mayorov 43225d20a6
test: use `_build/test/proper` as a scratch dir for testruns 2023-01-06 13:58:38 +03:00
Andrew Mayorov aba48c488e
test: add a proptest on iterator preserve / restore
Which verifies that preservation and restoration of iterators does not
affect the outcome of an iteration (under the precondition that the
state of database is constant during an iteration).
2023-01-05 22:52:08 +03:00
Andrew Mayorov 7fd14fb404
feat: add an ability to preserve and restore iterators
This will allow to persist iteration state and to periodically recreate
iterators during long replays.
2023-01-05 22:48:10 +03:00
Andrew Mayorov d6ee23e5b3
test: move proptests into a separate module
Following conventions. Also add few proptests on keyspace filters.
2023-01-04 22:05:09 +03:00
Andrew Mayorov 4b8dbca232
refactor: introduce keyspace filter concept
So we could conveniently test it separately.
2023-01-04 22:02:53 +03:00
Andrew Mayorov 3de384e806
Merge pull request #9669 from keynslug/chore/rocksdb-replay-queue/simplify-next-seek
chore: attempt to make compute_next_seek's logic clearer
2023-01-04 11:40:24 +04:00
Andrew Mayorov d5941c568b
refactor: rename `compute_hash_*` → `compute_topic_*` 2023-01-03 18:53:19 +03:00
Andrew Mayorov b300601a65
chore: assign CODEOWNER 2023-01-03 18:34:42 +03:00
Andrew Mayorov bea0dc22eb
chore: drop few unused macrodefinitions 2023-01-03 18:30:43 +03:00
Andrew Mayorov e248a18fd4
chore: attempt to make `compute_next_seek`'s logic clearer 2023-01-03 18:29:06 +03:00
ieQu1 917c8635e1
fix(replay): Fix dialyzer warnings 2023-01-03 14:11:04 +03:00
ieQu1 cb97a9abd1
refactor(replay): tau -> epoch 2023-01-03 14:09:02 +03:00
ieQu1 a9c036b058
refactor(replay): Factor out configuration to a separate module 2023-01-03 14:08:55 +03:00
ieQu1 f5e2d2f66d
refactor(replay): Introduce bitwise_concat function 2023-01-03 14:08:47 +03:00
ieQu1 ff145ecc43
feat(replay): Add metadata storage and supervision tree 2023-01-03 14:08:40 +03:00
Andrew Mayorov 0cfeee0df7
feat: implement keyspace partitioning across time 2023-01-03 14:07:14 +03:00
Andrew Mayorov 83467e7174
feat: allow to specify message store options
* Keymapper
* Column family name + DB options
* DB write / read options
2023-01-03 14:07:12 +03:00
Andrew Mayorov 7e13753ea5
test: add basic property tests 2023-01-03 14:05:29 +03:00
Andrew Mayorov fcc8a4bcce
refactor: rename function consistently 2023-01-03 14:05:29 +03:00
Andrew Mayorov 8707504245
feat(iter): wildcard smoke tests 2023-01-03 14:05:29 +03:00
Andrew Mayorov 5e612c910c
feat(iter): wip interator next 2023-01-03 14:05:29 +03:00
ieQu1 d99a347654
test(rocksdb): Add testcase 2023-01-03 14:05:29 +03:00
ieQu1 3248f396e0
feat(rocksdb): WIP 2023-01-03 14:05:29 +03:00
Andrew Mayorov 52964e2bfa
feat(replay): storage layer wip 2023-01-03 14:05:23 +03:00
22 changed files with 2967 additions and 7 deletions

.github/CODEOWNERS

@@ -18,7 +18,7 @@
/apps/emqx_rule_engine/ @emqx/emqx-review-board @kjellwinblad
/apps/emqx_slow_subs/ @emqx/emqx-review-board @lafirest
/apps/emqx_statsd/ @emqx/emqx-review-board @JimMoen
/apps/emqx_replay @emqx/emqx-review-board @ieQu1
## CI
/deploy/ @emqx/emqx-review-board @Rory-Z


@@ -26,7 +26,7 @@
-include("bpapi.hrl").
%%================================================================================
-%% behavior callbacks
+%% behaviour callbacks
%%================================================================================
introduced_in() ->

apps/emqx_replay/BSL.txt

@@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-06-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this License’s text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this License’s text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.


@@ -0,0 +1,37 @@
# EMQX Replay
`emqx_replay` is a durable storage for MQTT messages within EMQX.
It implements the following scenarios:
- Persisting messages published by clients
-
> 0. App overview introduction
> 1. let people know what your project can do specifically. Is it a base
> library dependency, or what kind of functionality is provided to the user?
> 2. Provide context and add a link to any reference visitors might be
> unfamiliar with.
> 3. Design details, implementation technology architecture, Roadmap, etc.
# [Features] - [Optional]
> A list of features your application provides. If a feature is quite simple, just
> list it in the previous section.
# Limitation
TBD
# Documentation links
TBD
# Usage
TBD
# Configurations
TBD
# HTTP APIs
# Other
TBD
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).


@@ -0,0 +1,11 @@
%% -*- mode: erlang -*-
{application, emqx_replay, [
{description, "Message persistence and subscription replays for EMQX"},
% strict semver, bump manually!
{vsn, "0.1.0"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, rocksdb, gproc]},
{mod, {emqx_replay_app, []}},
{env, []}
]}.


@@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay).
%% API:
-export([]).
-export_type([topic/0, time/0, shard/0]).
-export_type([replay_id/0, replay/0]).
%%================================================================================
%% Type declarations
%%================================================================================
%% Parsed topic: a list of topic levels.
-type topic() :: list(binary()).
-type shard() :: binary().
%% Timestamp
%% Earliest possible timestamp is 0.
%% TODO granularity?
-type time() :: non_neg_integer().
-type replay_id() :: binary().
-type replay() :: {
_TopicFilter :: topic(),
_StartTime :: time()
}.
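%% Example (illustrative): a replay of the topic `foo/bar` starting from
%% the beginning of time would be represented as:
%%   Replay = {[<<"foo">>, <<"bar">>], 0}.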
%%================================================================================
%% API functions
%%================================================================================
%%================================================================================
%% behaviour callbacks
%%================================================================================
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================


@@ -0,0 +1,10 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_app).
-export([start/2]).
start(_Type, _Args) ->
emqx_replay_sup:start_link().


@@ -0,0 +1,60 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_conf).
%% TODO: make a proper HOCON schema and all...
%% API:
-export([shard_config/1, db_options/0]).
-export([shard_iteration_options/1]).
-export([default_iteration_options/0]).
-type backend_config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
| {module(), _Options}.
-export_type([backend_config/0]).
%%================================================================================
%% API functions
%%================================================================================
-define(APP, emqx_replay).
-spec shard_config(emqx_replay:shard()) -> backend_config().
shard_config(Shard) ->
DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
Shards = application:get_env(?APP, shard_config, #{}),
maps:get(Shard, Shards, DefaultShardConfig).
-spec shard_iteration_options(emqx_replay:shard()) ->
emqx_replay_message_storage:iteration_options().
shard_iteration_options(Shard) ->
case shard_config(Shard) of
{emqx_replay_message_storage, Config} ->
maps:get(iteration, Config, default_iteration_options());
{_Module, _} ->
default_iteration_options()
end.
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options().
default_iteration_options() ->
{emqx_replay_message_storage, Config} = default_shard_config(),
maps:get(iteration, Config).
-spec default_shard_config() -> backend_config().
default_shard_config() ->
{emqx_replay_message_storage, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 100}
}
}}.
-spec db_options() -> emqx_replay_local_store:db_options().
db_options() ->
application:get_env(?APP, db_options, []).
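%% Illustrative sketch: shard configuration currently comes from the plain
%% application environment, so it can be overridden e.g. via sys.config.
%% The keys below mirror the defaults defined above:
%%
%%   {emqx_replay, [
%%       {default_shard_config,
%%           {emqx_replay_message_storage, #{
%%               timestamp_bits => 64,
%%               topic_bits_per_level => [8, 8, 8, 32, 16],
%%               epoch => 5,
%%               iteration => #{iterator_refresh => {every, 100}}
%%           }}},
%%       {db_options, []}
%%   ]}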


@@ -0,0 +1,505 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_local_store).
-behaviour(gen_server).
%% API:
-export([start_link/1]).
-export([create_generation/3]).
-export([store/5]).
-export([make_iterator/2, next/1]).
-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]).
%% behaviour callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]).
-compile({inline, [meta_lookup/2]}).
%%================================================================================
%% Type declarations
%%================================================================================
%% see rocksdb:db_options()
% -type options() :: proplists:proplist().
-type db_write_options() :: proplists:proplist().
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
%% Message storage generation
%% Keep in mind that instances of this type are persisted in long-term storage.
-type generation() :: #{
%% Module that handles data for the generation
module := module(),
%% Module-specific data defined at generation creation time
data := term(),
%% When should this generation become active?
%% This generation should only contain messages timestamped no earlier than that.
%% The very first generation will have `since` equal to 0.
since := emqx_replay:time()
}.
-record(s, {
shard :: emqx_replay:shard(),
db :: rocksdb:db_handle(),
cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs()
}).
-record(it, {
shard :: emqx_replay:shard(),
gen :: gen_id(),
replay :: emqx_replay:replay(),
module :: module(),
data :: term()
}).
-type gen_id() :: 0..16#ffff.
-opaque state() :: #s{}.
-opaque iterator() :: #it{}.
%% Contents of the default column family:
%%
%% [{<<"genNN">>, #generation{}}, ...,
%% {<<"current">>, GenID}]
-define(DEFAULT_CF, "default").
-define(DEFAULT_CF_OPTS, []).
-define(ITERATOR_CF, "$iterators").
%% TODO
%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
%% 2. Supposedly might be compressed _very_ effectively.
%% 3. `inplace_update_support`?
-define(ITERATOR_CF_OPTS, []).
-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).
%%================================================================================
%% Callbacks
%%================================================================================
-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
{_Schema, cf_refs()}.
-callback open(emqx_replay:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
term().
-callback store(_Schema, binary(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
ok | {error, _}.
-callback make_iterator(_Schema, emqx_replay:replay()) ->
{ok, _It} | {error, _}.
-callback restore_iterator(_Schema, emqx_replay:replay(), binary()) -> {ok, _It} | {error, _}.
-callback preserve_iterator(_Schema, _It) -> term().
-callback next(It) -> {value, binary(), It} | none | {error, closed}.
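%% These callbacks define the pluggable storage backend interface: the
%% module named in `emqx_replay_conf:backend_config()` (for example,
%% `emqx_replay_message_storage`) is expected to implement them.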
%%================================================================================
%% API functions
%%================================================================================
-spec start_link(emqx_replay:shard()) -> {ok, pid()}.
start_link(Shard) ->
gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
-spec create_generation(emqx_replay:shard(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
{ok, gen_id()} | {error, nonmonotonic}.
create_generation(Shard, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(Shard), {create_generation, Since, Config}).
-spec store(
emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()
) ->
ok | {error, _}.
store(Shard, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec make_iterator(emqx_replay:shard(), emqx_replay:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(Shard, Replay = {_, StartTime}) ->
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
open_iterator(Gen, #it{
shard = Shard,
gen = GenId,
replay = Replay
}).
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(It = #it{module = Mod, data = ItData}) ->
case Mod:next(ItData) of
{value, Val, ItDataNext} ->
{value, Val, It#it{data = ItDataNext}};
{error, _} = Error ->
Error;
none ->
case open_next_iterator(It) of
{ok, ItNext} ->
next(ItNext);
{error, _} = Error ->
Error;
none ->
none
end
end.
-spec preserve_iterator(iterator(), emqx_replay:replay_id()) ->
ok | {error, _TODO}.
preserve_iterator(It = #it{}, ReplayID) ->
iterator_put_state(ReplayID, It).
-spec restore_iterator(emqx_replay:shard(), emqx_replay:replay_id()) ->
{ok, iterator()} | {error, _TODO}.
restore_iterator(Shard, ReplayID) ->
case iterator_get_state(Shard, ReplayID) of
{ok, Serial} ->
restore_iterator_state(Shard, Serial);
not_found ->
{error, not_found};
{error, _Reason} = Error ->
Error
end.
-spec discard_iterator(emqx_replay:shard(), emqx_replay:replay_id()) ->
ok | {error, _TODO}.
discard_iterator(Shard, ReplayID) ->
iterator_delete(Shard, ReplayID).
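%% Usage sketch (illustrative, mirroring the test suite; assumes the shard
%% has been started via `emqx_replay_local_store_sup:start_shard/1`):
%%
%%   Shard = <<"shard1">>,
%%   ok = emqx_replay_local_store:store(
%%       Shard, emqx_guid:gen(), 1000, [<<"foo">>, <<"bar">>], <<"payload">>
%%   ),
%%   {ok, It} = emqx_replay_local_store:make_iterator(Shard, {[<<"foo">>, <<"bar">>], 0}),
%%   %% ... then drain with next/1 until it returns `none`:
%%   {value, <<"payload">>, _It1} = emqx_replay_local_store:next(It).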
%%================================================================================
%% behaviour callbacks
%%================================================================================
init([Shard]) ->
process_flag(trap_exit, true),
{ok, S0} = open_db(Shard),
S = ensure_current_generation(S0),
ok = populate_metadata(S),
{ok, S}.
handle_call({create_generation, Since, Config}, _From, S) ->
case create_new_gen(Since, Config, S) of
{ok, GenId, NS} ->
{reply, {ok, GenId}, NS};
{error, _} = Error ->
{reply, Error, S}
end;
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, #s{db = DB, shard = Shard}) ->
meta_erase(Shard),
ok = rocksdb:close(DB).
%%================================================================================
%% Internal functions
%%================================================================================
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
-spec populate_metadata(state()) -> ok.
populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
-spec populate_metadata(gen_id(), state()) -> ok.
populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
meta_register_gen(Shard, GenId, Gen).
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
case schema_get_current(DBHandle) of
undefined ->
Config = emqx_replay_conf:shard_config(Shard),
{ok, _, NS} = create_new_gen(0, Config, S),
NS;
_GenId ->
S
end.
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Shard)),
GenId = get_next_id(schema_get_current(DBHandle)),
case is_gen_valid(Shard, GenId, Since) of
ok ->
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, Gen),
ok = schema_put_current(DBHandle, GenId),
ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
{ok, GenId, NS};
{error, _} = Error ->
Error
end.
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
% TODO: Backend implementation should ensure idempotency.
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
Gen = #{
module => Module,
data => Schema,
since => Since
},
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec open_db(emqx_replay:shard()) -> {ok, state()} | {error, _TODO}.
open_db(Shard) ->
Filename = binary_to_list(Shard),
DBOptions = [
{create_if_missing, true},
{create_missing_column_families, true}
| emqx_replay_conf:db_options()
],
ExistingCFs =
case rocksdb:list_column_families(Filename, DBOptions) of
{ok, CFs} ->
[{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
% DB is not present. First start
{error, {db_open, _}} ->
[]
end,
ColumnFamilies = [
{?DEFAULT_CF, ?DEFAULT_CF_OPTS},
{?ITERATOR_CF, ?ITERATOR_CF_OPTS}
| ExistingCFs
],
case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
{CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{
shard = Shard,
db = DBHandle,
cf_iterator = CFIterator,
cf_generations = lists:zip(CFNames, CFRefs)
}};
Error ->
Error
end.
-spec open_gen(gen_id(), generation(), state()) -> generation().
open_gen(
GenId,
Gen = #{module := Mod, data := Data},
#s{shard = Shard, db = DBHandle, cf_generations = CFs}
) ->
DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
Gen#{data := DB}.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).
open_next_iterator(undefined, _It) ->
none;
open_next_iterator(Gen = #{}, It) ->
open_iterator(Gen, It).
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
case Mod:make_iterator(Data, It#it.replay) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
Err
end.
-spec open_restore_iterator(generation(), iterator(), binary()) ->
{ok, iterator()} | {error, _Reason}.
open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) ->
case Mod:restore_iterator(Data, Replay, Serial) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
Err
end.
%%
-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).
-define(ITERATION_WRITE_OPTS, []).
-define(ITERATION_READ_OPTS, []).
iterator_get_state(Shard, ReplayID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
iterator_put_state(ID, It = #it{shard = Shard}) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
Serial = preserve_iterator_state(It),
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
iterator_delete(Shard, ID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
preserve_iterator_state(#it{
gen = Gen,
replay = {TopicFilter, StartTime},
module = Mod,
data = ItData
}) ->
term_to_binary(#{
v => 1,
gen => Gen,
filter => TopicFilter,
start => StartTime,
st => Mod:preserve_iterator(ItData)
}).
restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
restore_iterator_state(Shard, binary_to_term(Serial));
restore_iterator_state(
Shard,
#{
v := 1,
gen := Gen,
filter := TopicFilter,
start := StartTime,
st := State
}
) ->
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
%% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>).
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
schema_get_gen(DBHandle, GenId) ->
{ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
binary_to_term(Bin).
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
schema_get_current(DBHandle) ->
case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
{ok, Bin} ->
binary_to_integer(Bin);
not_found ->
undefined
end.
-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
schema_put_current(DBHandle, GenId) ->
rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).
-spec schema_gen_key(integer()) -> binary().
schema_gen_key(N) ->
<<"gen", N:32>>.
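%% Illustrative: `schema_gen_key(1)` yields `<<"gen", 0, 0, 0, 1>>`, i.e.
%% the generation id is stored big-endian in 32 bits after the "gen" prefix.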
-undef(CURRENT_GEN).
-undef(SCHEMA_WRITE_OPTS).
-undef(SCHEMA_READ_OPTS).
%% Functions for dealing with the runtime shard metadata:
-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).
-spec meta_register_gen(emqx_replay:shard(), gen_id(), generation()) -> ok.
meta_register_gen(Shard, GenId, Gen) ->
Gs =
case GenId > 0 of
true -> meta_lookup(Shard, GenId - 1);
false -> []
end,
ok = meta_put(Shard, GenId, [Gen | Gs]),
ok = meta_put(Shard, current, GenId).
-spec meta_lookup_gen(emqx_replay:shard(), emqx_replay:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) ->
% TODO
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
% towards a "no".
Current = meta_lookup(Shard, current),
Gens = meta_lookup(Shard, Current),
find_gen(Time, Current, Gens).
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
{GenId, Gen};
find_gen(Time, GenId, [_Gen | Rest]) ->
find_gen(Time, GenId - 1, Rest).
-spec meta_get_gen(emqx_replay:shard(), gen_id()) -> generation() | undefined.
meta_get_gen(Shard, GenId) ->
case meta_lookup(Shard, GenId, []) of
[Gen | _Older] -> Gen;
[] -> undefined
end.
-spec meta_get_current(emqx_replay:shard()) -> gen_id() | undefined.
meta_get_current(Shard) ->
meta_lookup(Shard, current, undefined).
-spec meta_lookup(emqx_replay:shard(), _K) -> _V.
meta_lookup(Shard, K) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K)).
-spec meta_lookup(emqx_replay:shard(), _K, Default) -> _V | Default.
meta_lookup(Shard, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
-spec meta_put(emqx_replay:shard(), _K, _V) -> ok.
meta_put(Shard, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
-spec meta_erase(emqx_replay:shard()) -> ok.
meta_erase(Shard) ->
[
persistent_term:erase(K)
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
],
ok.
-undef(PERSISTENT_TERM).
get_next_id(undefined) -> 0;
get_next_id(GenId) -> GenId + 1.
is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
[GenPrev | _] = meta_lookup(Shard, GenId - 1),
case GenPrev of
#{since := SincePrev} when Since > SincePrev ->
ok;
#{} ->
{error, nonmonotonic}
end;
is_gen_valid(_Shard, 0, 0) ->
ok.
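%% Illustrative: `since` must grow strictly monotonically across
%% generations. If generation 1 was created with `since = 200`, then
%%   is_gen_valid(Shard, 2, 100) -> {error, nonmonotonic}
%%   is_gen_valid(Shard, 2, 300) -> ok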
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
%% store_cfs(DBHandle, CFRefs) ->
%% lists:foreach(
%% fun({CFName, CFRef}) ->
%% persistent_term:put({self(), CFName}, {DBHandle, CFRef})
%% end,
%% CFRefs).


@@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_local_store_sup).
-behaviour(supervisor).
%% API:
-export([start_link/0, start_shard/1, stop_shard/1]).
%% behaviour callbacks:
-export([init/1]).
%%================================================================================
%% Type declarations
%%================================================================================
-define(SUP, ?MODULE).
%%================================================================================
%% API functions
%%================================================================================
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_shard(emqx_replay:shard()) -> supervisor:startchild_ret().
start_shard(Shard) ->
supervisor:start_child(?SUP, shard_child_spec(Shard)).
-spec stop_shard(emqx_replay:shard()) -> ok | {error, _}.
stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard),
ok = supervisor:delete_child(?SUP, Shard).
%%================================================================================
%% behaviour callbacks
%%================================================================================
init([]) ->
Children = [],
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
{ok, {SupFlags, Children}}.
%%================================================================================
%% Internal functions
%%================================================================================
-spec shard_child_spec(emqx_replay:shard()) -> supervisor:child_spec().
shard_child_spec(Shard) ->
#{
id => Shard,
start => {emqx_replay_local_store, start_link, [Shard]},
shutdown => 5_000,
restart => permanent,
type => worker
}.


@@ -0,0 +1,731 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_message_storage).
%%================================================================================
%% @doc Description of the schema
%%
%% Let us assume that `T' is a topic and `t' is time. These are the two
%% dimensions used to index messages. They can be viewed as
%% "coordinates" of an MQTT message in a 2D space.
%%
%% Oftentimes, when wildcard subscription is used, keys must be
%% scanned in both dimensions simultaneously.
%%
%% RocksDB allows iterating over sorted keys very quickly. This means we
%% need to map our two-dimensional keys to a single index that is
%% sorted in a way that helps to iterate over both time and topic
%% without having to do a lot of random seeks.
%%
%% == Mapping of 2D keys to rocksdb keys ==
%%
%% We use a "zigzag" pattern to store messages, where the rocksdb key is
%% composed like this:
%%
%% |ttttt|TTTTTTTTT|tttt|
%% ^ ^ ^
%% | | |
%% +-------+ | +---------+
%% | | |
%% most significant topic hash least significant
%% bits of timestamp bits of timestamp
%% (a.k.a epoch) (a.k.a time offset)
%%
%% Topic hash is level-aware: each topic level is hashed separately
%% and the resulting hashes are bitwise-concatenated. This allows us
%% to map topics to fixed-length bitstrings while keeping some degree
%% of information about the hierarchy.
%%
%% The next important concept is what we call an "epoch": a time
%% interval determined by the number of least significant bits of the
%% timestamp found at the tail of the rocksdb key.
%%
%% The resulting index is a space-filling curve that looks like
%% this in the topic-time 2D space:
%%
%% T ^ ---->------ |---->------ |---->------
%% | --/ / --/ / --/
%% | -<-/ | -<-/ | -<-/
%% | -/ | -/ | -/
%% | ---->------ | ---->------ | ---->------
%% | --/ / --/ / --/
%% | ---/ | ---/ | ---/
%% | -/ ^ -/ ^ -/
%% | ---->------ | ---->------ | ---->------
%% | --/ / --/ / --/
%% | -<-/ | -<-/ | -<-/
%% | -/ | -/ | -/
%% | ---->------| ---->------| ---------->
%% |
%% -+------------+-----------------------------> t
%% epoch
%%
%% This structure allows us to quickly seek to the first message that
%% was recorded in a certain epoch in a certain topic, or in a
%% group of topics matching a filter like `foo/bar/#`.
%%
%% Due to its structure, for each pair of rocksdb keys K1 and K2, such
%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) >
%% timestamp(K2).
%% That is, replay doesn't reorder messages published in each
%% individual topic.
%%
%% This property doesn't hold between different topics, but it's not deemed
%% a problem right now.
%%
%%================================================================================
%% API:
-export([create_new/3, open/5]).
-export([make_keymapper/1]).
-export([store/5]).
-export([make_iterator/2]).
-export([make_iterator/3]).
-export([next/1]).
-export([preserve_iterator/1]).
-export([restore_iterator/3]).
-export([refresh_iterator/1]).
%% Debug/troubleshooting:
%% Keymappers
-export([
keymapper_info/1,
compute_bitstring/3,
compute_topic_bitmask/2,
compute_time_bitmask/1,
hash/2
]).
%% Keyspace filters
-export([
make_keyspace_filter/2,
compute_initial_seek/1,
compute_next_seek/2,
compute_time_seek/3,
compute_topic_seek/4
]).
-export_type([db/0, iterator/0, schema/0]).
-export_type([options/0]).
-export_type([iteration_options/0]).
-compile(
{inline, [
bitwise_concat/3,
ones/1,
successor/1,
topic_hash_matches/3,
time_matches/3
]}
).
%%================================================================================
%% Type declarations
%%================================================================================
-type topic() :: emqx_replay:topic().
-type time() :: emqx_replay:time().
%% Number of bits
-type bits() :: non_neg_integer().
%% Key of a RocksDB record.
-type key() :: binary().
%% Distribution of entropy among topic levels.
%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits,
%% and _rest of levels_ (if any) get 16 bits.
-type bits_per_level() :: [bits(), ...].
-type options() :: #{
%% Number of bits in a message timestamp.
timestamp_bits := bits(),
%% Number of bits in a key allocated to each level in a message topic.
topic_bits_per_level := bits_per_level(),
%% Maximum granularity of iteration over time.
epoch := time(),
iteration => iteration_options(),
cf_options => emqx_replay_local_store:db_cf_options()
}.
-type iteration_options() :: #{
%% Request periodic iterator refresh.
%% This might be helpful during replays taking a lot of time (e.g. tens of seconds).
%% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not
%% the same as 1000 replayed messages.
iterator_refresh => {every, _NumOperations :: pos_integer()}
}.
%% Persistent configuration of the generation; it is used to recreate the
%% `#db{}` record when the database is reopened.
-record(schema, {keymapper :: keymapper()}).
-opaque schema() :: #schema{}.
-record(db, {
shard :: emqx_replay:shard(),
handle :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle(),
keymapper :: keymapper(),
write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(),
read_options = [] :: emqx_replay_local_store:db_write_options()
}).
-record(it, {
handle :: rocksdb:itr_handle(),
filter :: keyspace_filter(),
cursor :: binary() | undefined,
next_action :: {seek, binary()} | next,
refresh_counter :: {non_neg_integer(), pos_integer()} | undefined
}).
-record(filter, {
keymapper :: keymapper(),
topic_filter :: emqx_topic:words(),
start_time :: integer(),
hash_bitfilter :: integer(),
hash_bitmask :: integer(),
time_bitfilter :: integer(),
time_bitmask :: integer()
}).
% NOTE
% Keymapper decides how to map messages into RocksDB column family keyspace.
-record(keymapper, {
source :: [bitsource(), ...],
bitsize :: bits(),
epoch :: non_neg_integer()
}).
-type bitsource() ::
%% Consume `_Size` bits from timestamp starting at `_Offset`th bit.
%% TODO consistency
{timestamp, _Offset :: bits(), _Size :: bits()}
%% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash.
| {hash, level | levels, _Size :: bits()}.
-opaque db() :: #db{}.
-opaque iterator() :: #it{}.
-type keymapper() :: #keymapper{}.
-type keyspace_filter() :: #filter{}.
%%================================================================================
%% API functions
%%================================================================================
%% Create a new column family for the generation and a serializable representation of the schema
-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) ->
{schema(), emqx_replay_local_store:cf_refs()}.
create_new(DBHandle, GenId, Options) ->
CFName = data_cf(GenId),
CFOptions = maps:get(cf_options, Options, []),
{ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions),
Schema = #schema{keymapper = make_keymapper(Options)},
{Schema, [{CFName, CFHandle}]}.
%% Reopen the database
-spec open(
emqx_replay:shard(),
rocksdb:db_handle(),
emqx_replay_local_store:gen_id(),
emqx_replay_local_store:cf_refs(),
schema()
) ->
db().
open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
#db{
shard = Shard,
handle = DBHandle,
cf = CFHandle,
keymapper = Keymapper
}.
-spec make_keymapper(options()) -> keymapper().
make_keymapper(#{
timestamp_bits := TimestampBits,
topic_bits_per_level := BitsPerLevel,
epoch := MaxEpoch
}) ->
TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))),
TimestampMSBs = TimestampBits - TimestampLSBs,
NLevels = length(BitsPerLevel),
{LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
Source = lists:flatten([
[{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0],
[{hash, level, Bits} || Bits <- LevelBits],
{hash, levels, TailLevelsBits},
[{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
]),
#keymapper{
source = Source,
bitsize = lists:sum([S || {_, _, S} <- Source]),
epoch = 1 bsl TimestampLSBs
}.
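%% Worked example (illustrative, with small made-up sizes). For
%%   make_keymapper(#{timestamp_bits => 12,
%%                    topic_bits_per_level => [4, 4],
%%                    epoch => 16})
%% we get TimestampLSBs = min(12, floor(math:log2(16))) = 4 and
%% TimestampMSBs = 12 - 4 = 8, hence:
%%   source  = [{timestamp, 4, 8}, {hash, level, 4},
%%              {hash, levels, 4}, {timestamp, 0, 4}]
%%   bitsize = 8 + 4 + 4 + 4 = 20
%%   epoch   = 1 bsl 4 = 16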
-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
ok | {error, _TODO}.
store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
Value = make_message_value(Topic, MessagePayload),
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec make_iterator(db(), emqx_replay:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(DB, Replay) ->
Options = emqx_replay_conf:shard_iteration_options(DB#db.shard),
make_iterator(DB, Replay, Options).
-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway.
{ok, iterator()} | {error, _TODO}.
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} ->
Filter = make_keyspace_filter(Replay, DB#db.keymapper),
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
{ok, #it{
handle = ITHandle,
filter = Filter,
next_action = {seek, InitialSeek},
refresh_counter = RefreshCounter
}};
Err ->
Err
end.
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
It = maybe_refresh_iterator(It0),
case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
% spec says `{ok, Key}` is also possible but the implementation says it's not
{ok, Key, Value} ->
% Preserve last seen key in the iterator so it could be restored / refreshed later.
ItNext = It#it{cursor = Key},
Bitstring = extract(Key, Keymapper),
case match_next(Bitstring, Value, It#it.filter) of
{_Topic, Payload} ->
{value, Payload, ItNext#it{next_action = next}};
next ->
next(ItNext#it{next_action = next});
NextBitstring when is_integer(NextBitstring) ->
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(ItNext#it{next_action = {seek, NextSeek}});
none ->
stop_iteration(ItNext)
end;
{error, invalid_iterator} ->
stop_iteration(It);
{error, iterator_closed} ->
{error, closed}
end.
-spec preserve_iterator(iterator()) -> binary().
preserve_iterator(#it{cursor = Cursor}) ->
State = #{
v => 1,
cursor => Cursor
},
term_to_binary(State).
-spec restore_iterator(db(), emqx_replay:replay(), binary()) ->
{ok, iterator()} | {error, _TODO}.
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
State = binary_to_term(Serial),
restore_iterator(DB, Replay, State);
restore_iterator(DB, Replay, #{
v := 1,
cursor := Cursor
}) ->
case make_iterator(DB, Replay) of
{ok, It} when Cursor == undefined ->
% Iterator was preserved right after it has been made.
{ok, It};
{ok, It} ->
% Iterator was preserved mid-replay, seek right past the last seen key.
{ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}};
Err ->
Err
end.
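%% Preserve / restore sketch (illustrative): mid-replay, an iterator can
%% be serialized and later resumed right past the last seen key:
%%
%%   Serial = preserve_iterator(It),              %% -> binary()
%%   %% ... persist Serial somewhere, e.g. the iterator column family ...
%%   {ok, It1} = restore_iterator(DB, Replay, Serial).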
-spec refresh_iterator(iterator()) -> iterator().
refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) ->
case rocksdb:iterator_refresh(Handle) of
ok when Action =:= next ->
% Now the underlying iterator is invalid, need to seek instead.
It#it{next_action = {seek, successor(Cursor)}};
ok ->
% Now the underlying iterator is invalid, but will seek soon anyway.
It;
{error, _} ->
% Implementation could in theory return an {error, ...} tuple.
% Supposedly our best bet is to ignore it.
% TODO logging?
It
end.
%%================================================================================
%% Internal exports
%%================================================================================
-spec keymapper_info(keymapper()) ->
#{source := [bitsource()], bitsize := bits(), epoch := time()}.
keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) ->
#{source => Source, bitsize => Bitsize, epoch => Epoch}.
make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
make_message_value(Topic, MessagePayload) ->
term_to_binary({Topic, MessagePayload}).
unwrap_message_value(Binary) ->
binary_to_term(Binary).
-spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) ->
key().
combine(Bitstring, MessageID, #keymapper{bitsize = Size}) ->
<<Bitstring:Size/integer, MessageID/binary>>.
-spec extract(key(), keymapper()) ->
_Bitstring :: integer().
extract(Key, #keymapper{bitsize = Size}) ->
<<Bitstring:Size/integer, _MessageID/binary>> = Key,
Bitstring.
-spec compute_bitstring(topic(), time(), keymapper()) -> integer().
compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) ->
compute_bitstring(Topic, Timestamp, Source, 0).
-spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer().
compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
compute_topic_bitmask(TopicFilter, Source, 0).
-spec compute_time_bitmask(keymapper()) -> integer().
compute_time_bitmask(#keymapper{source = Source}) ->
compute_time_bitmask(Source, 0).
-spec hash(term(), bits()) -> integer().
hash(Input, Bits) ->
% at most 32 bits
erlang:phash2(Input, 1 bsl Bits).
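%% Illustrative: `hash(<<"foo">>, 8)` is a deterministic integer in the
%% range 0..255; `erlang:phash2/2` caps out at 32 bits, hence the note above.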
-spec make_keyspace_filter(emqx_replay:replay(), keymapper()) -> keyspace_filter().
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
TimeBitmask = compute_time_bitmask(Keymapper),
HashBitfilter = Bitstring band HashBitmask,
TimeBitfilter = Bitstring band TimeBitmask,
#filter{
keymapper = Keymapper,
topic_filter = TopicFilter,
start_time = StartTime,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}.
-spec compute_initial_seek(keyspace_filter()) -> integer().
compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) ->
% Should be the same as `compute_next_seek(0, Filter)`.
HashBitfilter bor TimeBitfilter.
-spec compute_next_seek(integer(), keyspace_filter()) -> integer().
compute_next_seek(
Bitstring,
Filter = #filter{
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter).
%%================================================================================
%% Internal functions
%%================================================================================
compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) ->
I = (Timestamp bsr Offset) band ones(Size),
compute_bitstring(Topic, Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) ->
I = hash(<<"/">>, Size),
compute_bitstring([], Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) ->
I = hash(Level, Size),
compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
I = hash(Tail, Size),
compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring(_, _, [], Acc) ->
Acc.
compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size));
compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size));
compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size));
compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) ->
Mask =
case lists:member('+', Tail) orelse lists:member('#', Tail) of
true -> 0;
false -> ones(Size)
end,
compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size));
compute_topic_bitmask(_, [], Acc) ->
Acc.
compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
compute_time_bitmask(Rest, bitwise_concat(Acc, ones(Size), Size));
compute_time_bitmask([{hash, _, Size} | Rest], Acc) ->
compute_time_bitmask(Rest, bitwise_concat(Acc, 0, Size));
compute_time_bitmask([], Acc) ->
Acc.
bitwise_concat(Acc, Item, ItemSize) ->
(Acc bsl ItemSize) bor Item.
ones(Bits) ->
1 bsl Bits - 1.
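%% Illustrative:
%%   ones(3) = 2#111 = 7
%%   bitwise_concat(2#101, 2#11, 2) = (2#101 bsl 2) bor 2#11 = 2#10111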
-spec successor(key()) -> key().
successor(Key) ->
<<Key/binary, 0:8>>.
%% |123|345|678|
%% foo bar baz
%% |123|000|678| - |123|fff|678|
%% foo + baz
%% |fff|000|fff|
%% |123|000|678|
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
match_next(
Bitstring,
Value,
Filter = #filter{
topic_filter = TopicFilter,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
case HashMatches and TimeMatches of
true ->
Message = {Topic, _Payload} = unwrap_message_value(Value),
case emqx_topic:match(Topic, TopicFilter) of
true ->
Message;
false ->
next
end;
false ->
compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter)
end.
%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
compute_next_seek(
_HashMatches = false,
_TimeMatches,
Bitstring,
Filter = #filter{
keymapper = Keymapper,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper),
case NextBitstring of
none ->
none;
_ ->
TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask),
compute_next_seek(true, TimeMatches, NextBitstring, Filter)
end;
%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
compute_next_seek(
_HashMatches = true,
_TimeMatches = false,
Bitstring,
#filter{
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask);
compute_next_seek(true, true, Bitstring, _It) ->
Bitstring.
topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) ->
(Bitstring band HashBitmask) == HashBitfilter.
time_matches(Bitstring, TimeBitfilter, TimeBitmask) ->
(Bitstring band TimeBitmask) >= TimeBitfilter.
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
% Replace the timestamp bits in `Bitstring` with the bits from `TimeBitfilter`.
(Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
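%% Illustrative, for a 4-bit key whose two most significant bits hold the
%% timestamp:
%%   compute_time_seek(2#1010, _TimeBitfilter = 2#1100, _TimeBitmask = 2#1100)
%%     = (2#1010 band 2#0011) bor 2#1100
%%     = 2#1110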
%% Find the closest bitstring which is:
%% * greater than `Bitstring`,
%% * and falls into the hash space defined by `HashBitfilter`.
%% Note that the result can end up "back" in time and out of the time range.
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
Sources = Keymapper#keymapper.source,
Size = Keymapper#keymapper.bitsize,
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
% NOTE
% We're iterating through `Bitstring` here, in lockstep with `HashBitfilter`
% and `HashBitmask`, starting from the least significant bits. Each bitsource in
% `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
% bits long which we interpret as a "digit". There are 2 flavors of those
% "digits":
% * regular digit with 2^S possible values,
% * degenerate digit with exactly 1 possible value U (represented with 0).
% Our goal here is to find a successor of `Bitstring` and perform a kind of
% digit-by-digit addition operation with carry propagation.
NextSeek = zipfoldr3(
fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
case Source of
{hash, _, S} when LBitmask =:= 0 ->
% Regular case
bitwise_add_digit(Substring, Acc, S, Offset);
{hash, _, _} when LBitmask =/= 0, Substring < Filter ->
% Degenerate case, I_digit < U, no overflow.
% Successor is `U bsl Offset` which is equivalent to 0.
0;
{hash, _, S} when LBitmask =/= 0, Substring > Filter ->
% Degenerate case, I_digit > U, overflow.
% Successor is `(1 bsl Size + U) bsl Offset`.
overflow_digit(S, Offset);
{hash, _, S} when LBitmask =/= 0 ->
% Degenerate case, I_digit = U
% Perform digit addition with I_digit = 0, assuming "digit" has
% 0 bits of information (but is `S` bits long at the same time).
% This will overflow only if the result of previous iteration
% was an overflow.
bitwise_add_digit(0, Acc, 0, S, Offset);
{timestamp, _, S} ->
% Regular case
bitwise_add_digit(Substring, Acc, S, Offset)
end
end,
0,
Bitstring,
HashBitfilter,
HashBitmask,
Size,
Sources
),
case NextSeek bsr Size of
_Carry = 0 ->
% Found the successor.
% We need to recover values of those degenerate digits which we
% represented with 0 during digit-by-digit iteration.
NextSeek bor (HashBitfilter band HashBitmask);
_Carry = 1 ->
% We got "carried away" past the range, time to stop iteration.
none
end.
bitwise_add_digit(Digit, Number, Width, Offset) ->
bitwise_add_digit(Digit, Number, Width, Width, Offset).
%% Add "digit" (represented with integer `Digit`) to the `Number` assuming
%% this digit starts at `Offset` bits in `Number` and is `Width` bits long.
%% Perform an overflow if the result of addition would not fit into `Bits`
%% bits.
bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
Sum = (Digit bsl Offset) + Number,
case (Sum bsr Offset) < (1 bsl Bits) of
true -> Sum;
false -> overflow_digit(Width, Offset)
end.
%% Construct a number which denotes an overflow of a digit that starts at
%% `Offset` bits and is `Width` bits long.
overflow_digit(Width, Offset) ->
(1 bsl Width) bsl Offset.
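%% Illustrative, for a 2-bit digit at offset 0:
%%   bitwise_add_digit(2#10, 2#01, 2, 0) = 2#11
%% while adding 2#11 to the same number overflows the 2-bit field:
%%   bitwise_add_digit(2#11, 2#01, 2, 0) = overflow_digit(2, 0) = 2#100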
%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
%% significant bits first.
%%
%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
%% specified in `Sources` list, in order from most significant bits to least
%% significant. Each iteration calls `FoldFun` with:
%% * bitsource that was used to extract sub-bitstrings,
%% * 3 sub-bitstrings in integer representation,
%% * bit offset into integers,
%% * current accumulator.
-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
Acc
when
FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
Acc;
zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
OffsetNext = Offset - S,
AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest),
FoldFun(
Source,
substring(I1, OffsetNext, S),
substring(I2, OffsetNext, S),
substring(I3, OffsetNext, S),
OffsetNext,
AccNext
).
substring(I, Offset, Size) ->
(I bsr Offset) band ones(Size).
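%% Illustrative: substring(2#10110, 1, 3) = (2#10110 bsr 1) band 2#111 = 2#011.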
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_replay_local_store:gen_id()) -> [char()].
data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId).
make_refresh_counter({every, N}) when is_integer(N), N > 0 ->
{0, N};
make_refresh_counter(undefined) ->
undefined.
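%% Bump the refresh counter on every call and refresh the underlying
%% rocksdb iterator on each N-th call, per the `{every, N}` iterator
%% refresh option.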
maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) ->
refresh_iterator(It#it{refresh_counter = {0, N}});
maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) ->
It#it{refresh_counter = {M + 1, N}};
maybe_refresh_iterator(It = #it{refresh_counter = undefined}) ->
It.
stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle),
none.


@ -0,0 +1,52 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_sup).
-behaviour(supervisor).
%% API:
-export([start_link/0]).
%% behaviour callbacks:
-export([init/1]).
%%================================================================================
%% Type declarations
%%================================================================================
-define(SUP, ?MODULE).
%%================================================================================
%% API functions
%%================================================================================
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
%%================================================================================
%% behaviour callbacks
%%================================================================================
init([]) ->
Children = [shard_sup()],
SupFlags = #{
strategy => one_for_all,
intensity => 0,
period => 1
},
{ok, {SupFlags, Children}}.
%%================================================================================
%% Internal functions
%%================================================================================
shard_sup() ->
#{
id => local_store_shard_sup,
start => {emqx_replay_local_store_sup, start_link, []},
restart => permanent,
type => supervisor,
shutdown => infinity
}.


@ -0,0 +1,276 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_local_store_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG,
{emqx_replay_message_storage, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}}
).
-define(COMPACT_CONFIG,
{emqx_replay_message_storage, #{
timestamp_bits => 16,
topic_bits_per_level => [16, 16],
epoch => 10
}}
).
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_replay_local_store_sup:stop_shard(?SHARD),
{ok, _} = emqx_replay_local_store_sup:start_shard(?SHARD).
%% Smoke test of store function
t_store(_Config) ->
MessageID = emqx_guid:gen(),
PublishedAt = 1000,
Topic = [<<"foo">>, <<"bar">>],
Payload = <<"message">>,
?assertMatch(ok, emqx_replay_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
%% Prepare data:
Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
Timestamps = lists:seq(1, 10),
[
emqx_replay_local_store:store(
?SHARD,
emqx_guid:gen(),
PublishedAt,
Topic,
integer_to_binary(PublishedAt)
)
|| Topic <- Topics, PublishedAt <- Timestamps
],
%% Iterate through individual topics:
[
begin
{ok, It} = emqx_replay_local_store:make_iterator(?SHARD, {Topic, 0}),
Values = iterate(It),
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end
|| Topic <- Topics
],
ok.
%% Smoke test for iteration with wildcard topic filter
t_iterate_wildcard(_Config) ->
%% Prepare data:
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 10),
_ = [
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
),
?assertEqual(
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
),
ok.
t_iterate_long_tail_wildcard(_Config) ->
Topic = "b/c/d/e/f/g",
TopicFilter = "b/c/d/e/+/+",
Timestamps = lists:seq(1, 100),
_ = [
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)])
).
t_create_gen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
),
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz"],
Timestamps = lists:seq(1, 100),
[
?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>))
|| Topic <- Topics, PublishedAt <- Timestamps
].
t_iterate_multigen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 100),
_ = [
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
).
t_iterate_multigen_preserve_restore(_Config) ->
ReplayID = atom_to_binary(?FUNCTION_NAME),
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
Timestamps = lists:seq(1, 100),
TopicFilter = "foo/#",
TopicsMatching = ["foo/bar", "foo/bar/baz"],
_ = [
store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
|| Topic <- Topics, TS <- Timestamps
],
It0 = iterator(?SHARD, TopicFilter, 0),
{It1, Res10} = iterate(It0, 10),
% preserve mid-generation
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
{ok, It2} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID),
{It3, Res100} = iterate(It2, 88),
% preserve on the generation boundary
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
{ok, It4} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID),
{It5, Res200} = iterate(It4, 1000),
?assertEqual(none, It5),
?assertEqual(
lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
),
?assertEqual(
ok,
emqx_replay_local_store:discard_iterator(?SHARD, ReplayID)
),
?assertEqual(
{error, not_found},
emqx_replay_local_store:restore_iterator(?SHARD, ReplayID)
).
store(Shard, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) ->
iterate(iterator(DB, TopicFilter, StartTime)).
iterate(It) ->
case emqx_replay_local_store:next(It) of
{value, Payload, ItNext} ->
[Payload | iterate(ItNext)];
none ->
[]
end.
iterate(It, 0) ->
{It, []};
iterate(It, N) ->
case emqx_replay_local_store:next(It) of
{value, Payload, ItNext} ->
{ItFinal, Ps} = iterate(ItNext, N - 1),
{ItFinal, [Payload | Ps]};
none ->
{none, []}
end.
iterator(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
It.
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
Topic;
parse_topic(Topic) ->
emqx_topic:words(iolist_to_binary(Topic)).
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_replay),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_replay).
init_per_testcase(TC, Config) ->
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_replay_local_store_sup:start_shard(shard(TC)),
Config.
end_per_testcase(TC, _Config) ->
ok = emqx_replay_local_store_sup:stop_shard(shard(TC)).
shard(TC) ->
list_to_binary(lists:concat([?MODULE, "_", TC])).
set_shard_config(Shard, Config) ->
ok = application:set_env(emqx_replay, shard_config, #{Shard => Config}).


@ -0,0 +1,188 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_replay_message_storage_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("stdlib/include/assert.hrl").
-import(emqx_replay_message_storage, [
make_keymapper/1,
keymapper_info/1,
compute_topic_bitmask/2,
compute_time_bitmask/1,
compute_topic_seek/4
]).
all() -> emqx_common_test_helpers:all(?MODULE).
t_make_keymapper(_) ->
?assertMatch(
#{
source := [
{timestamp, 9, 23},
{hash, level, 2},
{hash, level, 4},
{hash, levels, 8},
{timestamp, 0, 9}
],
bitsize := 46,
epoch := 512
},
keymapper_info(
make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [2, 4, 8],
epoch => 1000
})
)
).
t_make_keymapper_single_hash_level(_) ->
?assertMatch(
#{
source := [
{timestamp, 0, 32},
{hash, levels, 16}
],
bitsize := 48,
epoch := 1
},
keymapper_info(
make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [16],
epoch => 1
})
)
).
t_make_keymapper_no_timestamp(_) ->
?assertMatch(
#{
source := [
{hash, level, 4},
{hash, level, 8},
{hash, levels, 16}
],
bitsize := 28,
epoch := 1
},
keymapper_info(
make_keymapper(#{
timestamp_bits => 0,
topic_bits_per_level => [4, 8, 16],
epoch => 42
})
)
).
t_compute_topic_bitmask(_) ->
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
?assertEqual(
2#111_1111_11111_11,
compute_topic_bitmask([<<"foo">>, <<"bar">>], KM)
),
?assertEqual(
2#111_0000_11111_11,
compute_topic_bitmask([<<"foo">>, '+'], KM)
),
?assertEqual(
2#111_0000_00000_11,
compute_topic_bitmask([<<"foo">>, '+', '+'], KM)
),
?assertEqual(
2#111_0000_11111_00,
compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM)
).
t_compute_topic_bitmask_wildcard(_) ->
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
?assertEqual(
2#000_0000_00000_00,
compute_topic_bitmask(['#'], KM)
),
?assertEqual(
2#111_0000_00000_00,
compute_topic_bitmask([<<"foo">>, '#'], KM)
),
?assertEqual(
2#111_1111_11111_00,
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM)
).
t_compute_topic_bitmask_wildcard_long_tail(_) ->
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
?assertEqual(
2#111_1111_11111_11,
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM)
),
?assertEqual(
2#111_1111_11111_00,
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM)
).
t_compute_time_bitmask(_) ->
KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}),
?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)).
t_compute_time_bitmask_epoch_only(_) ->
KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}),
?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)).
%% Filter = |123|***|678|***|
%% Mask = |123|***|678|***|
%% Key1 = |123|011|108|121| Seek = 0 |123|011|678|000|
%% Key2 = |123|011|679|919| Seek = 0 |123|012|678|000|
%% Key3 = |123|999|679|001| Seek = 1 |123|000|678|000| eos
%% Key4 = |125|011|179|017| Seek = 1 |123|000|678|000| eos
t_compute_next_topic_seek(_) ->
KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}),
?assertMatch(
none,
compute_topic_seek(
16#FD_42_4242_043,
16#FD_42_4242_042,
16#FF_FF_FFFF_FFF,
KM
)
),
?assertMatch(
16#FD_11_0678_000,
compute_topic_seek(
16#FD_11_0108_121,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
),
?assertMatch(
16#FD_12_0678_000,
compute_topic_seek(
16#FD_11_0679_919,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
),
?assertMatch(
none,
compute_topic_seek(
16#FD_FF_0679_001,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
),
?assertMatch(
none,
compute_topic_seek(
16#FE_11_0179_017,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
).


@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
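%% @doc Simplified in-memory model of the message storage, used as a
%% reference implementation in property tests (a description inferred from
%% how `prop_replay_message_storage' compares iteration results against
%% this module).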
-module(emqx_replay_message_storage_shim).
-export([open/0]).
-export([close/1]).
-export([store/5]).
-export([iterate/2]).
-type topic() :: list(binary()).
-type time() :: integer().
-opaque t() :: ets:tid().
-spec open() -> t().
open() ->
ets:new(?MODULE, [ordered_set, {keypos, 1}]).
-spec close(t()) -> ok.
close(Tab) ->
true = ets:delete(Tab),
ok.
-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) ->
ok | {error, _TODO}.
store(Tab, MessageID, PublishedAt, Topic, Payload) ->
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
ok.
-spec iterate(t(), emqx_replay:replay()) ->
[binary()].
iterate(Tab, {TopicFilter, StartTime}) ->
ets:foldr(
fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
case emqx_topic:match(Topic, TopicFilter) of
true when PublishedAt >= StartTime ->
[Payload | Acc];
_ ->
Acc
end
end,
[],
Tab
).


@ -0,0 +1,377 @@
%% @doc This module provides lazy, composable producer streams that
%% can be considered counterparts to Archiver's consumer pipes and
%% therefore can facilitate testing
%%
%% Also it comes with an implementation of a binary data stream which is
%% able to produce sufficiently large amounts of plausibly pseudorandom
%% binary payload in a deterministic way. It also contains routines to
%% check binary blobs via sampling.
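%% Minimal usage sketch (illustrative):
%%   Stream = payload_gen:generate_chunk({_Seed = 42, _Size = 1024}, 256),
%%   Chunks = payload_gen:consume(Stream).
%% `Chunks' is then a list of `{Binary, ChunkNum, ChunkCnt}' tuples
%% covering the 1024-byte payload in 256-byte chunks.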
-module(payload_gen).
-define(end_of_stream, []).
-dialyzer(no_improper_lists).
%% Generic stream API:
-export([
interleave_streams/1,
retransmits/2,
next/1,
consume/2,
consume/1
]).
%% Binary payload generator API:
-export([
interleave_chunks/2,
interleave_chunks/1,
mb/1,
generator_fun/2,
generate_chunks/3,
generate_chunk/2,
check_consistency/3,
check_file_consistency/3,
get_byte/2
]).
%% List to stream generator API:
-export([list_to_stream/1]).
%% Proper generators:
-export([
binary_stream_gen/1,
interleaved_streams_gen/1,
interleaved_binary_gen/1,
interleaved_list_gen/1
]).
-export_type([payload/0, binary_payload/0]).
-define(hash_size, 16).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-type payload() :: {Seed :: term(), Size :: integer()}.
-type binary_payload() :: {
binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
}.
%% For performance reasons we treat regular lists as streams, see `next/1'
-opaque cont(Data) ::
fun(() -> stream(Data))
| stream(Data).
-type stream(Data) ::
maybe_improper_list(Data, cont(Data))
| ?end_of_stream.
-type tagged_binstream() :: stream({Tag :: term(), Payload :: chunk_state()}).
-record(chunk_state, {
seed :: term(),
payload_size :: non_neg_integer(),
offset :: non_neg_integer(),
chunk_size :: non_neg_integer()
}).
-opaque chunk_state() :: #chunk_state{}.
-record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).
-opaque interleave_state() :: #interleave_state{}.
%% =============================================================================
%% API functions
%% =============================================================================
%% -----------------------------------------------------------------------------
%% Proper generators
%% -----------------------------------------------------------------------------
%% @doc Proper generator that creates a binary stream
-spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
?LET(
{Seed, Size},
{nat(), range(1, 16#100000)},
generate_chunk({Seed, Size}, ChunkSize)
).
%% @equiv interleaved_streams_gen(10, Type)
-spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
interleaved_streams_gen(Type) ->
interleaved_streams_gen(10, Type).
%% @doc Proper generator that creates a term of type
%% ```[{_Tag :: binary(), stream()}]''' that is ready to be fed
%% into the `interleave_streams/1' function
-spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
proper_types:type().
interleaved_streams_gen(MaxNStreams, StreamType) ->
?LET(
NStreams,
range(1, MaxNStreams),
?LET(
Streams,
vector(NStreams, StreamType),
begin
Tags = [<<I/integer>> || I <- lists:seq(1, length(Streams))],
lists:zip(Tags, Streams)
end
)
).
-spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
interleaved_binary_gen(ChunkSize) ->
interleaved_streams_gen(binary_stream_gen(ChunkSize)).
-spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
interleaved_list_gen(Type) ->
interleaved_streams_gen(non_empty(list(Type))).
%% -----------------------------------------------------------------------------
%% Generic streams
%% -----------------------------------------------------------------------------
%% @doc Consume one element from the stream.
-spec next(cont(A)) -> stream(A).
next(Fun) when is_function(Fun, 0) ->
Fun();
next(L) ->
L.
%% @doc Take a list of tagged streams and return a stream where
%% elements of the streams are tagged and randomly interleaved.
%%
%% Note: this function is more or less generic and is compatible with this
%% module's `generate_chunks' function family, as well as `ets:next',
%% lists, and the like.
%%
%% Consider using simplified versions of this function
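%% Example (illustrative): consume(interleave_streams([{a, [1, 2]}, {b, [x]}]))
%% may yield [{a, 1}, {b, x}, {a, 2}] or any other interleaving that keeps
%% the per-stream element order.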
-spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
interleave_streams(Streams) ->
do_interleave_streams(
#interleave_state{streams = Streams}
).
%% @doc Take an arbitrary stream and add repetitions of the elements
%% TODO: Make retransmissions of arbitrary length
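%% Example (illustrative): consume(retransmits([1, 2, 3], 1.0)) yields
%% [1, 1, 2, 2, 3, 3], since with probability 1 every element is repeated.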
-spec retransmits(stream(Data), float()) -> stream(Data).
retransmits(Stream, Probability) ->
case Stream of
[Data | Cont0] ->
Cont = fun() -> retransmits(next(Cont0), Probability) end,
case rand:uniform() < Probability of
true -> [Data, Data | Cont];
false -> [Data | Cont]
end;
?end_of_stream ->
?end_of_stream
end.
%% @doc Consume all elements of the stream and feed them into a
%% callback (e.g. brod:produce)
-spec consume(
stream(A),
fun((A) -> Ret)
) -> [Ret].
consume(Stream, Callback) ->
case Stream of
[Data | Cont] -> [Callback(Data) | consume(next(Cont), Callback)];
?end_of_stream -> []
end.
%% @equiv consume(Stream, fun(A) -> A end)
-spec consume(stream(A)) -> [A].
consume(Stream) ->
consume(Stream, fun(A) -> A end).
%% -----------------------------------------------------------------------------
%% Misc functions
%% -----------------------------------------------------------------------------
%% @doc Return number of bytes in `N' megabytes
-spec mb(integer()) -> integer().
mb(N) ->
N * 1048576.
%% -----------------------------------------------------------------------------
%% List streams
%% -----------------------------------------------------------------------------
-spec list_to_stream([A]) -> stream(A).
list_to_stream(L) -> L.
%% -----------------------------------------------------------------------------
%% Binary streams
%% -----------------------------------------------------------------------------
%% @doc First argument is a chunk number, the second one is a seed.
%% This implementation is hardly efficient, but it was chosen for
%% clarity reasons
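%% Example: generator_fun(0, <<"seed">>) is the 16-byte MD5 digest of
%% <<0:32, "seed">>.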
-spec generator_fun(integer(), binary()) -> binary().
generator_fun(N, Seed) ->
crypto:hash(md5, <<N:32, Seed/binary>>).
%% @doc Get byte at offset `N'
-spec get_byte(integer(), term()) -> byte().
get_byte(N, Seed) ->
do_get_byte(N, seed_hash(Seed)).
%% @doc Stream of binary chunks. Limitation: both the payload size and
%% `ChunkSize' should be divisible by `?hash_size'
-spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
generate_chunk({Seed, Size}, ChunkSize) when
ChunkSize rem ?hash_size =:= 0
->
State = #chunk_state{
seed = Seed,
payload_size = Size,
chunk_size = ChunkSize,
offset = 0
},
generate_chunk(State).
%% @doc Take a list of `payload()'s with per-payload chunk sizes and start
%% producing the payloads in random order. Seed is used as a tag.
%% @see interleave_streams/1
-spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
tagged_binstream().
interleave_chunks(Streams0) ->
Streams = [
{Tag, generate_chunk(Payload, ChunkSize)}
|| {Payload = {Tag, _}, ChunkSize} <- Streams0
],
interleave_streams(Streams).
%% @doc Take a list of `payload()'s and a chunk size, and start producing
%% the payloads in a random order. Seed is used as a tag. All streams use
%% the same chunk size.
%% @see interleave_streams/1
-spec interleave_chunks(
[payload()],
non_neg_integer()
) -> tagged_binstream().
interleave_chunks(Streams0, ChunkSize) ->
Streams = [
{Seed, generate_chunk({Seed, Size}, ChunkSize)}
|| {Seed, Size} <- Streams0
],
interleave_streams(Streams).
%% @doc Generate chunks of data and feed them into
%% `Callback'
-spec generate_chunks(
payload(),
integer(),
fun((binary()) -> A)
) -> [A].
generate_chunks(Payload, ChunkSize, Callback) ->
consume(generate_chunk(Payload, ChunkSize), Callback).
-spec check_consistency(
payload(),
integer(),
fun((integer()) -> {ok, binary()} | undefined)
) -> ok.
check_consistency({Seed, Size}, SampleSize, Callback) ->
SeedHash = seed_hash(Seed),
Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
%% Always check first and last bytes, and one that should not exist:
Samples = [0, Size - 1, Size | Random],
lists:foreach(
fun
(N) when N < Size ->
Expected = do_get_byte(N, SeedHash),
?assertEqual(
{N, {ok, Expected}},
{N, Callback(N)}
);
(N) ->
?assertMatch(undefined, Callback(N))
end,
Samples
).
-spec check_file_consistency(
payload(),
integer(),
file:filename()
) -> ok.
check_file_consistency(Payload, SampleSize, FileName) ->
{ok, FD} = file:open(FileName, [read, raw]),
try
Fun = fun(N) ->
case file:pread(FD, [{N, 1}]) of
{ok, [[X]]} -> {ok, X};
{ok, [eof]} -> undefined
end
end,
check_consistency(Payload, SampleSize, Fun)
after
file:close(FD)
end.
%% =============================================================================
%% Internal functions
%% =============================================================================
-spec do_interleave_streams(interleave_state()) -> stream(_Data).
do_interleave_streams(#interleave_state{streams = []}) ->
?end_of_stream;
do_interleave_streams(#interleave_state{streams = Streams} = State0) ->
%% Not the most efficient implementation (lots of avoidable list
%% traversals), but we don't expect the number of streams to be the
%% bottleneck
N = rand:uniform(length(Streams)),
{Hd, [{Tag, SC} | Tl]} = lists:split(N - 1, Streams),
case SC of
[Payload | SC1] ->
State = State0#interleave_state{streams = Hd ++ [{Tag, next(SC1)} | Tl]},
Cont = fun() -> do_interleave_streams(State) end,
[{Tag, Payload} | Cont];
?end_of_stream ->
State = State0#interleave_state{streams = Hd ++ Tl},
do_interleave_streams(State)
end.
%% @doc Continue generating chunks
-spec generate_chunk(chunk_state()) -> stream(binary()).
generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
Offset >= Size
->
?end_of_stream;
generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
State = State0#chunk_state{offset = Offset + ChunkSize},
Payload = generate_chunk(
State#chunk_state.seed,
Offset,
ChunkSize,
State#chunk_state.payload_size
),
[Payload | fun() -> generate_chunk(State) end].
generate_chunk(Seed, Offset, ChunkSize, Size) ->
SeedHash = seed_hash(Seed),
To = min(Offset + ChunkSize, Size) - 1,
Payload = iolist_to_binary([
generator_fun(I, SeedHash)
|| I <- lists:seq(Offset div 16, To div 16)
]),
ChunkNum = Offset div ChunkSize + 1,
ChunkCnt = ceil(Size / ChunkSize),
{Payload, ChunkNum, ChunkCnt}.
%% @doc Hash any term
-spec seed_hash(term()) -> binary().
seed_hash(Seed) ->
crypto:hash(md5, term_to_binary(Seed)).
%% @private Get byte at offset `N'
-spec do_get_byte(integer(), binary()) -> byte().
do_get_byte(N, Seed) ->
Chunk = generator_fun(N div ?hash_size, Seed),
binary:at(Chunk, N rem ?hash_size).


@ -0,0 +1,464 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(prop_replay_message_storage).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(WORK_DIR, ["_build", "test"]).
-define(RUN_ID, {?MODULE, testrun_id}).
-define(ZONE, ?MODULE).
-define(GEN_ID, 42).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_bitstring_computes() ->
?FORALL(
Keymapper,
keymapper(),
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper)))
end)
).
prop_topic_bitmask_computes() ->
Keymapper = make_keymapper(16, [8, 12, 16], 100),
?FORALL(TopicFilter, topic_filter(), begin
Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper),
% topic bits + timestamp LSBs
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
end).
prop_next_seek_monotonic() ->
?FORALL(
{TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()},
begin
Filter = emqx_replay_message_storage:make_keyspace_filter(
{TopicFilter, StartTime},
Keymapper
),
?FORALL(
Bitstring,
bitstr(get_keymapper_bitsize(Keymapper)),
emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring
)
end
).
prop_next_seek_eq_initial_seek() ->
?FORALL(
Filter,
keyspace_filter(),
emqx_replay_message_storage:compute_initial_seek(Filter) =:=
emqx_replay_message_storage:compute_next_seek(0, Filter)
).
prop_iterate_messages() ->
TBPL = [4, 8, 12],
Options = #{
timestamp_bits => 32,
topic_bits_per_level => TBPL,
epoch => 200
},
% TODO
% Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dir.
?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
{DB, Handle} = open_db(Filepath, Options),
Shim = emqx_replay_message_storage_shim:open(),
ok = store_db(DB, Stream),
ok = store_shim(Shim, Stream),
?FORALL(
{
{Topic, _},
Pattern,
StartTime
},
{
nth(Stream),
topic_filter_pattern(),
start_time()
},
begin
TopicFilter = make_topic_filter(Pattern, Topic),
Iteration = {TopicFilter, StartTime},
Messages = iterate_db(DB, Iteration),
Reference = iterate_shim(Shim, Iteration),
ok = close_db(Handle),
ok = emqx_replay_message_storage_shim:close(Shim),
?WHENFAIL(
begin
io:format(user, " *** Filepath = ~s~n", [Filepath]),
io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]),
io:format(user, " *** StartTime = ~p~n", [StartTime])
end,
is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages)
)
end
)
end).
prop_iterate_eq_iterate_with_preserve_restore() ->
TBPL = [4, 8, 16, 12],
Options = #{
timestamp_bits => 32,
topic_bits_per_level => TBPL,
epoch => 500
},
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
% TODO
% This proptest is impure because messages from testruns that are assumed
% to be independent of each other accumulate in the same storage. This
% would probably confuse the shrinker in the event a testrun fails.
ok = store_db(DB, Stream),
?FORALL(
{
{Topic, _},
Pat,
StartTime,
Commands
},
{
nth(Stream),
topic_filter_pattern(),
start_time(),
shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
},
begin
Replay = {make_topic_filter(Pat, Topic), StartTime},
Iterator = make_iterator(DB, Replay),
Ctx = #{db => DB, replay => Replay},
Messages = run_iterator_commands(Commands, Iterator, Ctx),
equals(Messages, iterate_db(DB, Replay))
end
)
end).
prop_iterate_eq_iterate_with_refresh() ->
TBPL = [4, 8, 16, 12],
Options = #{
timestamp_bits => 32,
topic_bits_per_level => TBPL,
epoch => 500
},
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
% TODO
% This proptest is also impure, see above.
ok = store_db(DB, Stream),
?FORALL(
{
{Topic, _},
Pat,
StartTime,
RefreshEvery
},
{
nth(Stream),
topic_filter_pattern(),
start_time(),
pos_integer()
},
?TIMEOUT(5000, begin
Replay = {make_topic_filter(Pat, Topic), StartTime},
IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
Iterator = make_iterator(DB, Replay, IterationOptions),
Messages = iterate_db(Iterator),
equals(Messages, iterate_db(DB, Replay))
end)
)
end).
% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
% MessageID = emqx_guid:gen(),
% PublishedAt = ChunkNum,
% MessageID, PublishedAt, Topic
% ]),
% ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload),
% store_message_stream(DB, payload_gen:next(Rest));
% store_message_stream(_Zone, []) ->
% ok.
store_db(DB, Messages) ->
lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload),
emqx_replay_message_storage:store(DB, MessageID, Timestamp, Topic, Bin)
end,
Messages
).
iterate_db(DB, Iteration) ->
iterate_db(make_iterator(DB, Iteration)).
iterate_db(It) ->
case emqx_replay_message_storage:next(It) of
{value, Payload, ItNext} ->
[binary_to_term(Payload) | iterate_db(ItNext)];
none ->
[]
end.
make_iterator(DB, Replay) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay),
It.
make_iterator(DB, Replay, Options) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay, Options),
It.
run_iterator_commands([iterate | Rest], It, Ctx) ->
case emqx_replay_message_storage:next(It) of
{value, Payload, ItNext} ->
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)];
none ->
[]
end;
run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
#{
db := DB,
replay := Replay
} = Ctx,
Serial = emqx_replay_message_storage:preserve_iterator(It),
{ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Replay, Serial),
run_iterator_commands(Rest, ItNext, Ctx);
run_iterator_commands([], It, _Ctx) ->
iterate_db(It).
store_shim(Shim, Messages) ->
lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload),
emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
end,
Messages
).
iterate_shim(Shim, Iteration) ->
lists:map(
fun binary_to_term/1,
emqx_replay_message_storage_shim:iterate(Shim, Iteration)
).
%%--------------------------------------------------------------------
%% Setup / teardown
%%--------------------------------------------------------------------
open_db(Filepath, Options) ->
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
{Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options),
DB = emqx_replay_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
{DB, Handle}.
close_db(Handle) ->
rocksdb:close(Handle).
make_filepath(TC) ->
make_filepath(TC, 0).
make_filepath(TC, InstID) ->
Name = io_lib:format("~0p.~0p", [TC, InstID]),
Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]),
ok = filelib:ensure_dir(Path),
Path.
get_run_id() ->
case persistent_term:get(?RUN_ID, undefined) of
RunID when RunID /= undefined ->
RunID;
undefined ->
RunID = make_run_id(),
ok = persistent_term:put(?RUN_ID, RunID),
RunID
end.
make_run_id() ->
calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]).
%%--------------------------------------------------------------------
%% Type generators
%%--------------------------------------------------------------------
topic() ->
non_empty(list(topic_level())).
topic(EntropyWeights) ->
?LET(L, scaled(1 / 4, list(1)), begin
EWs = lists:sublist(EntropyWeights ++ L, length(L)),
?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs])
end).
topic_filter() ->
?SUCHTHAT(
L,
non_empty(
list(
frequency([
{5, topic_level()},
{2, '+'},
{1, '#'}
])
)
),
not lists:member('#', L) orelse lists:last(L) == '#'
).
topic_level_pattern() ->
frequency([
{5, level},
{2, '+'},
{1, '#'}
]).
topic_filter_pattern() ->
list(topic_level_pattern()).
topic_filter(Topic) ->
?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)).
make_topic_filter([], _) ->
[];
make_topic_filter(_, []) ->
[];
make_topic_filter(['#' | _], _) ->
['#'];
make_topic_filter(['+' | Rest], [_ | Levels]) ->
['+' | make_topic_filter(Rest, Levels)];
make_topic_filter([level | Rest], [L | Levels]) ->
[L | make_topic_filter(Rest, Levels)].
% topic() ->
% ?LAZY(?SIZED(S, frequency([
% {S, [topic_level() | topic()]},
% {1, []}
% ]))).
% topic_filter() ->
% ?LAZY(?SIZED(S, frequency([
% {round(S / 3 * 2), [topic_level() | topic_filter()]},
% {round(S / 3 * 1), ['+' | topic_filter()]},
% {1, []},
% {1, ['#']}
% ]))).
topic_level() ->
?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)).
topic_level(Entropy) ->
S = floor(1 + math:log2(Entropy) / 4),
?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
topic_level_fixed() ->
oneof([
<<"foo">>,
<<"bar">>,
<<"baz">>,
<<"xyzzy">>
]).
keymapper() ->
?LET(
{TimestampBits, TopicBits, Epoch},
{
range(0, 128),
non_empty(list(range(1, 32))),
pos_integer()
},
make_keymapper(TimestampBits, TopicBits, Epoch * 100)
).
keyspace_filter() ->
?LET(
{TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()},
emqx_replay_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
).
messages(Topic) ->
?LET(
Ts,
list(Topic),
interleaved(
?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages))
)
).
message() ->
?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}).
message_streams(Topic) ->
?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]).
timestamp() ->
scaled(20, pos_integer()).
start_time() ->
scaled(10, pos_integer()).
bitstr(Size) ->
?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)).
nth(L) ->
?LET(I, range(1, length(L)), lists:nth(I, L)).
scaled(Factor, T) ->
?SIZED(S, resize(ceil(S * Factor), T)).
interleaved(T) ->
?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))).
shuffled(T) ->
?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))).
flat(T) ->
?LET(L, T, lists:flatten(L)).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
emqx_replay_message_storage:make_keymapper(#{
timestamp_bits => TimestampBits,
topic_bits_per_level => TopicBits,
epoch => MaxEpoch
}).
get_keymapper_bitsize(Keymapper) ->
maps:get(bitsize, emqx_replay_message_storage:keymapper_info(Keymapper)).
-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
interleave(Seqs, Rng) ->
interleave(Seqs, length(Seqs), Rng).
interleave(Seqs, L, Rng) when L > 0 ->
{N, RngNext} = rand:uniform_s(L, Rng),
{SeqHead, SeqTail} = lists:split(N - 1, Seqs),
case SeqTail of
[{Tag, [M | Rest]} | SeqRest] ->
[{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)];
[{_, []} | SeqRest] ->
interleave(SeqHead ++ SeqRest, L - 1, RngNext)
end;
interleave([], 0, _) ->
[].
-spec shuffle(list(E), rand:state()) -> list(E).
shuffle(L, Rng) ->
{Rands, _} = randoms(length(L), Rng),
[E || {_, E} <- lists:sort(lists:zip(Rands, L))].
randoms(N, Rng) when N > 0 ->
{Rand, RngNext} = rand:uniform_s(Rng),
{Tail, RngFinal} = randoms(N - 1, RngNext),
{[Rand | Tail], RngFinal};
randoms(_, Rng) ->
{[], Rng}.


@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
- {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true},
+ {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.1", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},


@ -61,7 +61,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
- , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}}
+ , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}


@ -1,4 +1,4 @@
-#!/usr/bin/env elixir
+#! /usr/bin/env elixir
defmodule CheckElixirApplications do
alias EMQXUmbrella.MixProject


@ -1,4 +1,4 @@
-#!/usr/bin/env elixir
+#! /usr/bin/env elixir
# ensure we have a fresh rebar.lock


@ -1,4 +1,4 @@
-#!/usr/bin/env elixir
+#! /usr/bin/env elixir
defmodule CheckElixirEMQXMachineBootDiscrepancies do
alias EMQXUmbrella.MixProject