diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 4ad6049f3..3d6ab6c37 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -18,6 +18,7 @@ /apps/emqx_rule_engine/ @emqx/emqx-review-board @kjellwinblad /apps/emqx_slow_subs/ @emqx/emqx-review-board @lafirest /apps/emqx_statsd/ @emqx/emqx-review-board @JimMoen +/apps/emqx_durable_storage/ @ieQu1 ## CI /deploy/ @emqx/emqx-review-board @Rory-Z diff --git a/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl b/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl index d036947c6..eeb1fbbcd 100644 --- a/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl @@ -26,7 +26,7 @@ -include("bpapi.hrl"). %%================================================================================ -%% behavior callbacks +%% behaviour callbacks %%================================================================================ introduced_in() -> diff --git a/apps/emqx_durable_storage/BSL.txt b/apps/emqx_durable_storage/BSL.txt new file mode 100644 index 000000000..2374e6ce2 --- /dev/null +++ b/apps/emqx_durable_storage/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-06-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. 
+“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). 
+ +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_durable_storage/IMPLEMENTATION.md b/apps/emqx_durable_storage/IMPLEMENTATION.md new file mode 100644 index 000000000..4c78b8cc8 --- /dev/null +++ b/apps/emqx_durable_storage/IMPLEMENTATION.md @@ -0,0 +1,75 @@ +# General concepts + +In the logic layer we don't speak about replication. +This is because we could use an external DB with its own replication logic. + +On the other hand, we introduce notion of shard right here at the logic. 
+This is because shared subscription logic needs to be aware of it to some extent, as it has to split work between subscribers somehow. + +# Tables + +## Message storage + +Data is written every time a message matching a certain pattern is published. +This pattern is not part of the logic layer spec. + +Write throughput: very high +Data size: very high +Write pattern: append only +Read pattern: pseudoserial + +Number of records: O(total write throughput * retention time) + +## Session storage + +Data there is updated when: + +- A new client connects with clean session = false +- Client subscribes to a topic +- Client unsubscribes from a topic +- Garbage collection is performed + +Write throughput: low + +Data is read when a client connects and replay agents are started + +Read throughput: low + +Data format: + +`#session{clientId = "foobar", iterators = [ItKey1, ItKey2, ItKey3, ...]}` + +Number of records: O(N clients) +Size of record: O(N subscriptions per client) + +## Iterator storage + +Data is written every time a client acks a message. +Data is read when a client reconnects and we restart replay agents. + +`#iterator{key = IterKey, data = Blob}` + +Number of records: O(N clients * N subscriptions per client) +Size of record: O(1) +Write throughput: high, lots of small updates +Write pattern: mostly key overwrite +Read throughput: low +Read pattern: random + +# Push vs. Pull model + +In push model we have replay agents iterating over the dataset in the shards. + +In pull model the client processes work with iterators. 
+ +## Push pros: +- Lower latency: message can be dispatched to the client as soon as it's persisted +- Less worry about buffering + +## Push cons: +- Need pushback logic +- It's not entirely justified when working with external DB that may not provide streaming API + +## Pull pros: +- No need for pushback: client advances iterators at its own tempo +- diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md new file mode 100644 index 000000000..7de43bee0 --- /dev/null +++ b/apps/emqx_durable_storage/README.md @@ -0,0 +1,37 @@ +# EMQX Replay + +`emqx_ds` is a durable storage for MQTT messages within EMQX. +It implements the following scenarios: +- Persisting messages published by clients +- + +> 0. App overview introduction +> 1. let people know what your project can do specifically. Is it a base +> library dependency, or what kind of functionality is provided to the user? +> 2. Provide context and add a link to any reference visitors might be +> unfamiliar with. +> 3. Design details, implementation technology architecture, Roadmap, etc. + +# [Features] - [Optional] +> A List of features your application provided. If the feature is quite simple, just +> list in the previous section. + +# Limitation +TBD + +# Documentation links +TBD + +# Usage +TBD + +# Configurations +TBD + +# HTTP APIs + +# Other +TBD + +# Contributing +Please see our [contributing.md](../../CONTRIBUTING.md). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl new file mode 100644 index 000000000..230ca3f9f --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -0,0 +1,190 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds). + +%% API: +%% Messages: +-export([message_store/2, message_store/1, message_stats/0]). +%% Iterator: +-export([iterator_update/2, iterator_next/1, iterator_stats/0]). +%% Session: +-export([ + session_open/1, + session_drop/1, + session_suspend/1, + session_add_iterator/2, + session_del_iterator/2, + session_stats/0 +]). + +%% internal exports: +-export([]). + +-export_type([ + message_id/0, + message_stats/0, + message_store_opts/0, + session_id/0, + iterator_id/0, + iterator/0, + shard/0, + topic/0, + time/0 +]). + +-include("emqx_ds_int.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type session_id() :: emqx_types:clientid(). + +-type iterator() :: term(). + +-opaque iterator_id() :: binary(). + +%%-type session() :: #session{}. + +-type message_store_opts() :: #{}. + +-type message_stats() :: #{}. + +-type message_id() :: binary(). + +%% Parsed topic: +-type topic() :: list(binary()). + +-type shard() :: binary(). + +%% Timestamp +%% Earliest possible timestamp is 0. +%% TODO granularity? +-type time() :: non_neg_integer(). 
+ +%%================================================================================ +%% API funcions +%%================================================================================ + +%%-------------------------------------------------------------------------------- +%% Message +%%-------------------------------------------------------------------------------- +-spec message_store([emqx_types:message()], message_store_opts()) -> + {ok, [message_id()]} | {error, _}. +message_store(_Msg, _Opts) -> + %% TODO + {error, not_implemented}. + +-spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. +message_store(Msg) -> + %% TODO + message_store(Msg, #{}). + +-spec message_stats() -> message_stats(). +message_stats() -> + #{}. + +%%-------------------------------------------------------------------------------- +%% Session +%%-------------------------------------------------------------------------------- + +%% @doc Called when a client connects. This function looks up a +%% session or creates a new one if previous one couldn't be found. +%% +%% This function also spawns replay agents for each iterator. +%% +%% Note: session API doesn't handle session takeovers, it's the job of +%% the broker. +-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}. +session_open(ClientID) -> + {atomic, Ret} = + mria:transaction( + ?DS_SHARD, + fun() -> + case mnesia:read(?SESSION_TAB, ClientID) of + [#session{iterators = Iterators}] -> + {false, ClientID, Iterators}; + [] -> + Session = #session{id = ClientID, iterators = []}, + mnesia:write(?SESSION_TAB, Session, write), + {true, ClientID, []} + end + end + ), + Ret. + +%% @doc Called when a client reconnects with `clean session=true' or +%% during session GC +-spec session_drop(emqx_types:clientid()) -> ok. +session_drop(ClientID) -> + {atomic, ok} = mria:transaction( + ?DS_SHARD, + fun() -> + mnesia:delete({?SESSION_TAB, ClientID}) + end + ), + ok. 
+ +%% @doc Called when a client disconnects. This function terminates all +%% active processes related to the session. +-spec session_suspend(session_id()) -> ok | {error, session_not_found}. +session_suspend(_SessionId) -> + %% TODO + ok. + +%% @doc Called when a client subscribes to a topic. Idempotent. +-spec session_add_iterator(session_id(), emqx_topic:words()) -> + {ok, iterator_id()} | {error, session_not_found}. +session_add_iterator(_SessionId, _TopicFilter) -> + %% TODO + {ok, <<"">>}. + +%% @doc Called when a client unsubscribes from a topic. Returns `true' +%% if the session contained the subscription or `false' if it wasn't +%% subscribed. +-spec session_del_iterator(session_id(), emqx_topic:words()) -> + {ok, boolean()} | {error, session_not_found}. +session_del_iterator(_SessionId, _TopicFilter) -> + %% TODO + {ok, false}. + +-spec session_stats() -> #{}. +session_stats() -> + #{}. + +%%-------------------------------------------------------------------------------- +%% Iterator (pull API) +%%-------------------------------------------------------------------------------- + +%% @doc Called when a client acks a message +-spec iterator_update(iterator_id(), iterator()) -> ok. +iterator_update(_IterId, _Iter) -> + %% TODO + ok. + +%% @doc Called when a client acks a message +-spec iterator_next(iterator()) -> {value, emqx_types:message(), iterator()} | none | {error, _}. +iterator_next(_Iter) -> + %% TODO + none. + +-spec iterator_stats() -> #{}. +iterator_stats() -> + #{}. 
+ +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl new file mode 100644 index 000000000..fb4d487e9 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -0,0 +1,25 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_app). + +-export([start/2]). + +-include("emqx_ds_int.hrl"). + +start(_Type, _Args) -> + init_mnesia(), + emqx_ds_sup:start_link(). + +init_mnesia() -> + ok = mria:create_table( + ?SESSION_TAB, + [ + {rlog_shard, ?DS_SHARD}, + {type, set}, + {storage, rocksdb_copies}, + {record_name, session}, + {attributes, record_info(fields, session)} + ] + ). diff --git a/apps/emqx_durable_storage/src/emqx_ds_conf.erl b/apps/emqx_durable_storage/src/emqx_ds_conf.erl new file mode 100644 index 000000000..db8b14b45 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_conf.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_conf). + +%% TODO: make a proper HOCON schema and all... + +%% API: +-export([shard_config/1, db_options/0]). + +-export([shard_iteration_options/1]). +-export([default_iteration_options/0]). + +-type backend_config() :: + {emqx_ds_message_storage_bitmask, emqx_ds_message_storage_bitmask:options()} + | {module(), _Options}. + +-export_type([backend_config/0]). 
+ +%%================================================================================ +%% API funcions +%%================================================================================ + +-define(APP, emqx_ds). + +-spec shard_config(emqx_ds:shard()) -> backend_config(). +shard_config(Shard) -> + DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()), + Shards = application:get_env(?APP, shard_config, #{}), + maps:get(Shard, Shards, DefaultShardConfig). + +-spec shard_iteration_options(emqx_ds:shard()) -> + emqx_ds_message_storage_bitmask:iteration_options(). +shard_iteration_options(Shard) -> + case shard_config(Shard) of + {emqx_ds_message_storage_bitmask, Config} -> + maps:get(iteration, Config, default_iteration_options()); + {_Module, _} -> + default_iteration_options() + end. + +-spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options(). +default_iteration_options() -> + {emqx_ds_message_storage_bitmask, Config} = default_shard_config(), + maps:get(iteration, Config). + +-spec default_shard_config() -> backend_config(). +default_shard_config() -> + {emqx_ds_message_storage_bitmask, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 100} + } + }}. + +-spec db_options() -> emqx_ds_storage_layer:db_options(). +db_options() -> + application:get_env(?APP, db_options, []). diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl new file mode 100644 index 000000000..96688ede6 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -0,0 +1,27 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_HRL). +-define(EMQX_DS_HRL, true). + +-define(SESSION_TAB, emqx_ds_session). +-define(DS_SHARD, emqx_ds_shard). + +-record(session, { + id :: emqx_ds:session_id(), + iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}] +}). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl new file mode 100644 index 000000000..5bb0423d5 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -0,0 +1,731 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_message_storage_bitmask). + +%%================================================================================ +%% @doc Description of the schema +%% +%% Let us assume that `T' is a topic and `t' is time. These are the two +%% dimensions used to index messages. They can be viewed as +%% "coordinates" of an MQTT message in a 2D space. +%% +%% Oftentimes, when wildcard subscription is used, keys must be +%% scanned in both dimensions simultaneously. +%% +%% Rocksdb allows to iterate over sorted keys very fast. 
This means we +%% need to map our two-dimentional keys to a single index that is +%% sorted in a way that helps to iterate over both time and topic +%% without having to do a lot of random seeks. +%% +%% == Mapping of 2D keys to rocksdb keys == +%% +%% We use "zigzag" pattern to store messages, where rocksdb key is +%% composed like like this: +%% +%% |ttttt|TTTTTTTTT|tttt| +%% ^ ^ ^ +%% | | | +%% +-------+ | +---------+ +%% | | | +%% most significant topic hash least significant +%% bits of timestamp bits of timestamp +%% (a.k.a epoch) (a.k.a time offset) +%% +%% Topic hash is level-aware: each topic level is hashed separately +%% and the resulting hashes are bitwise-concatentated. This allows us +%% to map topics to fixed-length bitstrings while keeping some degree +%% of information about the hierarchy. +%% +%% Next important concept is what we call "epoch". Duration of the +%% epoch is determined by maximum time offset. Epoch is calculated by +%% shifting bits of the timestamp right. +%% +%% The resulting index is a space-filling curve that looks like +%% this in the topic-time 2D space: +%% +%% T ^ ---->------ |---->------ |---->------ +%% | --/ / --/ / --/ +%% | -<-/ | -<-/ | -<-/ +%% | -/ | -/ | -/ +%% | ---->------ | ---->------ | ---->------ +%% | --/ / --/ / --/ +%% | ---/ | ---/ | ---/ +%% | -/ ^ -/ ^ -/ +%% | ---->------ | ---->------ | ---->------ +%% | --/ / --/ / --/ +%% | -<-/ | -<-/ | -<-/ +%% | -/ | -/ | -/ +%% | ---->------| ---->------| ----------> +%% | +%% -+------------+-----------------------------> t +%% epoch +%% +%% This structure allows to quickly seek to a the first message that +%% was recorded in a certain epoch in a certain topic or a +%% group of topics matching filter like `foo/bar/#`. +%% +%% Due to its structure, for each pair of rocksdb keys K1 and K2, such +%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) > +%% timestamp(K2). +%% That is, replay doesn't reorder messages published in each +%% individual topic. 
+%% +%% This property doesn't hold between different topics, but it's not deemed +%% a problem right now. +%% +%%================================================================================ + +%% API: +-export([create_new/3, open/5]). +-export([make_keymapper/1]). + +-export([store/5]). +-export([make_iterator/2]). +-export([make_iterator/3]). +-export([next/1]). + +-export([preserve_iterator/1]). +-export([restore_iterator/3]). +-export([refresh_iterator/1]). + +%% Debug/troubleshooting: +%% Keymappers +-export([ + keymapper_info/1, + compute_bitstring/3, + compute_topic_bitmask/2, + compute_time_bitmask/1, + hash/2 +]). + +%% Keyspace filters +-export([ + make_keyspace_filter/2, + compute_initial_seek/1, + compute_next_seek/2, + compute_time_seek/3, + compute_topic_seek/4 +]). + +-export_type([db/0, iterator/0, schema/0]). + +-export_type([options/0]). +-export_type([iteration_options/0]). + +-compile( + {inline, [ + bitwise_concat/3, + ones/1, + successor/1, + topic_hash_matches/3, + time_matches/3 + ]} +). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type topic() :: emqx_ds:topic(). +-type time() :: emqx_ds:time(). + +%% Number of bits +-type bits() :: non_neg_integer(). + +%% Key of a RocksDB record. +-type key() :: binary(). + +%% Distribution of entropy among topic levels. +%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits, +%% and _rest of levels_ (if any) get 16 bits. +-type bits_per_level() :: [bits(), ...]. + +-type options() :: #{ + %% Number of bits in a message timestamp. + timestamp_bits := bits(), + %% Number of bits in a key allocated to each level in a message topic. + topic_bits_per_level := bits_per_level(), + %% Maximum granularity of iteration over time. 
+ epoch := time(), + + iteration => iteration_options(), + + cf_options => emqx_ds_storage_layer:db_cf_options() +}. + +-type iteration_options() :: #{ + %% Request periodic iterator refresh. + %% This might be helpful during replays taking a lot of time (e.g. tens of seconds). + %% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not + %% the same as 1000 replayed messages. + iterator_refresh => {every, _NumOperations :: pos_integer()} +}. + +%% Persistent configuration of the generation, it is used to create db +%% record when the database is reopened +-record(schema, {keymapper :: keymapper()}). + +-opaque schema() :: #schema{}. + +-record(db, { + shard :: emqx_ds:shard(), + handle :: rocksdb:db_handle(), + cf :: rocksdb:cf_handle(), + keymapper :: keymapper(), + write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(), + read_options = [] :: emqx_ds_storage_layer:db_write_options() +}). + +-record(it, { + handle :: rocksdb:itr_handle(), + filter :: keyspace_filter(), + cursor :: binary() | undefined, + next_action :: {seek, binary()} | next, + refresh_counter :: {non_neg_integer(), pos_integer()} | undefined +}). + +-record(filter, { + keymapper :: keymapper(), + topic_filter :: emqx_topic:words(), + start_time :: integer(), + hash_bitfilter :: integer(), + hash_bitmask :: integer(), + time_bitfilter :: integer(), + time_bitmask :: integer() +}). + +% NOTE +% Keymapper decides how to map messages into RocksDB column family keyspace. +-record(keymapper, { + source :: [bitsource(), ...], + bitsize :: bits(), + epoch :: non_neg_integer() +}). + +-type bitsource() :: + %% Consume `_Size` bits from timestamp starting at `_Offset`th bit. + %% TODO consistency + {timestamp, _Offset :: bits(), _Size :: bits()} + %% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash. + | {hash, level | levels, _Size :: bits()}. + +-opaque db() :: #db{}. +-opaque iterator() :: #it{}. 
+-type keymapper() :: #keymapper{}. +-type keyspace_filter() :: #filter{}. + +%%================================================================================ +%% API funcions +%%================================================================================ + +%% Create a new column family for the generation and a serializable representation of the schema +-spec create_new(rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options()) -> + {schema(), emqx_ds_storage_layer:cf_refs()}. +create_new(DBHandle, GenId, Options) -> + CFName = data_cf(GenId), + CFOptions = maps:get(cf_options, Options, []), + {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions), + Schema = #schema{keymapper = make_keymapper(Options)}, + {Schema, [{CFName, CFHandle}]}. + +%% Reopen the database +-spec open( + emqx_ds:shard(), + rocksdb:db_handle(), + emqx_ds_storage_layer:gen_id(), + emqx_ds_storage_layer:cf_refs(), + schema() +) -> + db(). +open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> + {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), + #db{ + shard = Shard, + handle = DBHandle, + cf = CFHandle, + keymapper = Keymapper + }. + +-spec make_keymapper(options()) -> keymapper(). +make_keymapper(#{ + timestamp_bits := TimestampBits, + topic_bits_per_level := BitsPerLevel, + epoch := MaxEpoch +}) -> + TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))), + TimestampMSBs = TimestampBits - TimestampLSBs, + NLevels = length(BitsPerLevel), + {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel), + Source = lists:flatten([ + [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0], + [{hash, level, Bits} || Bits <- LevelBits], + {hash, levels, TailLevelsBits}, + [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0] + ]), + #keymapper{ + source = Source, + bitsize = lists:sum([S || {_, _, S} <- Source]), + epoch = 1 bsl TimestampLSBs + }. 
+ +-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> + ok | {error, _TODO}. +store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) -> + Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), + Value = make_message_value(Topic, MessagePayload), + rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). + +-spec make_iterator(db(), emqx_ds:replay()) -> + {ok, iterator()} | {error, _TODO}. +make_iterator(DB, Replay) -> + Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), + make_iterator(DB, Replay, Options). + +-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> + % {error, invalid_start_time}? might just start from the beginning of time + % and call it a day: client violated the contract anyway. + {ok, iterator()} | {error, _TODO}. +make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) -> + case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of + {ok, ITHandle} -> + Filter = make_keyspace_filter(Replay, DB#db.keymapper), + InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), + RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), + {ok, #it{ + handle = ITHandle, + filter = Filter, + next_action = {seek, InitialSeek}, + refresh_counter = RefreshCounter + }}; + Err -> + Err + end. + +-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. +next(It0 = #it{filter = #filter{keymapper = Keymapper}}) -> + It = maybe_refresh_iterator(It0), + case rocksdb:iterator_move(It#it.handle, It#it.next_action) of + % spec says `{ok, Key}` is also possible but the implementation says it's not + {ok, Key, Value} -> + % Preserve last seen key in the iterator so it could be restored / refreshed later. 
+ ItNext = It#it{cursor = Key}, + Bitstring = extract(Key, Keymapper), + case match_next(Bitstring, Value, It#it.filter) of + {_Topic, Payload} -> + {value, Payload, ItNext#it{next_action = next}}; + next -> + next(ItNext#it{next_action = next}); + NextBitstring when is_integer(NextBitstring) -> + NextSeek = combine(NextBitstring, <<>>, Keymapper), + next(ItNext#it{next_action = {seek, NextSeek}}); + none -> + stop_iteration(ItNext) + end; + {error, invalid_iterator} -> + stop_iteration(It); + {error, iterator_closed} -> + {error, closed} + end. + +-spec preserve_iterator(iterator()) -> binary(). +preserve_iterator(#it{cursor = Cursor}) -> + State = #{ + v => 1, + cursor => Cursor + }, + term_to_binary(State). + +-spec restore_iterator(db(), emqx_ds:replay(), binary()) -> + {ok, iterator()} | {error, _TODO}. +restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> + State = binary_to_term(Serial), + restore_iterator(DB, Replay, State); +restore_iterator(DB, Replay, #{ + v := 1, + cursor := Cursor +}) -> + case make_iterator(DB, Replay) of + {ok, It} when Cursor == undefined -> + % Iterator was preserved right after it has been made. + {ok, It}; + {ok, It} -> + % Iterator was preserved mid-replay, seek right past the last seen key. + {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}}; + Err -> + Err + end. + +-spec refresh_iterator(iterator()) -> iterator(). +refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) -> + case rocksdb:iterator_refresh(Handle) of + ok when Action =:= next -> + % Now the underlying iterator is invalid, need to seek instead. + It#it{next_action = {seek, successor(Cursor)}}; + ok -> + % Now the underlying iterator is invalid, but will seek soon anyway. + It; + {error, _} -> + % Implementation could in theory return an {error, ...} tuple. + % Supposedly our best bet is to ignore it. + % TODO logging? + It + end. 
+ +%%================================================================================ +%% Internal exports +%%================================================================================ + +-spec keymapper_info(keymapper()) -> + #{source := [bitsource()], bitsize := bits(), epoch := time()}. +keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) -> + #{source => Source, bitsize => Bitsize, epoch => Epoch}. + +make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> + combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). + +make_message_value(Topic, MessagePayload) -> + term_to_binary({Topic, MessagePayload}). + +unwrap_message_value(Binary) -> + binary_to_term(Binary). + +-spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) -> + key(). +combine(Bitstring, MessageID, #keymapper{bitsize = Size}) -> + <<Bitstring:Size/integer, MessageID/binary>>. + +-spec extract(key(), keymapper()) -> + _Bitstring :: integer(). +extract(Key, #keymapper{bitsize = Size}) -> + <<Bitstring:Size/integer, _MessageID/binary>> = Key, + Bitstring. + +-spec compute_bitstring(topic(), time(), keymapper()) -> integer(). +compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) -> + compute_bitstring(Topic, Timestamp, Source, 0). + +-spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer(). +compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> + compute_topic_bitmask(TopicFilter, Source, 0). + +-spec compute_time_bitmask(keymapper()) -> integer(). +compute_time_bitmask(#keymapper{source = Source}) -> + compute_time_bitmask(Source, 0). + +-spec hash(term(), bits()) -> integer(). +hash(Input, Bits) -> + % at most 32 bits + erlang:phash2(Input, 1 bsl Bits). + +-spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter(). 
%% Precompute the per-replay filter: bitmasks selecting the topic-hash and
%% timestamp portions of a key, and the filter values those portions must
%% satisfy. Used by `match_next/3` and the seek computations below.
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
    Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
    HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
    TimeBitmask = compute_time_bitmask(Keymapper),
    % Only bits under the respective mask are meaningful in each bitfilter.
    HashBitfilter = Bitstring band HashBitmask,
    TimeBitfilter = Bitstring band TimeBitmask,
    #filter{
        keymapper = Keymapper,
        topic_filter = TopicFilter,
        start_time = StartTime,
        hash_bitfilter = HashBitfilter,
        hash_bitmask = HashBitmask,
        time_bitfilter = TimeBitfilter,
        time_bitmask = TimeBitmask
    }.

%% Smallest key bitstring that can possibly satisfy the filter; the first
%% position an iterator should seek to.
-spec compute_initial_seek(keyspace_filter()) -> integer().
compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) ->
    % Should be the same as `compute_next_seek(0, Filter)`.
    HashBitfilter bor TimeBitfilter.

%% Given the bitstring of a key that was just read, compute the next key
%% bitstring worth seeking to (skipping ranges that cannot match the filter).
-spec compute_next_seek(integer(), keyspace_filter()) -> integer().
compute_next_seek(
    Bitstring,
    Filter = #filter{
        hash_bitfilter = HashBitfilter,
        hash_bitmask = HashBitmask,
        time_bitfilter = TimeBitfilter,
        time_bitmask = TimeBitmask
    }
) ->
    HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
    TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
    compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter).

%%================================================================================
%% Internal functions
%%================================================================================

%% Fold over the bitsources, concatenating one "digit" per source:
%% timestamp sources take bits of `Timestamp`, hash sources take a hash of
%% the corresponding topic level(s). Acc accumulates bits MSB-first.
compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) ->
    I = (Timestamp bsr Offset) band ones(Size),
    compute_bitstring(Topic, Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) ->
    % Topic is shorter than the number of per-level sources: hash a
    % placeholder for the missing level.
    I = hash(<<"/">>, Size),
    compute_bitstring([], Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) ->
    I = hash(Level, Size),
    compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
    % `levels` source hashes the whole remaining topic tail at once.
    I = hash(Tail, Size),
    compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
compute_bitstring(_, _, [], Acc) ->
    Acc.

%% Build a mask with ones over hash "digits" that are pinned by the topic
%% filter (concrete levels) and zeroes over unconstrained digits
%% (timestamp bits, '+'/'#' wildcards).
compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
    compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size));
compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
    % '#' matches everything from here on: no constraint on any further digit.
    compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size));
compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
    compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size));
compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
    % Filter exhausted: the placeholder hash (see compute_bitstring) is pinned.
    compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
    compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) ->
    % The tail digit is only pinned if the remaining filter has no wildcards.
    Mask =
        case lists:member('+', Tail) orelse lists:member('#', Tail) of
            true -> 0;
            false -> ones(Size)
        end,
    compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size));
compute_topic_bitmask(_, [], Acc) ->
    Acc.

%% Mask with ones over every timestamp "digit", zeroes over hash digits.
compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
    compute_time_bitmask(Rest, bitwise_concat(Acc, ones(Size), Size));
compute_time_bitmask([{hash, _, Size} | Rest], Acc) ->
    compute_time_bitmask(Rest, bitwise_concat(Acc, 0, Size));
compute_time_bitmask([], Acc) ->
    Acc.

bitwise_concat(Acc, Item, ItemSize) ->
    (Acc bsl ItemSize) bor Item.

ones(Bits) ->
    1 bsl Bits - 1.

%% Smallest key strictly greater than `Key` (append a zero byte).
%% NOTE(review): binary expression restored here — it was garbled to `<>` in
%% the extracted text; reconstruction follows the upstream implementation.
-spec successor(key()) -> key().
successor(Key) ->
    <<Key/binary, 0:8>>.

%% |123|345|678|
%% foo bar baz

%% |123|000|678| - |123|fff|678|

%% foo + baz

%% |fff|000|fff|

%% |123|000|678|

%% |123|056|678| & |fff|000|fff| = |123|000|678|.

%% Decide what to do with a candidate key/value read from the iterator:
%% * return the `{Topic, Payload}` message if it matches the replay filter;
%% * `next` if it hash-matched but the real topic doesn't (hash collision);
%% * a bitstring to seek to, or `none`, if it's outside the keyspace filter.
match_next(
    Bitstring,
    Value,
    Filter = #filter{
        topic_filter = TopicFilter,
        hash_bitfilter = HashBitfilter,
        hash_bitmask = HashBitmask,
        time_bitfilter = TimeBitfilter,
        time_bitmask = TimeBitmask
    }
) ->
    HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
    TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
    case HashMatches and TimeMatches of
        true ->
            Message = {Topic, _Payload} = unwrap_message_value(Value),
            % Hashes can collide: verify against the stored topic.
            case emqx_topic:match(Topic, TopicFilter) of
                true ->
                    Message;
                false ->
                    next
            end;
        false ->
            compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter)
    end.

%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
compute_next_seek(
    _HashMatches = false,
    _TimeMatches,
    Bitstring,
    Filter = #filter{
        keymapper = Keymapper,
        hash_bitfilter = HashBitfilter,
        hash_bitmask = HashBitmask,
        time_bitfilter = TimeBitfilter,
        time_bitmask = TimeBitmask
    }
) ->
    NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper),
    case NextBitstring of
        none ->
            none;
        _ ->
            % The topic seek may have landed out of the time range; re-check.
            TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask),
            compute_next_seek(true, TimeMatches, NextBitstring, Filter)
    end;
%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
compute_next_seek(
    _HashMatches = true,
    _TimeMatches = false,
    Bitstring,
    #filter{
        time_bitfilter = TimeBitfilter,
        time_bitmask = TimeBitmask
    }
) ->
    compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask);
compute_next_seek(true, true, Bitstring, _It) ->
    Bitstring.

topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) ->
    (Bitstring band HashBitmask) == HashBitfilter.

time_matches(Bitstring, TimeBitfilter, TimeBitmask) ->
    (Bitstring band TimeBitmask) >= TimeBitfilter.

compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
    % Replace the bits of the timestamp in `Bitstring` with bits from `TimeBitfilter`.
    (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.

%% Find the closest bitstring which is:
%% * greater than `Bitstring`,
%% * and falls into the hash space defined by `HashBitfilter`.
%% Note that the result can end up "back" in time and out of the time range.
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
    Sources = Keymapper#keymapper.source,
    Size = Keymapper#keymapper.bitsize,
    compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).

compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
    % NOTE
    % We're iterating through `Substring` here, in lockstep with `HashBitfilter`
    % and `HashBitmask`, starting from least significant bits. Each bitsource in
    % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
    % bits long which we interpret as a "digit". There are 2 flavors of those
    % "digits":
    % * regular digit with 2^S possible values,
    % * degenerate digit with exactly 1 possible value U (represented with 0).
    % Our goal here is to find a successor of `Bitstring` and perform a kind of
    % digit-by-digit addition operation with carry propagation.
    NextSeek = zipfoldr3(
        fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
            case Source of
                {hash, _, S} when LBitmask =:= 0 ->
                    % Regular case
                    bitwise_add_digit(Substring, Acc, S, Offset);
                {hash, _, _} when LBitmask =/= 0, Substring < Filter ->
                    % Degenerate case, I_digit < U, no overflow.
                    % Successor is `U bsl Offset` which is equivalent to 0.
                    0;
                {hash, _, S} when LBitmask =/= 0, Substring > Filter ->
                    % Degenerate case, I_digit > U, overflow.
                    % Successor is `(1 bsl Size + U) bsl Offset`.
                    overflow_digit(S, Offset);
                {hash, _, S} when LBitmask =/= 0 ->
                    % Degenerate case, I_digit = U
                    % Perform digit addition with I_digit = 0, assuming "digit" has
                    % 0 bits of information (but is `S` bits long at the same time).
                    % This will overflow only if the result of previous iteration
                    % was an overflow.
                    bitwise_add_digit(0, Acc, 0, S, Offset);
                {timestamp, _, S} ->
                    % Regular case
                    bitwise_add_digit(Substring, Acc, S, Offset)
            end
        end,
        0,
        Bitstring,
        HashBitfilter,
        HashBitmask,
        Size,
        Sources
    ),
    case NextSeek bsr Size of
        _Carry = 0 ->
            % Found the successor.
            % We need to recover values of those degenerate digits which we
            % represented with 0 during digit-by-digit iteration.
            NextSeek bor (HashBitfilter band HashBitmask);
        _Carry = 1 ->
            % We got "carried away" past the range, time to stop iteration.
            none
    end.

bitwise_add_digit(Digit, Number, Width, Offset) ->
    bitwise_add_digit(Digit, Number, Width, Width, Offset).

%% Add "digit" (represented with integer `Digit`) to the `Number` assuming
%% this digit starts at `Offset` bits in `Number` and is `Width` bits long.
%% Perform an overflow if the result of addition would not fit into `Bits`
%% bits.
bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
    Sum = (Digit bsl Offset) + Number,
    case (Sum bsr Offset) < (1 bsl Bits) of
        true -> Sum;
        false -> overflow_digit(Width, Offset)
    end.

%% Construct a number which denotes an overflow of digit that starts at
%% `Offset` bits and is `Width` bits long.
overflow_digit(Width, Offset) ->
    (1 bsl Width) bsl Offset.

%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
%% significant bits first.
%%
%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
%% specified in `Sources` list, in order from most significant bits to least
%% significant. Each iteration calls `FoldFun` with:
%% * bitsource that was used to extract sub-bitstrings,
%% * 3 sub-bitstrings in integer representation,
%% * bit offset into integers,
%% * current accumulator.
-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
    Acc
when
    FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
    Acc;
zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
    OffsetNext = Offset - S,
    % Recurse first, so FoldFun is applied LSB-first even though `Sources`
    % is ordered MSB-first.
    AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest),
    FoldFun(
        Source,
        substring(I1, OffsetNext, S),
        substring(I2, OffsetNext, S),
        substring(I3, OffsetNext, S),
        OffsetNext,
        AccNext
    ).

%% Extract `Size` bits of `I` starting at bit `Offset`, as an integer.
substring(I, Offset, Size) ->
    (I bsr Offset) band ones(Size).

%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->
    ?MODULE_STRING ++ integer_to_list(GenId).

%% Turn the `iteration.iterator_refresh` option into a `{Counter, Limit}`
%% pair, or `undefined` when refreshing is disabled.
make_refresh_counter({every, N}) when is_integer(N), N > 0 ->
    {0, N};
make_refresh_counter(undefined) ->
    undefined.

%% Refresh the underlying rocksdb iterator every N calls (when configured),
%% otherwise just advance the counter / pass the iterator through unchanged.
maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) ->
    refresh_iterator(It#it{refresh_counter = {0, N}});
maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) ->
    It#it{refresh_counter = {M + 1, N}};
maybe_refresh_iterator(It = #it{refresh_counter = undefined}) ->
    It.

%% Close the rocksdb iterator and signal end-of-stream to the caller.
stop_iteration(It) ->
    ok = rocksdb:iterator_close(It#it.handle),
    none.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl new file mode 100644 index 000000000..a66cee7fd --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replay.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_replay). + +%% API: +-export([]). + +-export_type([replay_id/0, replay/0]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type replay_id() :: binary(). + +-type replay() :: { + _TopicFilter :: emqx_ds:topic(), + _StartTime :: emqx_ds:time() +}. + +%%================================================================================ +%% API funcions +%%================================================================================ + +%%================================================================================ +%% behaviour callbacks +%%================================================================================ + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl new file mode 100644 index 000000000..43a399a1b --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -0,0 +1,505 @@ +%%-------------------------------------------------------------------- +%% Copyright 
(c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_layer). + +-behaviour(gen_server). + +%% API: +-export([start_link/1]). +-export([create_generation/3]). + +-export([store/5]). + +-export([make_iterator/2, next/1]). + +-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]). + +%% behaviour callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]). + +-compile({inline, [meta_lookup/2]}). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%% see rocksdb:db_options() +% -type options() :: proplists:proplist(). + +-type db_write_options() :: proplists:proplist(). + +-type cf_refs() :: [{string(), rocksdb:cf_handle()}]. + +%% Message storage generation +%% Keep in mind that instances of this type are persisted in long-term storage. +-type generation() :: #{ + %% Module that handles data for the generation + module := module(), + %% Module-specific data defined at generation creation time + data := term(), + %% When should this generation become active? + %% This generation should only contain messages timestamped no earlier than that. + %% The very first generation will have `since` equal 0. + since := emqx_ds:time() +}. + +-record(s, { + shard :: emqx_ds:shard(), + db :: rocksdb:db_handle(), + cf_iterator :: rocksdb:cf_handle(), + cf_generations :: cf_refs() +}). + +-record(it, { + shard :: emqx_ds:shard(), + gen :: gen_id(), + replay :: emqx_ds:replay(), + module :: module(), + data :: term() +}). + +-type gen_id() :: 0..16#ffff. + +-opaque state() :: #s{}. +-opaque iterator() :: #it{}. 

%% Contents of the default column family:
%%
%% [{<<"genNN">>, #generation{}}, ...,
%% {<<"current">>, GenID}]

-define(DEFAULT_CF, "default").
-define(DEFAULT_CF_OPTS, []).

-define(ITERATOR_CF, "$iterators").

%% TODO
%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
%% 2. Supposedly might be compressed _very_ effectively.
%% 3. `inplace_update_support`?
-define(ITERATOR_CF_OPTS, []).

%% One gen_server per shard, registered via gproc under the shard name.
-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).

%%================================================================================
%% Callbacks
%%================================================================================

%% Behaviour implemented by storage backends (e.g. the bitmask message
%% storage). `_Schema` is backend-specific, persisted term; `_It` is the
%% backend's opaque iterator state.

-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
    {_Schema, cf_refs()}.

-callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
    term().

-callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
    ok | {error, _}.

-callback make_iterator(_Schema, emqx_ds:replay()) ->
    {ok, _It} | {error, _}.

-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.

-callback preserve_iterator(_Schema, _It) -> term().

-callback next(It) -> {value, binary(), It} | none | {error, closed}.

%%================================================================================
%% API functions
%%================================================================================

-spec start_link(emqx_ds:shard()) -> {ok, pid()}.
start_link(Shard) ->
    gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).

%% Start a new generation active from `Since` onwards; serialized through
%% the shard's gen_server.
-spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
    {ok, gen_id()} | {error, nonmonotonic}.
create_generation(Shard, Since, Config = {_Module, _Options}) ->
    gen_server:call(?REF(Shard), {create_generation, Since, Config}).

-spec store(
    emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()
) ->
    ok | {error, _}.
%% Write a message into whichever generation covers `Time`; dispatches to
%% the generation's backend module.
store(Shard, GUID, Time, Topic, Msg) ->
    {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
    Mod:store(Data, GUID, Time, Topic, Msg).

%% Open an iterator starting in the generation that covers `StartTime`;
%% `next/1` transparently continues into newer generations.
-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
    {ok, iterator()} | {error, _TODO}.
make_iterator(Shard, Replay = {_, StartTime}) ->
    {GenId, Gen} = meta_lookup_gen(Shard, StartTime),
    open_iterator(Gen, #it{
        shard = Shard,
        gen = GenId,
        replay = Replay
    }).

-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(It = #it{module = Mod, data = ItData}) ->
    case Mod:next(ItData) of
        {value, Val, ItDataNext} ->
            {value, Val, It#it{data = ItDataNext}};
        {error, _} = Error ->
            Error;
        none ->
            % Current generation exhausted: move on to the next one, if any.
            case open_next_iterator(It) of
                {ok, ItNext} ->
                    next(ItNext);
                {error, _} = Error ->
                    Error;
                none ->
                    none
            end
    end.

%% Persist the iterator position under `ReplayID` so a replay can resume
%% after a restart.
-spec preserve_iterator(iterator(), emqx_ds:replay_id()) ->
    ok | {error, _TODO}.
preserve_iterator(It = #it{}, ReplayID) ->
    iterator_put_state(ReplayID, It).

-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
    {ok, iterator()} | {error, _TODO}.
restore_iterator(Shard, ReplayID) ->
    case iterator_get_state(Shard, ReplayID) of
        {ok, Serial} ->
            restore_iterator_state(Shard, Serial);
        not_found ->
            {error, not_found};
        {error, _Reason} = Error ->
            Error
    end.

-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
    ok | {error, _TODO}.
discard_iterator(Shard, ReplayID) ->
    iterator_delete(Shard, ReplayID).

%%================================================================================
%% behaviour callbacks
%%================================================================================

init([Shard]) ->
    % Trap exits so terminate/2 runs and closes the rocksdb handle.
    process_flag(trap_exit, true),
    {ok, S0} = open_db(Shard),
    S = ensure_current_generation(S0),
    ok = populate_metadata(S),
    {ok, S}.

handle_call({create_generation, Since, Config}, _From, S) ->
    case create_new_gen(Since, Config, S) of
        {ok, GenId, NS} ->
            {reply, {ok, GenId}, NS};
        {error, _} = Error ->
            {reply, Error, S}
    end;
handle_call(_Call, _From, S) ->
    {reply, {error, unknown_call}, S}.

handle_cast(_Cast, S) ->
    {noreply, S}.

handle_info(_Info, S) ->
    {noreply, S}.

terminate(_Reason, #s{db = DB, shard = Shard}) ->
    % Drop this shard's persistent_term entries before closing the DB.
    meta_erase(Shard),
    ok = rocksdb:close(DB).

%%================================================================================
%% Internal functions
%%================================================================================

%% DB handle pair cached in persistent_term under the `db` key per shard.
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).

%% Load the DB handle and every generation recorded in the schema into
%% persistent_term, so readers avoid calling into the gen_server.
-spec populate_metadata(state()) -> ok.
populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
    ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
    Current = schema_get_current(DBHandle),
    lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).

-spec populate_metadata(gen_id(), state()) -> ok.
populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
    Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
    meta_register_gen(Shard, GenId, Gen).

%% On first start (empty schema) create generation 0 from the shard config.
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
    case schema_get_current(DBHandle) of
        undefined ->
            Config = emqx_ds_conf:shard_config(Shard),
            {ok, _, NS} = create_new_gen(0, Config, S),
            NS;
        _GenId ->
            S
    end.

-spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
    {ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
    GenId = get_next_id(meta_get_current(Shard)),
    % Assert: runtime metadata and persisted schema must agree on the next id.
    GenId = get_next_id(schema_get_current(DBHandle)),
    case is_gen_valid(Shard, GenId, Since) of
        ok ->
            {ok, Gen, NS} = create_gen(GenId, Since, Config, S),
            %% TODO: Transaction? Column family creation can't be transactional, anyway.
            ok = schema_put_gen(DBHandle, GenId, Gen),
            ok = schema_put_current(DBHandle, GenId),
            ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
            {ok, GenId, NS};
        {error, _} = Error ->
            Error
    end.

%% Ask the backend module to create its column families / schema for the
%% new generation, and track the new CF handles in the server state.
-spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
    {ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
    % TODO: Backend implementation should ensure idempotency.
    {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
    Gen = #{
        module => Module,
        data => Schema,
        since => Since
    },
    {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.

%% Open (or create) the shard's rocksdb database together with all known
%% column families. The shard name doubles as the on-disk directory name.
-spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}.
open_db(Shard) ->
    Filename = binary_to_list(Shard),
    DBOptions = [
        {create_if_missing, true},
        {create_missing_column_families, true}
        | emqx_ds_conf:db_options()
    ],
    ExistingCFs =
        case rocksdb:list_column_families(Filename, DBOptions) of
            {ok, CFs} ->
                [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
            % DB is not present. First start
            {error, {db_open, _}} ->
                []
        end,
    ColumnFamilies = [
        {?DEFAULT_CF, ?DEFAULT_CF_OPTS},
        {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
        | ExistingCFs
    ],
    case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
        {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
            % CF handles come back in the same order as `ColumnFamilies`.
            {CFNames, _} = lists:unzip(ExistingCFs),
            {ok, #s{
                shard = Shard,
                db = DBHandle,
                cf_iterator = CFIterator,
                cf_generations = lists:zip(CFNames, CFRefs)
            }};
        Error ->
            Error
    end.

%% Let the backend module open its runtime handle for a generation loaded
%% from the schema; replaces the persisted schema with the live handle.
-spec open_gen(gen_id(), generation(), state()) -> generation().
open_gen(
    GenId,
    Gen = #{module := Mod, data := Data},
    #s{shard = Shard, db = DBHandle, cf_generations = CFs}
) ->
    DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
    Gen#{data := DB}.

%% Advance the iterator into the next generation, or `none` when the
%% current one is the newest.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
    open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).

open_next_iterator(undefined, _It) ->
    none;
open_next_iterator(Gen = #{}, It) ->
    open_iterator(Gen, It).

-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
    case Mod:make_iterator(Data, It#it.replay) of
        {ok, ItData} ->
            {ok, It#it{module = Mod, data = ItData}};
        Err ->
            Err
    end.

%% Rebuild a backend iterator from its serialized state (see
%% preserve_iterator_state/1).
-spec open_restore_iterator(generation(), iterator(), binary()) ->
    {ok, iterator()} | {error, _Reason}.
open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) ->
    case Mod:restore_iterator(Data, Replay, Serial) of
        {ok, ItData} ->
            {ok, It#it{module = Mod, data = ItData}};
        Err ->
            Err
    end.

%%

%% Key under which a replay's iterator state lives in the iterator CF.
-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).

-define(ITERATION_WRITE_OPTS, []).
-define(ITERATION_READ_OPTS, []).

iterator_get_state(Shard, ReplayID) ->
    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
    rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).

iterator_put_state(ID, It = #it{shard = Shard}) ->
    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
    Serial = preserve_iterator_state(It),
    rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).

iterator_delete(Shard, ID) ->
    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
    rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).

%% Serialize the iterator (versioned map) so it can be stored and later
%% restored by restore_iterator_state/2. The backend serializes its own part.
preserve_iterator_state(#it{
    gen = Gen,
    replay = {TopicFilter, StartTime},
    module = Mod,
    data = ItData
}) ->
    term_to_binary(#{
        v => 1,
        gen => Gen,
        filter => TopicFilter,
        start => StartTime,
        st => Mod:preserve_iterator(ItData)
    }).

restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
    restore_iterator_state(Shard, binary_to_term(Serial));
restore_iterator_state(
    Shard,
    #{
        v := 1,
        gen := Gen,
        filter := TopicFilter,
        start := StartTime,
        st := State
    }
) ->
    It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
    open_restore_iterator(meta_get_gen(Shard, Gen), It, State).

%% Functions for dealing with the metadata stored persistently in rocksdb

-define(CURRENT_GEN, <<"current">>).
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).

-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
schema_get_gen(DBHandle, GenId) ->
    {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
    binary_to_term(Bin).

-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
    rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).

-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
schema_get_current(DBHandle) ->
    case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
        {ok, Bin} ->
            binary_to_integer(Bin);
        not_found ->
            undefined
    end.

-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
schema_put_current(DBHandle, GenId) ->
    rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).

%% `<<"gen", GenId:32>>` key in the default CF.
-spec schema_gen_key(integer()) -> binary().
schema_gen_key(N) ->
    <<"gen", N:32>>.

-undef(CURRENT_GEN).
-undef(SCHEMA_WRITE_OPTS).
-undef(SCHEMA_READ_OPTS).

%% Functions for dealing with the runtime shard metadata:

-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).

%% Register a newly opened generation. The value under key `GenId` is the
%% list of generations from `GenId` down to 0, newest first, so that
%% find_gen/3 can walk backwards in time with a single lookup.
-spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok.
meta_register_gen(Shard, GenId, Gen) ->
    Gs =
        case GenId > 0 of
            true -> meta_lookup(Shard, GenId - 1);
            false -> []
        end,
    ok = meta_put(Shard, GenId, [Gen | Gs]),
    ok = meta_put(Shard, current, GenId).

%% Find the generation whose `since` covers `Time`.
-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) ->
    % TODO
    % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
    % towards a "no".
    Current = meta_lookup(Shard, current),
    Gens = meta_lookup(Shard, Current),
    find_gen(Time, Current, Gens).

find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
    {GenId, Gen};
find_gen(Time, GenId, [_Gen | Rest]) ->
    find_gen(Time, GenId - 1, Rest).

-spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined.
meta_get_gen(Shard, GenId) ->
    case meta_lookup(Shard, GenId, []) of
        [Gen | _Older] -> Gen;
        [] -> undefined
    end.

-spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined.
meta_get_current(Shard) ->
    meta_lookup(Shard, current, undefined).

-spec meta_lookup(emqx_ds:shard(), _K) -> _V.
meta_lookup(Shard, K) ->
    persistent_term:get(?PERSISTENT_TERM(Shard, K)).

-spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default.
meta_lookup(Shard, K, Default) ->
    persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).

-spec meta_put(emqx_ds:shard(), _K, _V) -> ok.
meta_put(Shard, K, V) ->
    persistent_term:put(?PERSISTENT_TERM(Shard, K), V).

%% Erase every persistent_term entry belonging to this shard.
-spec meta_erase(emqx_ds:shard()) -> ok.
meta_erase(Shard) ->
    [
        persistent_term:erase(K)
     || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
    ],
    ok.

-undef(PERSISTENT_TERM).

get_next_id(undefined) -> 0;
get_next_id(GenId) -> GenId + 1.
+ +is_gen_valid(Shard, GenId, Since) when GenId > 0 -> + [GenPrev | _] = meta_lookup(Shard, GenId - 1), + case GenPrev of + #{since := SincePrev} when Since > SincePrev -> + ok; + #{} -> + {error, nonmonotonic} + end; +is_gen_valid(_Shard, 0, 0) -> + ok. + +%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. +%% store_cfs(DBHandle, CFRefs) -> +%% lists:foreach( +%% fun({CFName, CFRef}) -> +%% persistent_term:put({self(), CFName}, {DBHandle, CFRef}) +%% end, +%% CFRefs). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl new file mode 100644 index 000000000..ed745df5f --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -0,0 +1,62 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_layer_sup). + +-behaviour(supervisor). + +%% API: +-export([start_link/0, start_shard/1, stop_shard/1]). + +%% behaviour callbacks: +-export([init/1]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(SUP, ?MODULE). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUP}, ?MODULE, []). + +-spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret(). +start_shard(Shard) -> + supervisor:start_child(?SUP, shard_child_spec(Shard)). + +-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. 
+stop_shard(Shard) -> + ok = supervisor:terminate_child(?SUP, Shard), + ok = supervisor:delete_child(?SUP, Shard). + +%%================================================================================ +%% behaviour callbacks +%%================================================================================ + +init([]) -> + Children = [], + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 + }, + {ok, {SupFlags, Children}}. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec(). +shard_child_spec(Shard) -> + #{ + id => Shard, + start => {emqx_ds_storage_layer, start_link, [Shard]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl new file mode 100644 index 000000000..ca939e892 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_sup). + +-behaviour(supervisor). + +%% API: +-export([start_link/0]). + +%% behaviour callbacks: +-export([init/1]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(SUP, ?MODULE). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUP}, ?MODULE, []). 
+ +%%================================================================================ +%% behaviour callbacks +%%================================================================================ + +init([]) -> + Children = [shard_sup()], + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, Children}}. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +shard_sup() -> + #{ + id => local_store_shard_sup, + start => {emqx_ds_storage_layer_sup, start_link, []}, + restart => permanent, + type => supervisor, + shutdown => infinity + }. diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src new file mode 100644 index 000000000..7ea036536 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -0,0 +1,11 @@ +%% -*- mode: erlang -*- +{application, emqx_durable_storage, [ + {description, "Message persistence and subscription replays for EMQX"}, + % strict semver, bump manually! + {vsn, "0.1.0"}, + {modules, []}, + {registered, []}, + {applications, [kernel, stdlib, rocksdb, gproc, mria]}, + {mod, {emqx_ds_app, []}}, + {env, []} +]}. diff --git a/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl new file mode 100644 index 000000000..599bd6c7b --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl @@ -0,0 +1,188 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_message_storage_bitmask_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). 
+ +-include_lib("stdlib/include/assert.hrl"). + +-import(emqx_ds_message_storage_bitmask, [ + make_keymapper/1, + keymapper_info/1, + compute_topic_bitmask/2, + compute_time_bitmask/1, + compute_topic_seek/4 +]). + +all() -> emqx_common_test_helpers:all(?MODULE). + +t_make_keymapper(_) -> + ?assertMatch( + #{ + source := [ + {timestamp, 9, 23}, + {hash, level, 2}, + {hash, level, 4}, + {hash, levels, 8}, + {timestamp, 0, 9} + ], + bitsize := 46, + epoch := 512 + }, + keymapper_info( + make_keymapper(#{ + timestamp_bits => 32, + topic_bits_per_level => [2, 4, 8], + epoch => 1000 + }) + ) + ). + +t_make_keymapper_single_hash_level(_) -> + ?assertMatch( + #{ + source := [ + {timestamp, 0, 32}, + {hash, levels, 16} + ], + bitsize := 48, + epoch := 1 + }, + keymapper_info( + make_keymapper(#{ + timestamp_bits => 32, + topic_bits_per_level => [16], + epoch => 1 + }) + ) + ). + +t_make_keymapper_no_timestamp(_) -> + ?assertMatch( + #{ + source := [ + {hash, level, 4}, + {hash, level, 8}, + {hash, levels, 16} + ], + bitsize := 28, + epoch := 1 + }, + keymapper_info( + make_keymapper(#{ + timestamp_bits => 0, + topic_bits_per_level => [4, 8, 16], + epoch => 42 + }) + ) + ). + +t_compute_topic_bitmask(_) -> + KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), + ?assertEqual( + 2#111_1111_11111_11, + compute_topic_bitmask([<<"foo">>, <<"bar">>], KM) + ), + ?assertEqual( + 2#111_0000_11111_11, + compute_topic_bitmask([<<"foo">>, '+'], KM) + ), + ?assertEqual( + 2#111_0000_00000_11, + compute_topic_bitmask([<<"foo">>, '+', '+'], KM) + ), + ?assertEqual( + 2#111_0000_11111_00, + compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM) + ). 
+ +t_compute_topic_bitmask_wildcard(_) -> + KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), + ?assertEqual( + 2#000_0000_00000_00, + compute_topic_bitmask(['#'], KM) + ), + ?assertEqual( + 2#111_0000_00000_00, + compute_topic_bitmask([<<"foo">>, '#'], KM) + ), + ?assertEqual( + 2#111_1111_11111_00, + compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM) + ). + +t_compute_topic_bitmask_wildcard_long_tail(_) -> + KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), + ?assertEqual( + 2#111_1111_11111_11, + compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM) + ), + ?assertEqual( + 2#111_1111_11111_00, + compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM) + ). + +t_compute_time_bitmask(_) -> + KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}), + ?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)). + +t_compute_time_bitmask_epoch_only(_) -> + KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}), + ?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)). 
+ +%% Filter = |123|***|678|***| +%% Mask = |123|***|678|***| +%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000| +%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000| +%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos +%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos + +t_compute_next_topic_seek(_) -> + KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}), + ?assertMatch( + none, + compute_topic_seek( + 16#FD_42_4242_043, + 16#FD_42_4242_042, + 16#FF_FF_FFFF_FFF, + KM + ) + ), + ?assertMatch( + 16#FD_11_0678_000, + compute_topic_seek( + 16#FD_11_0108_121, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ), + ?assertMatch( + 16#FD_12_0678_000, + compute_topic_seek( + 16#FD_11_0679_919, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ), + ?assertMatch( + none, + compute_topic_seek( + 16#FD_FF_0679_001, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ), + ?assertMatch( + none, + compute_topic_seek( + 16#FE_11_0179_017, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl new file mode 100644 index 000000000..46a1436bb --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -0,0 +1,276 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_layer_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(SHARD, shard(?FUNCTION_NAME)). 
+ +-define(DEFAULT_CONFIG, + {emqx_ds_message_storage_bitmask, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 5} + } + }} +). + +-define(COMPACT_CONFIG, + {emqx_ds_message_storage_bitmask, #{ + timestamp_bits => 16, + topic_bits_per_level => [16, 16], + epoch => 10 + }} +). + +%% Smoke test for opening and reopening the database +t_open(_Config) -> + ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD). + +%% Smoke test of store function +t_store(_Config) -> + MessageID = emqx_guid:gen(), + PublishedAt = 1000, + Topic = [<<"foo">>, <<"bar">>], + Payload = <<"message">>, + ?assertMatch(ok, emqx_ds_storage_layer:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). + +%% Smoke test for iteration through a concrete topic +t_iterate(_Config) -> + %% Prepare data: + Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], + Timestamps = lists:seq(1, 10), + [ + emqx_ds_storage_layer:store( + ?SHARD, + emqx_guid:gen(), + PublishedAt, + Topic, + integer_to_binary(PublishedAt) + ) + || Topic <- Topics, PublishedAt <- Timestamps + ], + %% Iterate through individual topics: + [ + begin + {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {Topic, 0}), + Values = iterate(It), + ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) + end + || Topic <- Topics + ], + ok. 
+ +%% Smoke test for iteration with wildcard topic filter +t_iterate_wildcard(_Config) -> + %% Prepare data: + Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], + Timestamps = lists:seq(1, 10), + _ = [ + store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)]) + ), + ?assertEqual( + [], + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)]) + ), + ?assertEqual( + lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) + ), + ?assertEqual( + lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)]) + ), + ?assertEqual( + [], + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)]) + ), + ?assertEqual( + lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)]) + ), + ?assertEqual( + [], + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)]) + ), + ok. 
+ +t_iterate_long_tail_wildcard(_Config) -> + Topic = "b/c/d/e/f/g", + TopicFilter = "b/c/d/e/+/+", + Timestamps = lists:seq(1, 100), + _ = [ + store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)]) + ). + +t_create_gen(_Config) -> + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), + ?assertEqual( + {error, nonmonotonic}, + emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) + ), + ?assertEqual( + {error, nonmonotonic}, + emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) + ), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz"], + Timestamps = lists:seq(1, 100), + [ + ?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>)) + || Topic <- Topics, PublishedAt <- Timestamps + ]. + +t_iterate_multigen(_Config) -> + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], + Timestamps = lists:seq(1, 100), + _ = [ + store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)]) + ). 
+ +t_iterate_multigen_preserve_restore(_Config) -> + ReplayID = atom_to_binary(?FUNCTION_NAME), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz", "a/bar"], + Timestamps = lists:seq(1, 100), + TopicFilter = "foo/#", + TopicsMatching = ["foo/bar", "foo/bar/baz"], + _ = [ + store(?SHARD, TS, Topic, term_to_binary({Topic, TS})) + || Topic <- Topics, TS <- Timestamps + ], + It0 = iterator(?SHARD, TopicFilter, 0), + {It1, Res10} = iterate(It0, 10), + % preserve mid-generation + ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID), + {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), + {It3, Res100} = iterate(It2, 88), + % preserve on the generation boundary + ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), + {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), + {It5, Res200} = iterate(It4, 1000), + ?assertEqual(none, It5), + ?assertEqual( + lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) + ), + ?assertEqual( + ok, + emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID) + ), + ?assertEqual( + {error, not_found}, + emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) + ). + +store(Shard, PublishedAt, Topic, Payload) -> + ID = emqx_guid:gen(), + emqx_ds_storage_layer:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). + +iterate(DB, TopicFilter, StartTime) -> + iterate(iterator(DB, TopicFilter, StartTime)). + +iterate(It) -> + case emqx_ds_storage_layer:next(It) of + {value, Payload, ItNext} -> + [Payload | iterate(ItNext)]; + none -> + [] + end. 
+ +iterate(It, 0) -> + {It, []}; +iterate(It, N) -> + case emqx_ds_storage_layer:next(It) of + {value, Payload, ItNext} -> + {ItFinal, Ps} = iterate(ItNext, N - 1), + {ItFinal, [Payload | Ps]}; + none -> + {none, []} + end. + +iterator(DB, TopicFilter, StartTime) -> + {ok, It} = emqx_ds_storage_layer:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), + It. + +parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> + Topic; +parse_topic(Topic) -> + emqx_topic:words(iolist_to_binary(Topic)). + +%% CT callbacks + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_durable_storage), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_durable_storage). + +init_per_testcase(TC, Config) -> + ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)), + Config. + +end_per_testcase(TC, _Config) -> + ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). + +shard(TC) -> + list_to_binary(lists:concat([?MODULE, "_", TC])). + +set_shard_config(Shard, Config) -> + ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}). diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl new file mode 100644 index 000000000..59668ca01 --- /dev/null +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_message_storage_bitmask_shim). + +-export([open/0]). +-export([close/1]). +-export([store/5]). +-export([iterate/2]). + +-type topic() :: list(binary()). +-type time() :: integer(). + +-opaque t() :: ets:tid(). + +-spec open() -> t(). 
+open() -> + ets:new(?MODULE, [ordered_set, {keypos, 1}]). + +-spec close(t()) -> ok. +close(Tab) -> + true = ets:delete(Tab), + ok. + +-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) -> + ok | {error, _TODO}. +store(Tab, MessageID, PublishedAt, Topic, Payload) -> + true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), + ok. + +-spec iterate(t(), emqx_ds:replay()) -> + [binary()]. +iterate(Tab, {TopicFilter, StartTime}) -> + ets:foldr( + fun({{PublishedAt, _}, Topic, Payload}, Acc) -> + case emqx_topic:match(Topic, TopicFilter) of + true when PublishedAt >= StartTime -> + [Payload | Acc]; + _ -> + Acc + end + end, + [], + Tab + ). diff --git a/apps/emqx_durable_storage/test/props/payload_gen.erl b/apps/emqx_durable_storage/test/props/payload_gen.erl new file mode 100644 index 000000000..17e68f8d5 --- /dev/null +++ b/apps/emqx_durable_storage/test/props/payload_gen.erl @@ -0,0 +1,377 @@ +%% @doc This module provides lazy, composable producer streams that +%% can be considered counterparts to Archiver's consumer pipes and +%% therefore can facilitate testing +%% +%% Also it comes with an implementation of binary data stream which is +%% able to produce sufficiently large amounts of plausibly +%% pseudorandom binary payload in a deterministic way. It also +%% contains routines to check binary blobs via sampling +-module(payload_gen). + +-define(end_of_stream, []). + +-dialyzer(no_improper_lists). + +%% Generic stream API: +-export([ + interleave_streams/1, + retransmits/2, + next/1, + consume/2, + consume/1 +]). + +%% Binary payload generator API: +-export([ + interleave_chunks/2, + interleave_chunks/1, + + mb/1, + + generator_fun/2, + generate_chunks/3, + generate_chunk/2, + check_consistency/3, + check_file_consistency/3, + get_byte/2 +]). + +%% List to stream generator API: +-export([list_to_stream/1]). 
+ +%% Proper generators: +-export([ + binary_stream_gen/1, + interleaved_streams_gen/1, + interleaved_binary_gen/1, + interleaved_list_gen/1 +]). + +-export_type([payload/0, binary_payload/0]). + +-define(hash_size, 16). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-type payload() :: {Seed :: term(), Size :: integer()}. + +-type binary_payload() :: { + binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer() +}. + +%% For performance reasons we treat regular lists as streams, see `next/1' +-opaque cont(Data) :: + fun(() -> stream(Data)) + | stream(Data). + +-type stream(Data) :: + maybe_improper_list(Data, cont(Data)) + | ?end_of_stream. + +-type tagged_binstream() :: stream({Tag :: term(), Payload :: chunk_state()}). + +-record(chunk_state, { + seed :: term(), + payload_size :: non_neg_integer(), + offset :: non_neg_integer(), + chunk_size :: non_neg_integer() +}). + +-opaque chunk_state() :: #chunk_state{}. + +-record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}). + +-opaque interleave_state() :: #interleave_state{}. + +%% ============================================================================= +%% API functions +%% ============================================================================= + +%% ----------------------------------------------------------------------------- +%% Proper generators +%% ----------------------------------------------------------------------------- + +%% @doc Proper generator that creates a binary stream +-spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type(). +binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 -> + ?LET( + {Seed, Size}, + {nat(), range(1, 16#100000)}, + generate_chunk({Seed, Size}, ChunkSize) + ). + +%% @equiv interleaved_streams_gen(10, Type) +-spec interleaved_streams_gen(proper_types:type()) -> proper_types:type(). +interleaved_streams_gen(Type) -> + interleaved_streams_gen(10, Type). 
+ +%% @doc Proper generator that creates a term of type +%% ```[{_Tag :: binary(), stream()}]''' that is ready to be fed +%% into `interleave_streams/1' function +-spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) -> + proper_types:type(). +interleaved_streams_gen(MaxNStreams, StreamType) -> + ?LET( + NStreams, + range(1, MaxNStreams), + ?LET( + Streams, + vector(NStreams, StreamType), + begin + Tags = [<> || I <- lists:seq(1, length(Streams))], + lists:zip(Tags, Streams) + end + ) + ). + +-spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type(). +interleaved_binary_gen(ChunkSize) -> + interleaved_streams_gen(binary_stream_gen(ChunkSize)). + +-spec interleaved_list_gen(proper_types:type()) -> proper_types:type(). +interleaved_list_gen(Type) -> + interleaved_streams_gen(non_empty(list(Type))). + +%% ----------------------------------------------------------------------------- +%% Generic streams +%% ----------------------------------------------------------------------------- + +%% @doc Consume one element from the stream. +-spec next(cont(A)) -> stream(A). +next(Fun) when is_function(Fun, 0) -> + Fun(); +next(L) -> + L. + +%% @doc Take a list of tagged streams and return a stream where +%% elements of the streams are tagged and randomly interleaved. +%% +%% Note: this function is more or less generic and it's compatible +%% with this module's `generate_chunks' function family, as well as +%% `ets:next', lists and what not +%% +%% Consider using simplified versions of this function +-spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}). +interleave_streams(Streams) -> + do_interleave_streams( + #interleave_state{streams = Streams} + ). + +%% @doc Take an arbitrary stream and add repetitions of the elements +%% TODO: Make retransmissions of arbitrary length +-spec retransmits(stream(Data), float()) -> stream(Data). 
+retransmits(Stream, Probability) -> + case Stream of + [Data | Cont0] -> + Cont = fun() -> retransmits(next(Cont0), Probability) end, + case rand:uniform() < Probability of + true -> [Data, Data | Cont]; + false -> [Data | Cont] + end; + ?end_of_stream -> + ?end_of_stream + end. + +%% @doc Consume all elements of the stream and feed them into a +%% callback (e.g. brod:produce) +-spec consume( + stream(A), + fun((A) -> Ret) +) -> [Ret]. +consume(Stream, Callback) -> + case Stream of + [Data | Cont] -> [Callback(Data) | consume(next(Cont), Callback)]; + ?end_of_stream -> [] + end. + +%% @equiv consume(Stream, fun(A) -> A end) +-spec consume(stream(A)) -> [A]. +consume(Stream) -> + consume(Stream, fun(A) -> A end). + +%% ----------------------------------------------------------------------------- +%% Misc functions +%% ----------------------------------------------------------------------------- + +%% @doc Return number of bytes in `N' megabytes +-spec mb(integer()) -> integer(). +mb(N) -> + N * 1048576. + +%% ----------------------------------------------------------------------------- +%% List streams +%% ----------------------------------------------------------------------------- +-spec list_to_stream([A]) -> stream(A). +list_to_stream(L) -> L. + +%% ----------------------------------------------------------------------------- +%% Binary streams +%% ----------------------------------------------------------------------------- + +%% @doc First argument is a chunk number, the second one is a seed. +%% This implementation is hardly efficient, but it was chosen for +%% clarity reasons +-spec generator_fun(integer(), binary()) -> binary(). +generator_fun(N, Seed) -> + crypto:hash(md5, <>). + +%% @doc Get byte at offset `N' +-spec get_byte(integer(), term()) -> byte(). +get_byte(N, Seed) -> + do_get_byte(N, seed_hash(Seed)). + +%% @doc Stream of binary chunks. 
Limitation: both payload size and +%% `ChunkSize' should be dividable by `?hash_size' +-spec generate_chunk(payload(), integer()) -> stream(binary_payload()). +generate_chunk({Seed, Size}, ChunkSize) when + ChunkSize rem ?hash_size =:= 0 +-> + State = #chunk_state{ + seed = Seed, + payload_size = Size, + chunk_size = ChunkSize, + offset = 0 + }, + generate_chunk(State). + +%% @doc Take a list of `payload()'s and a callback function, and start +%% producing the payloads in random order. Seed is used as a tag +%% @see interleave_streams/4 +-spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) -> + tagged_binstream(). +interleave_chunks(Streams0) -> + Streams = [ + {Tag, generate_chunk(Payload, ChunkSize)} + || {Payload = {Tag, _}, ChunkSize} <- Streams0 + ], + interleave_streams(Streams). + +%% @doc Take a list of `payload()'s and a callback function, and start +%% consuming the payloads in a random order. Seed is used as a +%% tag. All streams use the same chunk size +%% @see interleave_streams/2 +-spec interleave_chunks( + [payload()], + non_neg_integer() +) -> tagged_binstream(). +interleave_chunks(Streams0, ChunkSize) -> + Streams = [ + {Seed, generate_chunk({Seed, Size}, ChunkSize)} + || {Seed, Size} <- Streams0 + ], + interleave_streams(Streams). + +%% @doc Generate chunks of data and feed them into +%% `Callback' +-spec generate_chunks( + payload(), + integer(), + fun((binary()) -> A) +) -> [A]. +generate_chunks(Payload, ChunkSize, Callback) -> + consume(generate_chunk(Payload, ChunkSize), Callback). + +-spec check_consistency( + payload(), + integer(), + fun((integer()) -> {ok, binary()} | undefined) +) -> ok. 
+check_consistency({Seed, Size}, SampleSize, Callback) -> + SeedHash = seed_hash(Seed), + Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)], + %% Always check first and last bytes, and one that should not exist: + Samples = [0, Size - 1, Size | Random], + lists:foreach( + fun + (N) when N < Size -> + Expected = do_get_byte(N, SeedHash), + ?assertEqual( + {N, {ok, Expected}}, + {N, Callback(N)} + ); + (N) -> + ?assertMatch(undefined, Callback(N)) + end, + Samples + ). + +-spec check_file_consistency( + payload(), + integer(), + file:filename() +) -> ok. +check_file_consistency(Payload, SampleSize, FileName) -> + {ok, FD} = file:open(FileName, [read, raw]), + try + Fun = fun(N) -> + case file:pread(FD, [{N, 1}]) of + {ok, [[X]]} -> {ok, X}; + {ok, [eof]} -> undefined + end + end, + check_consistency(Payload, SampleSize, Fun) + after + file:close(FD) + end. + +%% ============================================================================= +%% Internal functions +%% ============================================================================= + +-spec do_interleave_streams(interleave_state()) -> stream(_Data). +do_interleave_streams(#interleave_state{streams = []}) -> + ?end_of_stream; +do_interleave_streams(#interleave_state{streams = Streams} = State0) -> + %% Not the most efficient implementation (lots of avoidable list + %% traversals), but we don't expect the number of streams to be the + %% bottleneck + N = rand:uniform(length(Streams)), + {Hd, [{Tag, SC} | Tl]} = lists:split(N - 1, Streams), + case SC of + [Payload | SC1] -> + State = State0#interleave_state{streams = Hd ++ [{Tag, next(SC1)} | Tl]}, + Cont = fun() -> do_interleave_streams(State) end, + [{Tag, Payload} | Cont]; + ?end_of_stream -> + State = State0#interleave_state{streams = Hd ++ Tl}, + do_interleave_streams(State) + end. + +%% @doc Continue generating chunks +-spec generate_chunk(chunk_state()) -> stream(binary()). 
+generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when + Offset >= Size +-> + ?end_of_stream; +generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) -> + State = State0#chunk_state{offset = Offset + ChunkSize}, + Payload = generate_chunk( + State#chunk_state.seed, + Offset, + ChunkSize, + State#chunk_state.payload_size + ), + [Payload | fun() -> generate_chunk(State) end]. + +generate_chunk(Seed, Offset, ChunkSize, Size) -> + SeedHash = seed_hash(Seed), + To = min(Offset + ChunkSize, Size) - 1, + Payload = iolist_to_binary([ + generator_fun(I, SeedHash) + || I <- lists:seq(Offset div 16, To div 16) + ]), + ChunkNum = Offset div ChunkSize + 1, + ChunkCnt = ceil(Size / ChunkSize), + {Payload, ChunkNum, ChunkCnt}. + +%% @doc Hash any term +-spec seed_hash(term()) -> binary(). +seed_hash(Seed) -> + crypto:hash(md5, term_to_binary(Seed)). + +%% @private Get byte at offset `N' +-spec do_get_byte(integer(), binary()) -> byte(). +do_get_byte(N, Seed) -> + Chunk = generator_fun(N div ?hash_size, Seed), + binary:at(Chunk, N rem ?hash_size). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl new file mode 100644 index 000000000..7452906b8 --- /dev/null +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -0,0 +1,464 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(prop_replay_message_storage). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(WORK_DIR, ["_build", "test"]). +-define(RUN_ID, {?MODULE, testrun_id}). + +-define(ZONE, ?MODULE). +-define(GEN_ID, 42). 
+ +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_bitstring_computes() -> + ?FORALL( + Keymapper, + keymapper(), + ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin + BS = emqx_ds_message_storage_bitmask:compute_bitstring(Topic, Timestamp, Keymapper), + is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper))) + end) + ). + +prop_topic_bitmask_computes() -> + Keymapper = make_keymapper(16, [8, 12, 16], 100), + ?FORALL(TopicFilter, topic_filter(), begin + Mask = emqx_ds_message_storage_bitmask:compute_topic_bitmask(TopicFilter, Keymapper), + % topic bits + timestamp LSBs + is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) + end). + +prop_next_seek_monotonic() -> + ?FORALL( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + begin + Filter = emqx_ds_message_storage_bitmask:make_keyspace_filter( + {TopicFilter, StartTime}, + Keymapper + ), + ?FORALL( + Bitstring, + bitstr(get_keymapper_bitsize(Keymapper)), + emqx_ds_message_storage_bitmask:compute_next_seek(Bitstring, Filter) >= Bitstring + ) + end + ). + +prop_next_seek_eq_initial_seek() -> + ?FORALL( + Filter, + keyspace_filter(), + emqx_ds_message_storage_bitmask:compute_initial_seek(Filter) =:= + emqx_ds_message_storage_bitmask:compute_next_seek(0, Filter) + ). + +prop_iterate_messages() -> + TBPL = [4, 8, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 200 + }, + % TODO + % Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit. 
+ ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin + Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), + {DB, Handle} = open_db(Filepath, Options), + Shim = emqx_ds_message_storage_bitmask_shim:open(), + ok = store_db(DB, Stream), + ok = store_shim(Shim, Stream), + ?FORALL( + { + {Topic, _}, + Pattern, + StartTime + }, + { + nth(Stream), + topic_filter_pattern(), + start_time() + }, + begin + TopicFilter = make_topic_filter(Pattern, Topic), + Iteration = {TopicFilter, StartTime}, + Messages = iterate_db(DB, Iteration), + Reference = iterate_shim(Shim, Iteration), + ok = close_db(Handle), + ok = emqx_ds_message_storage_bitmask_shim:close(Shim), + ?WHENFAIL( + begin + io:format(user, " *** Filepath = ~s~n", [Filepath]), + io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]), + io:format(user, " *** StartTime = ~p~n", [StartTime]) + end, + is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages) + ) + end + ) + end). + +prop_iterate_eq_iterate_with_preserve_restore() -> + TBPL = [4, 8, 16, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 500 + }, + {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), + ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin + % TODO + % This proptest is impure because messages from testruns assumed to be + % independent of each other are accumulated in the same storage. This + % would probably confuse shrinker in the event a testrun fails. 
+ ok = store_db(DB, Stream), + ?FORALL( + { + {Topic, _}, + Pat, + StartTime, + Commands + }, + { + nth(Stream), + topic_filter_pattern(), + start_time(), + shuffled(flat([non_empty(list({preserve, restore})), list(iterate)])) + }, + begin + Replay = {make_topic_filter(Pat, Topic), StartTime}, + Iterator = make_iterator(DB, Replay), + Ctx = #{db => DB, replay => Replay}, + Messages = run_iterator_commands(Commands, Iterator, Ctx), + equals(Messages, iterate_db(DB, Replay)) + end + ) + end). + +prop_iterate_eq_iterate_with_refresh() -> + TBPL = [4, 8, 16, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 500 + }, + {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), + ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin + % TODO + % This proptest is also impure, see above. + ok = store_db(DB, Stream), + ?FORALL( + { + {Topic, _}, + Pat, + StartTime, + RefreshEvery + }, + { + nth(Stream), + topic_filter_pattern(), + start_time(), + pos_integer() + }, + ?TIMEOUT(5000, begin + Replay = {make_topic_filter(Pat, Topic), StartTime}, + IterationOptions = #{iterator_refresh => {every, RefreshEvery}}, + Iterator = make_iterator(DB, Replay, IterationOptions), + Messages = iterate_db(Iterator), + equals(Messages, iterate_db(DB, Replay)) + end) + ) + end). + +% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> +% MessageID = emqx_guid:gen(), +% PublishedAt = ChunkNum, +% MessageID, PublishedAt, Topic +% ]), +% ok = emqx_ds_message_storage_bitmask:store(DB, MessageID, PublishedAt, Topic, Payload), +% store_message_stream(DB, payload_gen:next(Rest)); +% store_message_stream(_Zone, []) -> +% ok. + +store_db(DB, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_ds_message_storage_bitmask:store(DB, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate_db(DB, Iteration) -> + iterate_db(make_iterator(DB, Iteration)). 
+ +iterate_db(It) -> + case emqx_ds_message_storage_bitmask:next(It) of + {value, Payload, ItNext} -> + [binary_to_term(Payload) | iterate_db(ItNext)]; + none -> + [] + end. + +make_iterator(DB, Replay) -> + {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay), + It. + +make_iterator(DB, Replay, Options) -> + {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay, Options), + It. + +run_iterator_commands([iterate | Rest], It, Ctx) -> + case emqx_ds_message_storage_bitmask:next(It) of + {value, Payload, ItNext} -> + [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)]; + none -> + [] + end; +run_iterator_commands([{preserve, restore} | Rest], It, Ctx) -> + #{ + db := DB, + replay := Replay + } = Ctx, + Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It), + {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Replay, Serial), + run_iterator_commands(Rest, ItNext, Ctx); +run_iterator_commands([], It, _Ctx) -> + iterate_db(It). + +store_shim(Shim, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_ds_message_storage_bitmask_shim:store(Shim, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate_shim(Shim, Iteration) -> + lists:map( + fun binary_to_term/1, + emqx_ds_message_storage_bitmask_shim:iterate(Shim, Iteration) + ). + +%%-------------------------------------------------------------------- +%% Setup / teardown +%%-------------------------------------------------------------------- + +open_db(Filepath, Options) -> + {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), + {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options), + DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), + {DB, Handle}. + +close_db(Handle) -> + rocksdb:close(Handle). + +make_filepath(TC) -> + make_filepath(TC, 0). 
+ +make_filepath(TC, InstID) -> + Name = io_lib:format("~0p.~0p", [TC, InstID]), + Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]), + ok = filelib:ensure_dir(Path), + Path. + +get_run_id() -> + case persistent_term:get(?RUN_ID, undefined) of + RunID when RunID /= undefined -> + RunID; + undefined -> + RunID = make_run_id(), + ok = persistent_term:put(?RUN_ID, RunID), + RunID + end. + +make_run_id() -> + calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]). + +%%-------------------------------------------------------------------- +%% Type generators +%%-------------------------------------------------------------------- + +topic() -> + non_empty(list(topic_level())). + +topic(EntropyWeights) -> + ?LET(L, scaled(1 / 4, list(1)), begin + EWs = lists:sublist(EntropyWeights ++ L, length(L)), + ?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs]) + end). + +topic_filter() -> + ?SUCHTHAT( + L, + non_empty( + list( + frequency([ + {5, topic_level()}, + {2, '+'}, + {1, '#'} + ]) + ) + ), + not lists:member('#', L) orelse lists:last(L) == '#' + ). + +topic_level_pattern() -> + frequency([ + {5, level}, + {2, '+'}, + {1, '#'} + ]). + +topic_filter_pattern() -> + list(topic_level_pattern()). + +topic_filter(Topic) -> + ?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)). + +make_topic_filter([], _) -> + []; +make_topic_filter(_, []) -> + []; +make_topic_filter(['#' | _], _) -> + ['#']; +make_topic_filter(['+' | Rest], [_ | Levels]) -> + ['+' | make_topic_filter(Rest, Levels)]; +make_topic_filter([level | Rest], [L | Levels]) -> + [L | make_topic_filter(Rest, Levels)]. + +% topic() -> +% ?LAZY(?SIZED(S, frequency([ +% {S, [topic_level() | topic()]}, +% {1, []} +% ]))). + +% topic_filter() -> +% ?LAZY(?SIZED(S, frequency([ +% {round(S / 3 * 2), [topic_level() | topic_filter()]}, +% {round(S / 3 * 1), ['+' | topic_filter()]}, +% {1, []}, +% {1, ['#']} +% ]))). 
+ +topic_level() -> + ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). + +topic_level(Entropy) -> + S = floor(1 + math:log2(Entropy) / 4), + ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). + +topic_level_fixed() -> + oneof([ + <<"foo">>, + <<"bar">>, + <<"baz">>, + <<"xyzzy">> + ]). + +keymapper() -> + ?LET( + {TimestampBits, TopicBits, Epoch}, + { + range(0, 128), + non_empty(list(range(1, 32))), + pos_integer() + }, + make_keymapper(TimestampBits, TopicBits, Epoch * 100) + ). + +keyspace_filter() -> + ?LET( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + emqx_ds_message_storage_bitmask:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) + ). + +messages(Topic) -> + ?LET( + Ts, + list(Topic), + interleaved( + ?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages)) + ) + ). + +message() -> + ?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}). + +message_streams(Topic) -> + ?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]). + +timestamp() -> + scaled(20, pos_integer()). + +start_time() -> + scaled(10, pos_integer()). + +bitstr(Size) -> + ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)). + +nth(L) -> + ?LET(I, range(1, length(L)), lists:nth(I, L)). + +scaled(Factor, T) -> + ?SIZED(S, resize(ceil(S * Factor), T)). + +interleaved(T) -> + ?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))). + +shuffled(T) -> + ?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))). + +flat(T) -> + ?LET(L, T, lists:flatten(L)). 
+ +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> + emqx_ds_message_storage_bitmask:make_keymapper(#{ + timestamp_bits => TimestampBits, + topic_bits_per_level => TopicBits, + epoch => MaxEpoch + }). + +get_keymapper_bitsize(Keymapper) -> + maps:get(bitsize, emqx_ds_message_storage_bitmask:keymapper_info(Keymapper)). + +-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). +interleave(Seqs, Rng) -> + interleave(Seqs, length(Seqs), Rng). + +interleave(Seqs, L, Rng) when L > 0 -> + {N, RngNext} = rand:uniform_s(L, Rng), + {SeqHead, SeqTail} = lists:split(N - 1, Seqs), + case SeqTail of + [{Tag, [M | Rest]} | SeqRest] -> + [{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)]; + [{_, []} | SeqRest] -> + interleave(SeqHead ++ SeqRest, L - 1, RngNext) + end; +interleave([], 0, _) -> + []. + +-spec shuffle(list(E), rand:state()) -> list(E). +shuffle(L, Rng) -> + {Rands, _} = randoms(length(L), Rng), + [E || {_, E} <- lists:sort(lists:zip(Rands, L))]. + +randoms(N, Rng) when N > 0 -> + {Rand, RngNext} = rand:uniform_s(Rng), + {Tail, RngFinal} = randoms(N - 1, RngNext), + {[Rand | Tail], RngFinal}; +randoms(_, Rng) -> + {[], Rng}.