Merge pull request #13370 from ieQu1/dev/skip-streams

New durable storage layout with explicit index for LTS wildcards
ieQu1 2024-07-09 20:27:21 +02:00 committed by GitHub
commit 92dc059908
24 changed files with 2477 additions and 268 deletions

View File

@ -234,6 +234,42 @@ fields(layout_builtin_wildcard_optimized) ->
}
)}
];
fields(layout_builtin_wildcard_optimized_v2) ->
[
{type,
sc(
wildcard_optimized_v2,
#{
'readOnly' => true,
default => wildcard_optimized_v2,
desc => ?DESC(layout_builtin_wildcard_optimized_type)
}
)},
{bytes_per_topic_level,
sc(
range(1, 16),
#{
default => 8,
importance => ?IMPORTANCE_HIDDEN
}
)},
{topic_index_bytes,
sc(
pos_integer(),
#{
default => 8,
importance => ?IMPORTANCE_HIDDEN
}
)},
{serialization_schema,
sc(
emqx_ds_msg_serializer:schema(),
#{
default => v1,
importance => ?IMPORTANCE_HIDDEN
}
)}
];
fields(layout_builtin_reference) ->
[
{type,
@ -242,6 +278,7 @@ fields(layout_builtin_reference) ->
#{
'readOnly' => true,
importance => ?IMPORTANCE_LOW,
default => reference,
desc => ?DESC(layout_builtin_reference_type)
}
)}
@ -284,7 +321,7 @@ common_builtin_fields() ->
importance => ?IMPORTANCE_MEDIUM,
default =>
#{
<<"type">> => wildcard_optimized
<<"type">> => wildcard_optimized_v2
}
}
)}
@ -298,6 +335,8 @@ desc(builtin_write_buffer) ->
?DESC(builtin_write_buffer);
desc(layout_builtin_wildcard_optimized) ->
?DESC(layout_builtin_wildcard_optimized);
desc(layout_builtin_wildcard_optimized_v2) ->
?DESC(layout_builtin_wildcard_optimized);
desc(layout_builtin_reference) ->
?DESC(layout_builtin_reference);
desc(_) ->
@ -307,6 +346,19 @@ desc(_) ->
%% Internal functions
%%================================================================================
translate_layout(
#{
type := wildcard_optimized_v2,
bytes_per_topic_level := BytesPerTopicLevel,
topic_index_bytes := TopicIndexBytes,
serialization_schema := SSchema
}
) ->
{emqx_ds_storage_skipstream_lts, #{
wildcard_hash_bytes => BytesPerTopicLevel,
topic_index_bytes => TopicIndexBytes,
serialization_schema => SSchema
}};
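%% For example (values are the schema defaults shown above; illustrative):
%%   translate_layout(#{type => wildcard_optimized_v2,
%%                      bytes_per_topic_level => 8,
%%                      topic_index_bytes => 8,
%%                      serialization_schema => v1})
%% returns
%%   {emqx_ds_storage_skipstream_lts,
%%    #{wildcard_hash_bytes => 8, topic_index_bytes => 8,
%%      serialization_schema => v1}}.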
translate_layout(
#{
type := wildcard_optimized,
@ -336,7 +388,11 @@ builtin_layouts() ->
%% suitable for production use. However, it's very simple and
%% produces a very predictable replay order, which can be useful
%% for testing and debugging:
[ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
[
ref(layout_builtin_wildcard_optimized_v2),
ref(layout_builtin_wildcard_optimized),
ref(layout_builtin_reference)
].
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -64,6 +64,7 @@
-export([work_dir/1]).
-export([work_dir/2]).
-export([clean_work_dir/1]).
-export([load_apps/1]).
-export([start_apps/2]).
@ -162,6 +163,7 @@ start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
% 4. Setup isolated mnesia directory
ok = emqx_common_test_helpers:load(mnesia),
ok = application:set_env(mnesia, dir, filename:join([WorkDir, mnesia])),
ok = application:set_env(emqx_durable_storage, db_data_dir, filename:join([WorkDir, ds])),
% 5. Start ekka separately.
% For some reason it's designed to be started in non-regular way, so we have to track
% applications started in the process manually.
@ -432,6 +434,16 @@ work_dir(TCName, CTConfig) ->
WorkDir = work_dir(CTConfig),
filename:join(WorkDir, TCName).
%% @doc Delete contents of the workdir.
clean_work_dir(WorkDir) ->
ct:pal("Cleaning workdir ~p", [WorkDir]),
case re:run(WorkDir, "./_build/test/logs/") of
{match, _} ->
file:del_dir_r(WorkDir);
nomatch ->
error({unsafe_workdir, WorkDir})
end.
%%
start_ekka() ->

View File

@ -599,6 +599,9 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok.
suite() ->
[{timetrap, 50_000}].
init_per_testcase(TC, Config) ->
Apps = emqx_cth_suite:start(
[emqx_durable_storage, emqx_ds_backends],

View File

@ -46,6 +46,12 @@
shard_of_message/4
]).
%% Internal exports:
-export([
do_next/3,
do_delete_next/4
]).
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
-include_lib("emqx_utils/include/emqx_message.hrl").
@ -295,19 +301,8 @@ update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0
end.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
ShardId = {DB, Shard},
T0 = erlang:monotonic_time(microsecond),
Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)),
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0),
case Result of
{ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter},
{ok, Iter, Batch};
Other ->
Other
end.
next(DB, Iter, N) ->
with_worker(do_next, [DB, Iter, N]).
-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[emqx_ds:ds_specific_delete_stream()].
@ -347,7 +342,36 @@ make_delete_iterator(DB, ?delete_stream(Shard, InnerStream), TopicFilter, StartT
-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N) ->
delete_next(DB, Iter, Selector, N) ->
with_worker(do_delete_next, [DB, Iter, Selector, N]).
%%================================================================================
%% Internal exports
%%================================================================================
current_timestamp(ShardId) ->
emqx_ds_builtin_local_meta:current_timestamp(ShardId).
-spec do_next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
ShardId = {DB, Shard},
T0 = erlang:monotonic_time(microsecond),
Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)),
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0),
case Result of
{ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter},
{ok, Iter, Batch};
Other ->
Other
end.
-spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
do_delete_next(
DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N
) ->
ShardId = {DB, Shard},
case
emqx_ds_storage_layer:delete_next(
@ -362,13 +386,6 @@ delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIte
Error
end.
%%================================================================================
%% Internal exports
%%================================================================================
current_timestamp(ShardId) ->
emqx_ds_builtin_local_meta:current_timestamp(ShardId).
%%================================================================================
%% Internal functions
%%================================================================================
@ -380,3 +397,20 @@ timeus_to_timestamp(undefined) ->
undefined;
timeus_to_timestamp(TimestampUs) ->
TimestampUs div 1000.
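%% Run the storage layer call in a short-lived, monitored worker process
%% (presumably to keep transient garbage produced by the traversal off the
%% caller's heap); a crash of the worker is reported as an unrecoverable error.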
with_worker(F, A) ->
Parent = self(),
Ref = make_ref(),
{_Pid, MRef} = spawn_opt(
fun() ->
Parent ! {Ref, apply(?MODULE, F, A)}
end,
[monitor, {min_heap_size, 10000}]
),
receive
{Ref, Result} ->
erlang:demonitor(MRef, [flush]),
Result;
{'DOWN', MRef, _, _, Info} ->
{error, unrecoverable, Info}
end.

View File

@ -1,3 +1,36 @@
# `emqx_ds_builtin_raft`
Replication layer for the builtin EMQX durable storage backend that uses Raft algorithm.
The Raft backend introduces the concept of a **site** to alleviate the problem of changing node names.
Site IDs are persistent and randomly generated at the first startup of the node.
Each node in the cluster has a unique site ID that is independent of the Erlang node name (`emqx@...`).
## Configurations
OTP application environment variables:
- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`.
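A minimal sketch of setting this option through the standard OTP application environment (e.g. from the remote shell; the value shown is one of the documented choices):

```erlang
%% Prefer reading from the shard leader:
application:set_env(emqx_durable_storage, reads, leader_preferred).
```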
## CLI
Runtime settings for the durable storages can be modified via CLI as well as the REST API.
The following CLI commands are available:
- `emqx ctl ds info` — get a quick overview of the durable storage state
- `emqx ctl ds set_replicas <DS> <Site1> <Site2> ...` — update the list of replicas for a durable storage.
- `emqx ctl ds join <DS> <Site>` — add a replica of the durable storage on the site
- `emqx ctl ds leave <DS> <Site>` — remove a replica of a durable storage from the site
## HTTP APIs
The following REST APIs are available for managing the builtin durable storages:
- `/ds/sites` — list known sites.
- `/ds/sites/:site` — get information about the site (its status, current EMQX node name managing the site, etc.)
- `/ds/storages` — list durable storages
- `/ds/storages/:ds` — get information about the durable storage and its shards
- `/ds/storages/:ds/replicas` — list or update sites that contain replicas of a durable storage
- `/ds/storages/:ds/replicas/:site` — add or remove replica of the durable storage on the site

View File

@ -29,15 +29,12 @@
emqx_ds_test_helpers:on(NODES, fun() -> BODY end)
).
opts() ->
opts(#{}).
opts(Overrides) ->
opts(Config, Overrides) ->
Layout = ?config(layout, Config),
maps:merge(
#{
backend => builtin_raft,
%% storage => {emqx_ds_storage_reference, #{}},
storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
storage => Layout,
n_shards => 16,
n_sites => 1,
replication_factor => 3,
@ -58,7 +55,7 @@ appspec(emqx_durable_storage) ->
t_metadata(init, Config) ->
Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}),
[{apps, Apps} | Config];
t_metadata('end', Config) ->
@ -108,7 +105,7 @@ t_replication_transfers_snapshots(init, Config) ->
{t_replication_transfers_snapshots2, #{apps => Apps}},
{t_replication_transfers_snapshots3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
[{nodes, Nodes}, {specs, NodeSpecs} | Config];
@ -125,9 +122,10 @@ t_replication_transfers_snapshots(Config) ->
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
?check_trace(
#{timetrap => 30_000},
begin
%% Initialize DB on all nodes and wait for it to be online.
Opts = opts(#{n_shards => 1, n_sites => 3}),
Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@ -139,8 +137,11 @@ t_replication_transfers_snapshots(Config) ->
),
%% Stop the DB on the "offline" node.
ok = emqx_cth_cluster:stop_node(NodeOffline),
_ = ?block_until(#{?snk_kind := ds_ra_state_enter, state := leader}, 500, 0),
?wait_async_action(
ok = emqx_cth_cluster:stop_node(NodeOffline),
#{?snk_kind := ds_ra_state_enter, state := leader},
5_000
),
%% Fill the storage with messages and a few additional generations.
emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
@ -153,9 +154,10 @@ t_replication_transfers_snapshots(Config) ->
?snk_meta := #{node := NodeOffline}
})
),
?assertEqual(
ok,
erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
ok = ?ON(
NodeOffline,
emqx_ds:open_db(?DB, opts(Config, #{}))
),
%% Trigger a storage operation and wait for the replica to be restored.
@ -183,7 +185,7 @@ t_rebalance(init, Config) ->
{t_rebalance3, #{apps => Apps}},
{t_rebalance4, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance('end', Config) ->
@ -206,7 +208,7 @@ t_rebalance(Config) ->
begin
Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
%% 1. Initialize DB on the first node.
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
[
?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
|| Node <- Nodes
@ -218,7 +220,7 @@ t_rebalance(Config) ->
{ok, [_]},
?ON(N1, emqx_ds_replication_layer_meta:assign_db_sites(?DB, [S1]))
),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?retry(500, 10, ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB))),
ct:pal("Sites: ~p~n", [Sites]),
@ -293,7 +295,7 @@ t_rebalance(Config) ->
ct:pal("Transitions (~p -> ~p): ~p~n", [
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
]),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))),
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))),
%% Verify that in the end each node is responsible for each shard.
?defer_assert(
@ -316,7 +318,7 @@ t_join_leave_errors(init, Config) ->
{t_join_leave_errors1, #{apps => Apps}},
{t_join_leave_errors2, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
[{nodes, Nodes} | Config];
t_join_leave_errors('end', Config) ->
@ -327,7 +329,7 @@ t_join_leave_errors(Config) ->
%% join/leave operations are reported correctly.
[N1, N2] = ?config(nodes, Config),
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
@ -370,7 +372,7 @@ t_join_leave_errors(Config) ->
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])),
?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)),
?retry(
1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME))
1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME))
),
%% Should be no-op.
@ -385,7 +387,7 @@ t_rebalance_chaotic_converges(init, Config) ->
{t_rebalance_chaotic_converges2, #{apps => Apps}},
{t_rebalance_chaotic_converges3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance_chaotic_converges('end', Config) ->
@ -411,7 +413,7 @@ t_rebalance_chaotic_converges(Config) ->
ct:pal("Sites: ~p~n", [Sites]),
%% Initialize DB on first two nodes.
Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 16, n_sites => 2, replication_factor => 3}),
%% Open DB:
?assertEqual(
@ -456,7 +458,7 @@ t_rebalance_chaotic_converges(Config) ->
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?retry(1000, 30, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?defer_assert(
?assertEqual(
@ -482,7 +484,7 @@ t_rebalance_offline_restarts(init, Config) ->
{t_rebalance_offline_restarts2, #{apps => Apps}},
{t_rebalance_offline_restarts3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
Nodes = emqx_cth_cluster:start(Specs),
[{nodes, Nodes}, {nodespecs, Specs} | Config];
@ -498,7 +500,7 @@ t_rebalance_offline_restarts(Config) ->
_Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@ -544,7 +546,7 @@ t_drop_generation(Config) ->
{t_drop_generation3, #{apps => Apps}}
],
#{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}
),
@ -552,7 +554,7 @@ t_drop_generation(Config) ->
?check_trace(
try
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 1, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@ -614,21 +616,21 @@ t_drop_generation(Config) ->
t_error_mapping_replication_layer(init, Config) ->
Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}),
[{apps, Apps} | Config];
t_error_mapping_replication_layer('end', Config) ->
emqx_cth_suite:stop(?config(apps, Config)),
Config.
t_error_mapping_replication_layer(_Config) ->
t_error_mapping_replication_layer(Config) ->
%% This checks that the replication layer maps recoverable errors correctly.
ok = emqx_ds_test_helpers:mock_rpc(),
ok = snabbkaffe:start_trace(),
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
[Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
TopicFilter = emqx_topic:words(<<"foo/#">>),
@ -695,7 +697,7 @@ t_error_mapping_replication_layer(_Config) ->
Results2 = lists:map(
fun(Iter) ->
case emqx_ds:next(DB, Iter, _BatchSize = 42) of
Ok = {ok, _Iter, [_ | _]} ->
Ok = {ok, _Iter, _} ->
Ok;
Error = {error, recoverable, {badrpc, _}} ->
Error;
@ -716,20 +718,20 @@ t_error_mapping_replication_layer(_Config) ->
%% problems.
t_store_batch_fail(init, Config) ->
Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}),
[{apps, Apps} | Config];
t_store_batch_fail('end', Config) ->
emqx_cth_suite:stop(?config(apps, Config)),
Config.
t_store_batch_fail(_Config) ->
t_store_batch_fail(Config) ->
DB = ?FUNCTION_NAME,
?check_trace(
#{timetrap => 15_000},
try
meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
ok = meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
%% Success:
Batch1 = [
message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
@ -737,7 +739,7 @@ t_store_batch_fail(_Config) ->
],
?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
%% Inject unrecoverable error:
meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
ok = meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
{error, unrecoverable, mock}
end),
Batch2 = [
@ -747,10 +749,10 @@ t_store_batch_fail(_Config) ->
?assertMatch(
{error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
),
meck:unload(emqx_ds_storage_layer),
ok = meck:unload(emqx_ds_storage_layer),
%% Inject a recoverable error:
meck:new(ra, [passthrough, no_history]),
meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
ok = meck:new(ra, [passthrough, no_history]),
ok = meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}),
{timeout, mock}
end),
@ -766,9 +768,9 @@ t_store_batch_fail(_Config) ->
{error, recoverable, {timeout, mock}},
emqx_ds:store_batch(DB, Batch3, #{sync => true})
),
meck:unload(ra),
ok = meck:unload(ra),
?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 0))
after
meck:unload()
end,
@ -803,7 +805,7 @@ t_crash_restart_recover(init, Config) ->
{t_crash_stop_recover2, #{apps => Apps}},
{t_crash_stop_recover3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
Nodes = emqx_cth_cluster:start(Specs),
[{nodes, Nodes}, {nodespecs, Specs} | Config];
@ -815,7 +817,7 @@ t_crash_restart_recover(Config) ->
%% correctly preserved.
Nodes = [N1, N2, N3] = ?config(nodes, Config),
_Specs = [_, NS2, NS3] = ?config(nodespecs, Config),
DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}),
DBOpts = opts(Config, #{n_shards => 16, n_sites => 3, replication_factor => 3}),
%% Prepare test event stream.
NMsgs = 400,
@ -856,7 +858,10 @@ t_crash_restart_recover(Config) ->
MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}),
{ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity),
{timeout, Events} = snabbkaffe:receive_events(SubRef),
LostMessages = [M || #{batch := Messages} <- Events, M <- Messages],
LostMessages = [
emqx_ds_test_helpers:message_canonical_form(M)
|| #{batch := Messages} <- Events, M <- Messages
],
ct:pal("Some messages were lost: ~p", [LostMessages]),
?assert(length(LostMessages) < NMsgs div 20),
@ -876,8 +881,16 @@ t_crash_restart_recover(Config) ->
%% Were any messages lost unexpectedly?
{_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)),
ExpectedMessages = emqx_utils_stream:consume(ExpectedStream),
MissingMessages = ExpectedMessages -- DSMessages,
?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages))
MissingMessages = emqx_ds_test_helpers:message_set_subtract(
ExpectedMessages, DSMessages
),
?defer_assert(
?assertEqual(
[],
emqx_ds_test_helpers:sublist(MissingMessages -- LostMessages),
emqx_ds_test_helpers:sublist(DSMessages)
)
)
end,
lists:foreach(VerifyClient, TopicStreams)
end,
@ -984,12 +997,36 @@ sample(N, List) ->
suite() -> [{timetrap, {seconds, 60}}].
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
[{group, Grp} || {Grp, _} <- groups()].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{bitfield_lts, TCs},
{skipstream_lts, TCs}
].
init_per_group(Group, Config) ->
LayoutConf =
case Group of
skipstream_lts ->
{emqx_ds_storage_skipstream_lts, #{with_guid => true}};
bitfield_lts ->
{emqx_ds_storage_bitfield_lts, #{}}
end,
[{layout, LayoutConf} | Config].
end_per_group(_Group, Config) ->
Config.
init_per_testcase(TCName, Config0) ->
Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
Config.
Config1 = [{work_dir, emqx_cth_suite:work_dir(TCName, Config0)} | Config0],
emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config1).
end_per_testcase(TCName, Config) ->
ok = snabbkaffe:stop(),
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
Result = emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config),
catch emqx_ds:drop_db(TCName),
emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
Result.

View File

@ -13,11 +13,10 @@ This makes the storage disk requirements very predictable: only the number of _p
DS _backend_ is a callback module that implements `emqx_ds` behavior.
EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses Raft algorithm for data replication, and RocksDB as the main storage.
The EMQX repository contains two builtin backends based on RocksDB:
Note that builtin backend introduces the concept of **site** to alleviate the problem of changing node names.
Site IDs are persistent, and they are randomly generated at the first startup of the node.
Each node in the cluster has a unique site ID, that is independent from the Erlang node name (`emqx@...`).
- `emqx_ds_builtin_local`
- `emqx_ds_builtin_raft`
### Layout
@ -113,8 +112,8 @@ In the future it can serve as a storage for retained messages or as a generic me
# Configurations
Global options for `emqx_durable_storage` application are configured via OTP application environment.
Database-specific settings are stored in the schema table.
Common global options for builtin backends are configured via OTP application environment.
Database-specific settings are stored in EMQX config.
The following application environment variables are available:
@ -124,26 +123,9 @@ The following application environment variables are available:
- `emqx_durable_storage.egress_flush_interval`: period at which the batches of messages are committed to the durable storage.
- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`.
Runtime settings for the durable storages can be modified via CLI as well as the REST API.
The following CLI commands are available:
- `emqx ctl ds info` — get a quick overview of the durable storage state
- `emqx ctl ds set_replicas <DS> <Site1> <Site2> ...` — update the list of replicas for a durable storage.
- `emqx ctl ds join <DS> <Site>` — add a replica of durable storage on the site
- `emqx ctl ds leave <DS> <Site>` — remove a replica of a durable storage from the site
# HTTP APIs
The following REST APIs are available for managing the builtin durable storages:
- `/ds/sites` — list known sites.
- `/ds/sites/:site` — get information about the site (its status, current EMQX node name managing the site, etc.)
- `/ds/storages` — list durable storages
- `/ds/storages/:ds` — get information about the durable storage and its shards
- `/ds/storages/:ds/replicas` — list or update sites that contain replicas of a durable storage
- `/ds/storages/:ds/replicas/:site` — add or remove replica of the durable storage on the site
None
# Other

View File

@ -0,0 +1,90 @@
-- This schema specifies binary encoding of EMQX's internal
-- representation of a message.
--
-- Note: the MQTT standard specifies that certain properties, like topic,
-- should be UTF8 strings. Here we represent them as OCTET STRING to
-- avoid extra checks.
DurableMessage DEFINITIONS AUTOMATIC TAGS ::=
BEGIN
-- Non-standard flag:
MiscFlag ::= SEQUENCE {
key UTF8String,
value BOOLEAN
}
-- Non-standard header or property.
-- Both key and value are interpreted as erlang terms:
MiscProperty ::= SEQUENCE {
key OCTET STRING,
value OCTET STRING
}
ClientAttr ::= SEQUENCE {
key OCTET STRING,
value OCTET STRING
}
-- Wrapper for any data that doesn't comply with the strict schema:
Misc ::= CHOICE {
flag MiscFlag,
header MiscProperty,
property MiscProperty,
-- Currently these are unused:
clientAttr ClientAttr,
extra MiscProperty
}
-- Both key and value are interpreted as binaries:
UserProperty ::= SEQUENCE {
key OCTET STRING,
value OCTET STRING
}
-- Common properties that are present in almost any message:
StdProperties ::= SEQUENCE {
payloadFormatIndicator INTEGER (0..255) OPTIONAL,
messageExpiryInterval INTEGER (0..4294967295) OPTIONAL,
responseTopic OCTET STRING OPTIONAL,
correlationData OCTET STRING OPTIONAL,
contentType OCTET STRING OPTIONAL,
userProperty SEQUENCE OF UserProperty
}
ProtoVer ::= CHOICE {
mqtt INTEGER(0..255),
mqtt-sn INTEGER(0..255),
coap INTEGER(0..255)
}
-- Common headers that are present in almost any message:
StdHeaders ::= SEQUENCE {
protoVer ProtoVer OPTIONAL,
peerhost OCTET STRING (SIZE(4..16)) OPTIONAL, -- IPv4 (4 octets) .. IPv6 (16 octets)
peername OCTET STRING (SIZE(6..18)) OPTIONAL, -- IPv4 (4 octets) .. IPv6 (16 octets) + 2 octets for (TCP/UDP) port
username OCTET STRING OPTIONAL
}
From ::= CHOICE {
atom UTF8String,
binary OCTET STRING
}
DurableMessage ::= SEQUENCE {
id OCTET STRING,
from From,
topic OCTET STRING,
payload OCTET STRING,
timestamp INTEGER,
qos INTEGER (0..2),
-- MQTT PUBLISH flags:
sys BOOLEAN,
dup BOOLEAN,
retain BOOLEAN,
-- Headers:
headers StdHeaders,
properties StdProperties,
-- Miscellaneous, highly EMQX-specific internal data:
misc SEQUENCE OF Misc OPTIONAL
}
END

View File

@ -0,0 +1,223 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This script can be loaded into a running EMQX EE node. It will
%% create a number of DS databases with different options and fill
%% them with data of a given size.
%%
%% Then it will measure the size of the database directories and create
%% a "storage (in)efficiency" report.
-module(storage_efficiency).
-include_lib("emqx_utils/include/emqx_message.hrl").
%% API:
-export([run/0, run/1]).
%%================================================================================
%% API functions
%%================================================================================
run() ->
run(#{}).
run(Custom) ->
RunConf = maps:merge(
#{
%% Sleep between batches:
sleep => 1_000,
%% Don't run test, only plot data:
dry_run => false,
%% Payload size multiplier:
size => 10,
%% Number of batches:
batches => 100,
%% Add generation every N batches:
add_generation => 10
},
Custom
),
lists:foreach(
fun(DBConf) ->
run(DBConf, RunConf)
end,
configs()
).
%% erlfmt-ignore
gnuplot_script(Filename) ->
"set terminal qt\n"
%% "set logscale y 10\n"
"set title \"" ++ filename:basename(Filename, ".dat") ++ "\"\n"
"set key autotitle columnheader\n"
"plot for [n=2:*] \"" ++ Filename ++ "\" using 1:n with linespoints".
%%================================================================================
%% Internal functions
%%================================================================================
configs() ->
[
{'benchmark-skipstream-asn1',
db_conf({emqx_ds_storage_skipstream_lts, #{serialization_schema => asn1}})},
{'benchmark-skipstream-v1',
db_conf({emqx_ds_storage_skipstream_lts, #{serialization_schema => v1}})},
{'benchmark-bitfield', db_conf({emqx_ds_storage_bitfield_lts, #{}})}
].
db_conf(Storage) ->
#{
backend => builtin_local,
%% n_sites => 1,
n_shards => 1,
%% replication_factor => 1,
%% replication_options => #{},
storage => Storage
}.
-record(s, {
data_size = 0,
payload_size = 0,
n_messages = 0,
datapoints = #{},
x_axis = []
}).
run({DB, Config}, RunConf) ->
#{
batches := NBatches,
size := PSMultiplier,
add_generation := AddGeneration,
sleep := Sleep,
dry_run := DryRun
} = RunConf,
{ok, _} = application:ensure_all_started(emqx_ds_backends),
Dir = dir(DB),
Filename = atom_to_list(DB) ++ ".dat",
DryRun orelse
begin
io:format(user, "Running benchmark for ~p in ~p~n", [DB, Dir]),
%% Ensure safe directory:
{match, _} = re:run(Dir, filename:join("data", DB)),
%% Ensure clean state:
ok = emqx_ds:open_db(DB, Config),
ok = emqx_ds:drop_db(DB),
ok = file:del_dir_r(Dir),
%% Open a fresh DB:
ok = emqx_ds:open_db(DB, Config),
S = lists:foldl(
fun(Batch, Acc0) ->
Size = PSMultiplier * Batch,
io:format(user, "Storing batch with payload size ~p~n", [Size]),
Acc1 = store_batch(DB, Size, Acc0),
%% Sleep so all data is hopefully flushed:
timer:sleep(Sleep),
(Batch rem AddGeneration) =:= 0 andalso
emqx_ds:add_generation(DB),
collect_datapoint(DB, Acc1)
end,
collect_datapoint(DB, #s{}),
lists:seq(1, NBatches)
),
{ok, FD} = file:open(Filename, [write]),
io:put_chars(FD, print(S)),
file:close(FD)
end,
os:cmd("echo '" ++ gnuplot_script(Filename) ++ "' | gnuplot --persist -"),
ok.
collect_datapoint(
DB, S0 = #s{n_messages = N, data_size = DS, payload_size = PS, datapoints = DP0, x_axis = X}
) ->
NewData = [{"$_n", N}, {"$data", DS}, {"$payloads", PS} | dirsize(DB)],
DP = lists:foldl(
fun({Key, Val}, Acc) ->
maps:update_with(
Key,
fun(M) -> M#{N => Val} end,
#{N => Val},
Acc
)
end,
DP0,
NewData
),
S0#s{
datapoints = DP,
x_axis = [N | X]
}.
print(#s{x_axis = XX, datapoints = DP}) ->
Cols = lists:sort(maps:keys(DP)),
Lines = [
%% Print header:
Cols
%% Scan through rows:
| [
%% Scan through columns:
[integer_to_binary(maps:get(X, maps:get(Col, DP), 0)) || Col <- Cols]
|| X <- lists:reverse(XX)
]
],
lists:join(
"\n",
[lists:join(" ", Line) || Line <- Lines]
).
dirsize(DB) ->
RawOutput = os:cmd("cd " ++ dir(DB) ++ "; du -b --max-depth 1 ."),
[
begin
[Sz, Dir] = string:lexemes(L, "\t"),
{Dir, list_to_integer(Sz)}
end
|| L <- string:lexemes(RawOutput, "\n")
].
dir(DB) ->
filename:join(emqx_ds_storage_layer:base_dir(), DB).
store_batch(DB, PayloadSize, S0 = #s{n_messages = N, data_size = DS, payload_size = PS}) ->
From = rand:bytes(16),
BatchSize = 50,
Batch = [
#message{
id = emqx_guid:gen(),
timestamp = emqx_message:timestamp_now(),
payload = rand:bytes(PayloadSize),
from = From,
topic = emqx_topic:join([
<<"blah">>,
<<"blah">>,
'',
<<"blah">>,
From,
<<"bazzzzzzzzzzzzzzzzzzzzzzz">>,
integer_to_binary(I)
])
}
|| I <- lists:seq(1, BatchSize)
],
ok = emqx_ds:store_batch(DB, Batch, #{sync => true}),
S0#s{
n_messages = N + length(Batch),
data_size = DS + lists:sum(lists:map(fun msg_size/1, Batch)),
payload_size = PS + length(Batch) * PayloadSize
}.
%% We consider MQTT wire encoding to be "close to the ideal".
msg_size(Msg = #message{}) ->
iolist_size(emqx_frame:serialize(emqx_message:to_packet(undefined, Msg))).

View File

@ -0,0 +1,4 @@
*.hrl
*.erl
*.beam
*.asn1db

View File

@ -35,15 +35,24 @@
-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time).
-define(DS_BUILTIN_NEXT_TIME, emqx_ds_builtin_next_time).
%%% LTS Storage counters:
%%% Bitfield LTS Storage counters:
%% This counter is incremented when the iterator seeks to the next interval:
-define(DS_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek).
-define(DS_BITFIELD_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek).
%% This counter is incremented when the iterator proceeds to the next
%% key within the interval (this is the best-case scenario):
-define(DS_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next).
-define(DS_BITFIELD_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next).
%% This counter is incremented when the key passes bitmask check, but
%% the value is rejected by the subsequent post-processing:
-define(DS_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision).
-define(DS_BITFIELD_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision).
%%% Skipstream LTS Storage counters:
-define(DS_SKIPSTREAM_LTS_SEEK, emqx_ds_storage_skipstream_lts_seek).
-define(DS_SKIPSTREAM_LTS_NEXT, emqx_ds_storage_skipstream_lts_next).
-define(DS_SKIPSTREAM_LTS_HASH_COLLISION, emqx_ds_storage_skipstream_lts_hash_collision).
-define(DS_SKIPSTREAM_LTS_HIT, emqx_ds_storage_skipstream_lts_hit).
-define(DS_SKIPSTREAM_LTS_MISS, emqx_ds_storage_skipstream_lts_miss).
-define(DS_SKIPSTREAM_LTS_FUTURE, emqx_ds_storage_skipstream_lts_future).
-define(DS_SKIPSTREAM_LTS_EOS, emqx_ds_storage_skipstream_lts_end_of_stream).
-endif.

View File

@ -1,3 +1,8 @@
%% -*- mode:erlang -*-
{deps, [{emqx_utils, {path, "../emqx_utils"}}]}.
{erl_opts, [{src_dirs, ["src", "gen_src"]}]}.
{pre_hooks, [
{"(linux|darwin|solaris)", compile, "erlc -bper +noobj -o gen_src asn.1/DurableMessage.asn"}
]}.

View File

@ -36,7 +36,9 @@
inc_lts_seek_counter/2,
inc_lts_next_counter/2,
inc_lts_collision_counter/2
inc_lts_collision_counter/2,
collect_shard_counter/3
]).
%% behavior callbacks:
@ -57,9 +59,16 @@
-define(STORAGE_LAYER_METRICS, [
{slide, ?DS_STORE_BATCH_TIME},
{counter, ?DS_LTS_SEEK_COUNTER},
{counter, ?DS_LTS_NEXT_COUNTER},
{counter, ?DS_LTS_COLLISION_COUNTER}
{counter, ?DS_BITFIELD_LTS_SEEK_COUNTER},
{counter, ?DS_BITFIELD_LTS_NEXT_COUNTER},
{counter, ?DS_BITFIELD_LTS_COLLISION_COUNTER},
{counter, ?DS_SKIPSTREAM_LTS_SEEK},
{counter, ?DS_SKIPSTREAM_LTS_NEXT},
{counter, ?DS_SKIPSTREAM_LTS_HASH_COLLISION},
{counter, ?DS_SKIPSTREAM_LTS_HIT},
{counter, ?DS_SKIPSTREAM_LTS_MISS},
{counter, ?DS_SKIPSTREAM_LTS_FUTURE},
{counter, ?DS_SKIPSTREAM_LTS_EOS}
]).
-define(FETCH_METRICS, [
@ -150,15 +159,19 @@ observe_next_time(DB, NextTime) ->
-spec inc_lts_seek_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
inc_lts_seek_counter({DB, _}, Inc) ->
catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_SEEK_COUNTER, Inc).
catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_BITFIELD_LTS_SEEK_COUNTER, Inc).
-spec inc_lts_next_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
inc_lts_next_counter({DB, _}, Inc) ->
catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_NEXT_COUNTER, Inc).
catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_BITFIELD_LTS_NEXT_COUNTER, Inc).
-spec inc_lts_collision_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
inc_lts_collision_counter({DB, _}, Inc) ->
catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_COLLISION_COUNTER, Inc).
catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_BITFIELD_LTS_COLLISION_COUNTER, Inc).
-spec collect_shard_counter(emqx_ds_storage_layer:shard_id(), atom(), non_neg_integer()) -> ok.
collect_shard_counter({DB, _}, Key, Inc) ->
catch emqx_metrics_worker:inc(?WORKER, DB, Key, Inc).
prometheus_meta() ->
lists:map(

View File

@ -26,7 +26,13 @@
trie_copy_learned_paths/2,
topic_key/3,
match_topics/2,
lookup_topic_key/2
lookup_topic_key/2,
reverse_lookup/2,
info/2,
info/1,
compress_topic/3,
decompress_topic/2
]).
%% Debug:
@ -34,18 +40,21 @@
-export_type([
options/0,
level/0,
static_key/0,
trie/0,
msg_storage_key/0
msg_storage_key/0,
learned_structure/0
]).
-include_lib("stdlib/include/ms_transform.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-elvis([{elvis_style, variable_naming_convention, disable}]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-endif.
%%================================================================================
%% Type declarations
@ -55,15 +64,22 @@
-define(EOT, []).
-define(PLUS, '+').
-type edge() :: binary() | ?EOT | ?PLUS.
-type level() :: binary() | ''.
%% Fixed size binary
-type static_key() :: non_neg_integer().
-type edge() :: level() | ?EOT | ?PLUS.
%% Fixed size binary or integer, depending on the options:
-type static_key() :: non_neg_integer() | binary().
%% Trie root:
-define(PREFIX, prefix).
%% Special prefix root for reverse lookups:
-define(rlookup, rlookup).
-define(rlookup(STATIC), {?rlookup, STATIC}).
-type state() :: static_key() | ?PREFIX.
-type varying() :: [binary() | ?PLUS].
-type varying() :: [level() | ?PLUS].
-type msg_storage_key() :: {static_key(), varying()}.
@ -71,27 +87,42 @@
-type persist_callback() :: fun((_Key, _Val) -> ok).
-type learned_structure() :: [level() | ?PLUS, ...].
-type options() ::
#{
persist_callback => persist_callback(),
static_key_size => pos_integer()
%% If set, the static key is an integer that fits in a given number of bits:
static_key_bits => pos_integer(),
%% If set, the static key is a _binary_ of a given length:
static_key_bytes => pos_integer(),
reverse_lookups => boolean()
}.
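%% Example (illustrative): create a trie with 16-byte binary static keys
%% and reverse lookups enabled:
%%   emqx_ds_lts:trie_create(#{static_key_bytes => 16, reverse_lookups => true}).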
-type dump() :: [{_Key, _Val}].
-record(trie, {
persist :: persist_callback(),
is_binary_key :: boolean(),
static_key_size :: pos_integer(),
trie :: ets:tid(),
stats :: ets:tid()
stats :: ets:tid(),
rlookups = false :: boolean()
}).
-opaque trie() :: #trie{}.
-record(trans, {
key :: {state(), edge()},
next :: state()
}).
-record(trans, {key, next}).
-type trans() ::
#trans{
key :: {state(), edge()},
next :: state()
}
| #trans{
key :: {?rlookup, static_key()},
next :: [level() | ?PLUS]
}.
%%================================================================================
%% API functions
@ -100,21 +131,31 @@
%% @doc Create an empty trie
-spec trie_create(options()) -> trie().
trie_create(UserOpts) ->
Defaults = #{
persist_callback => fun(_, _) -> ok end,
static_key_size => 8
},
#{
persist_callback := Persist,
static_key_size := StaticKeySize
} = maps:merge(Defaults, UserOpts),
Persist = maps:get(
persist_callback,
UserOpts,
fun(_, _) -> ok end
),
Rlookups = maps:get(reverse_lookups, UserOpts, false),
IsBinaryKey =
case UserOpts of
#{static_key_bits := StaticKeySize} ->
false;
#{static_key_bytes := StaticKeySize} ->
true;
_ ->
StaticKeySize = 16,
true
end,
Trie = ets:new(trie, [{keypos, #trans.key}, set, public]),
Stats = ets:new(stats, [{keypos, 1}, set, public]),
#trie{
persist = Persist,
is_binary_key = IsBinaryKey,
static_key_size = StaticKeySize,
trie = Trie,
stats = Stats
stats = Stats,
rlookups = Rlookups
}.
-spec trie_create() -> trie().
@ -149,9 +190,21 @@ trie_dump(Trie, Filter) ->
all ->
Fun = fun(_) -> true end;
wildcard ->
Fun = fun contains_wildcard/1
Fun = fun(L) -> lists:member(?PLUS, L) end
end,
lists:append([P || P <- paths(Trie), Fun(P)]).
Paths = lists:filter(
fun(Path) ->
Fun(tokens_of_path(Path))
end,
paths(Trie)
),
RlookupIdx = lists:filter(
fun({_, Tokens}) ->
Fun(Tokens)
end,
all_emanating(Trie, ?rlookup)
),
lists:flatten([Paths, RlookupIdx]).
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
trie_copy_learned_paths(OldTrie, NewTrie) ->
@ -164,17 +217,17 @@ trie_copy_learned_paths(OldTrie, NewTrie) ->
NewTrie.
%% @doc Lookup the topic key. Create a new one, if not found.
-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
-spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
topic_key(Trie, ThresholdFun, Tokens) ->
do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []).
do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []).
%% @doc Return an existing topic key, if one exists.
-spec lookup_topic_key(trie(), [binary()]) -> {ok, msg_storage_key()} | undefined.
-spec lookup_topic_key(trie(), [level()]) -> {ok, msg_storage_key()} | undefined.
lookup_topic_key(Trie, Tokens) ->
do_lookup_topic_key(Trie, ?PREFIX, Tokens, []).
%% @doc Return list of keys of topics that match a given topic filter
-spec match_topics(trie(), [binary() | '+' | '#']) ->
-spec match_topics(trie(), [level() | '+' | '#']) ->
[msg_storage_key()].
match_topics(Trie, TopicFilter) ->
do_match_topics(Trie, ?PREFIX, [], TopicFilter).
@ -206,7 +259,8 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
{ok, FD} = file:open(Filename, [write]),
Print = fun
(?PREFIX) -> "prefix";
(NodeId) -> integer_to_binary(NodeId, 16)
(Bin) when is_binary(Bin) -> Bin;
(NodeId) when is_integer(NodeId) -> integer_to_binary(NodeId, 16)
end,
io:format(FD, "digraph {~n", []),
lists:foreach(
@ -225,11 +279,64 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
io:format(FD, "}~n", []),
file:close(FD).
-spec reverse_lookup(trie(), static_key()) -> {ok, learned_structure()} | undefined.
reverse_lookup(#trie{rlookups = false}, _) ->
error({badarg, reverse_lookups_disabled});
reverse_lookup(#trie{trie = Trie}, StaticKey) ->
case ets:lookup(Trie, ?rlookup(StaticKey)) of
[#trans{next = Next}] ->
{ok, Next};
[] ->
undefined
end.
%% @doc Get information about the trie.
%%
%% Note: `reverse_lookups' must be enabled to get the number of
%% topics.
-spec info(trie(), size | topics) -> _.
info(#trie{rlookups = true, stats = Stats}, topics) ->
case ets:lookup(Stats, ?rlookup) of
[{_, N}] -> N;
[] -> 0
end;
info(#trie{}, topics) ->
undefined;
info(#trie{trie = T}, size) ->
ets:info(T, size).
%% @doc Return a proplist with information about the trie
-spec info(trie()) -> proplists:proplist().
info(Trie) ->
[
{size, info(Trie, size)},
{topics, info(Trie, topics)}
].
%%%%%%%% Topic compression %%%%%%%%%%
%% @doc Given topic structure for the static LTS index (as returned by
%% `reverse_lookup'), compress a topic filter to exclude static
%% levels:
-spec compress_topic(static_key(), learned_structure(), emqx_ds:topic_filter()) ->
[emqx_ds_lts:level() | '+'].
compress_topic(StaticKey, TopicStructure, TopicFilter) ->
compress_topic(StaticKey, TopicStructure, TopicFilter, []).
%% @doc Given topic structure and a compressed topic filter, return
%% the original* topic filter.
%%
%% * '#' will be replaced with '+'s
-spec decompress_topic(learned_structure(), [level() | '+']) ->
emqx_ds:topic_filter().
decompress_topic(TopicStructure, Topic) ->
decompress_topic(TopicStructure, Topic, []).
%%================================================================================
%% Internal exports
%%================================================================================
-spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined when
-spec trie_next(trie(), state(), level() | ?EOT) -> {Wildcard, state()} | undefined when
Wildcard :: boolean().
trie_next(#trie{trie = Trie}, State, ?EOT) ->
case ets:lookup(Trie, {State, ?EOT}) of
@ -261,16 +368,19 @@ trie_insert(Trie, State, Token) ->
%% Internal functions
%%================================================================================
-spec trie_insert(trie(), state(), edge(), state()) -> {Updated, state()} when
NChildren :: non_neg_integer(),
Updated :: false | NChildren.
-spec trie_insert
(trie(), state(), edge(), state()) -> {Updated, state()} when
NChildren :: non_neg_integer(),
Updated :: false | NChildren;
(trie(), ?rlookup, static_key(), [level() | '+']) ->
{false | non_neg_integer(), state()}.
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) ->
Key = {State, Token},
Rec = #trans{
key = Key,
next = NewState
},
case ets:insert_new(Trie, Rec) of
case ets_insert_new(Trie, Rec) of
true ->
ok = Persist(Key, NewState),
Inc =
@ -287,7 +397,7 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token,
end.
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 ->
get_id_for_key(#trie{is_binary_key = IsBin, static_key_size = Size}, State, Token) ->
%% Requirements for the return value:
%%
%% It should be globally unique for the `{State, Token}` pair. Other
@ -303,11 +413,21 @@ get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 ->
%% If we want to impress computer science crowd, sorry, I mean to
%% minimize storage requirements, we can even employ Huffman coding
%% based on the frequency of messages.
<<Int:(Size * 8), _/bytes>> = crypto:hash(sha256, term_to_binary([State | Token])),
Int.
Hash = crypto:hash(sha256, term_to_binary([State | Token])),
case IsBin of
false ->
%% Note: for backward compatibility with the bitfield_lts
%% layout we allow the key to be an integer. But this also
%% changes the semantics of `static_key_size` from number
%% of bytes to bits:
<<Int:Size, _/bytes>> = Hash,
Int;
true ->
element(1, erlang:split_binary(Hash, Size))
end.
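%% For example (illustrative): with `static_key_bytes => 8' the static key is
%% the first 8 bytes of the SHA-256 hash of `[State | Token]', while with
%% `static_key_bits => 64' the same 64-bit prefix is returned as an unsigned integer.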
%% erlfmt-ignore
-spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) ->
-spec do_match_topics(trie(), state(), [level() | '+'], [level() | '+' | '#']) ->
list().
do_match_topics(Trie, State, Varying, []) ->
case trie_next(Trie, State, ?EOT) of
@ -341,7 +461,7 @@ do_match_topics(Trie, State, Varying, [Level | Rest]) ->
Emanating
).
-spec do_lookup_topic_key(trie(), state(), [binary()], [binary()]) ->
-spec do_lookup_topic_key(trie(), state(), [level()], [level()]) ->
{ok, msg_storage_key()} | undefined.
do_lookup_topic_key(Trie, State, [], Varying) ->
case trie_next(Trie, State, ?EOT) of
@ -360,29 +480,42 @@ do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) ->
undefined
end.
do_topic_key(Trie, _, _, State, [], Varying) ->
do_topic_key(Trie, _, _, State, [], Tokens, Varying) ->
%% We reached the end of topic. Assert: Trie node that corresponds
%% to EOT cannot be a wildcard.
{_, false, Static} = trie_next_(Trie, State, ?EOT),
{Updated, false, Static} = trie_next_(Trie, State, ?EOT),
_ =
case Trie#trie.rlookups andalso Updated of
false ->
ok;
_ ->
trie_insert(Trie, rlookup, Static, lists:reverse(Tokens))
end,
{Static, lists:reverse(Varying)};
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) ->
% TODO: it's not necessary to call it every time.
Threshold = ThresholdFun(Depth),
{NChildren, IsWildcard, NextState} = trie_next_(Trie, State, Tok),
Varying =
case trie_next_(Trie, State, Tok) of
{NChildren, _, NextState} when is_integer(NChildren), NChildren >= Threshold ->
case IsWildcard of
_ when is_integer(NChildren), NChildren >= Threshold ->
%% Number of children for the trie node reached the
%% threshold, we need to insert wildcard here.
{_, _WildcardState} = trie_insert(Trie, State, ?PLUS),
Varying0;
{_, false, NextState} ->
false ->
Varying0;
{_, true, NextState} ->
true ->
%% This topic level is marked as wildcard in the trie,
%% we need to add it to the varying part of the key:
[Tok | Varying0]
end,
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).
TokOrWildcard =
case IsWildcard of
true -> ?PLUS;
false -> Tok
end,
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, [TokOrWildcard | Tokens], Varying).
%% @doc Has side effects! Inserts missing elements
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
@ -450,12 +583,51 @@ follow_path(#trie{} = T, State, Path) ->
all_emanating(T, State)
).
contains_wildcard([{{_State, ?PLUS}, _Next} | _Rest]) ->
true;
contains_wildcard([_ | Rest]) ->
contains_wildcard(Rest);
contains_wildcard([]) ->
false.
tokens_of_path([{{_State, Token}, _Next} | Rest]) ->
[Token | tokens_of_path(Rest)];
tokens_of_path([]) ->
[].
%% Wrapper for type checking only:
-compile({inline, ets_insert_new/2}).
-spec ets_insert_new(ets:tid(), trans()) -> boolean().
ets_insert_new(Tid, Trans) ->
ets:insert_new(Tid, Trans).
compress_topic(_StaticKey, [], [], Acc) ->
lists:reverse(Acc);
compress_topic(StaticKey, TStructL0, ['#'], Acc) ->
case TStructL0 of
[] ->
lists:reverse(Acc);
['+' | TStructL] ->
compress_topic(StaticKey, TStructL, ['#'], ['+' | Acc]);
[_ | TStructL] ->
compress_topic(StaticKey, TStructL, ['#'], Acc)
end;
compress_topic(StaticKey, ['+' | TStructL], [Level | TopicL], Acc) ->
compress_topic(StaticKey, TStructL, TopicL, [Level | Acc]);
compress_topic(StaticKey, [Struct | TStructL], [Level | TopicL], Acc) when
Level =:= '+'; Level =:= Struct
->
compress_topic(StaticKey, TStructL, TopicL, Acc);
compress_topic(StaticKey, TStructL, TopicL, _Acc) ->
%% Topic is mismatched with the structure. This should never
%% happen. LTS got corrupted?
Err = #{
msg => 'Topic structure mismatch',
static_key => StaticKey,
input => TopicL,
structure => TStructL
},
throw({unrecoverable, Err}).
decompress_topic(['+' | TStructL], [Level | TopicL], Acc) ->
decompress_topic(TStructL, TopicL, [Level | Acc]);
decompress_topic([StaticLevel | TStructL], TopicL, Acc) ->
decompress_topic(TStructL, TopicL, [StaticLevel | Acc]);
decompress_topic([], [], Acc) ->
lists:reverse(Acc).
%%================================================================================
%% Tests
@ -658,6 +830,76 @@ topic_match_test() ->
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
end.
%% erlfmt-ignore
rlookup_test() ->
T = trie_create(#{reverse_lookups => true}),
Threshold = 2,
ThresholdFun = fun(0) -> 1000;
(_) -> Threshold
end,
{S1, []} = test_key(T, ThresholdFun, [1]),
{S11, []} = test_key(T, ThresholdFun, [1, 1]),
{S12, []} = test_key(T, ThresholdFun, [1, 2]),
{S111, []} = test_key(T, ThresholdFun, [1, 1, 1]),
{S11e, []} = test_key(T, ThresholdFun, [1, 1, '']),
%% Now add learned wildcards:
{S21, []} = test_key(T, ThresholdFun, [2, 1]),
{S22, []} = test_key(T, ThresholdFun, [2, 2]),
{S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]),
{S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]),
{S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]),
{S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]),
%% Check reverse matching:
?assertEqual({ok, [<<"1">>]}, reverse_lookup(T, S1)),
?assertEqual({ok, [<<"1">>, <<"1">>]}, reverse_lookup(T, S11)),
?assertEqual({ok, [<<"1">>, <<"2">>]}, reverse_lookup(T, S12)),
?assertEqual({ok, [<<"1">>, <<"1">>, <<"1">>]}, reverse_lookup(T, S111)),
?assertEqual({ok, [<<"1">>, <<"1">>, '']}, reverse_lookup(T, S11e)),
?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T, S21)),
?assertEqual({ok, [<<"2">>, <<"2">>]}, reverse_lookup(T, S22)),
?assertEqual({ok, [<<"2">>, '+']}, reverse_lookup(T, S2_)),
?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"1">>]}, reverse_lookup(T, S2_11)),
?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"2">>]}, reverse_lookup(T, S2_12)),
?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T, S2_1_)),
%% Dump and restore trie to make sure rlookup still works:
T1 = trie_restore(#{reverse_lookups => true}, trie_dump(T, all)),
destroy(T),
?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T1, S21)),
?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T1, S2_1_)).
n_topics_test() ->
Threshold = 3,
ThresholdFun = fun
(0) -> 1000;
(_) -> Threshold
end,
T = trie_create(#{reverse_lookups => true}),
?assertEqual(0, info(T, topics)),
{S11, []} = test_key(T, ThresholdFun, [1, 1]),
{S11, []} = test_key(T, ThresholdFun, [1, 1]),
?assertEqual(1, info(T, topics)),
{S12, []} = test_key(T, ThresholdFun, [1, 2]),
{S12, []} = test_key(T, ThresholdFun, [1, 2]),
?assertEqual(2, info(T, topics)),
{_S13, []} = test_key(T, ThresholdFun, [1, 3]),
?assertEqual(3, info(T, topics)),
{S1_, [_]} = test_key(T, ThresholdFun, [1, 4]),
?assertEqual(4, info(T, topics)),
{S1_, [_]} = test_key(T, ThresholdFun, [1, 5]),
{S1_, [_]} = test_key(T, ThresholdFun, [1, 6]),
{S1_, [_]} = test_key(T, ThresholdFun, [1, 7]),
?assertEqual(4, info(T, topics)),
?assertMatch(
[{size, N}, {topics, 4}] when is_integer(N),
info(T)
).
-define(keys_history, topic_key_history).
%% erlfmt-ignore
@ -773,11 +1015,16 @@ paths_test() ->
),
%% Test filter function for paths containing wildcards
WildcardPaths = lists:filter(fun contains_wildcard/1, Paths),
WildcardPaths = lists:filter(
fun(Path) ->
lists:member(?PLUS, tokens_of_path(Path))
end,
Paths
),
FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
?assertEqual(
sets:from_list(FormattedWildcardPaths, [{version, 2}]),
sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
sets:from_list(FormattedWildcardPaths, [{version, 2}]),
#{
expected => ExpectedWildcardPaths,
wildcards => FormattedWildcardPaths
@ -795,13 +1042,97 @@ paths_test() ->
#trie{trie = Tab2} = T2,
Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
?assertEqual(Dump1, Dump2),
ok.
?assertEqual(Dump1, Dump2).
format_path([{{_State, Edge}, _Next} | Rest]) ->
[Edge | format_path(Rest)];
format_path([]) ->
[].
compress_topic_test() ->
%% Structure without wildcards:
?assertEqual([], compress_topic(42, [], [])),
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>])),
?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, ''])),
?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, '+'])),
?assertEqual([], compress_topic(42, [<<"foo">>, ''], ['+', '+'])),
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['#'])),
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '#'])),
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['+', '#'])),
?assertEqual(
[], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '', '#'])
),
%% With wildcards:
?assertEqual(
[<<"1">>], compress_topic(42, [<<"foo">>, '+', <<"bar">>], [<<"foo">>, <<"1">>, <<"bar">>])
),
?assertEqual(
[<<"1">>, <<"2">>],
compress_topic(
42,
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
[<<"foo">>, <<"1">>, <<"bar">>, <<"2">>, <<"baz">>]
)
),
?assertEqual(
['+', <<"2">>],
compress_topic(
42,
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
[<<"foo">>, '+', <<"bar">>, <<"2">>, <<"baz">>]
)
),
?assertEqual(
['+', '+'],
compress_topic(
42,
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>]
)
),
?assertEqual(
['+', '+'],
compress_topic(
42,
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
['#']
)
),
?assertEqual(
['+', '+'],
compress_topic(
42,
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
[<<"foo">>, '+', '+', '#']
)
),
%% Mismatch:
?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [<<"bar">>])),
?assertException(_, {unrecoverable, _}, compress_topic(42, [], [<<"bar">>])),
?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [])),
?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['', '', ''])),
?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], [<<"foo">>, '#'])),
?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['+', '+', '+', '#'])),
?assertException(_, {unrecoverable, _}, compress_topic(42, ['+'], [<<"bar">>, '+'])),
?assertException(
_, {unrecoverable, _}, compress_topic(42, [<<"foo">>, '+'], [<<"bar">>, <<"baz">>])
).
decompress_topic_test() ->
%% Structure without wildcards:
?assertEqual([], decompress_topic([], [])),
?assertEqual(
[<<"foo">>, '', <<"bar">>],
decompress_topic([<<"foo">>, '', <<"bar">>], [])
),
%% With wildcards:
?assertEqual(
[<<"foo">>, '', <<"bar">>, <<"baz">>],
decompress_topic([<<"foo">>, '+', <<"bar">>, '+'], ['', <<"baz">>])
),
?assertEqual(
[<<"foo">>, '+', <<"bar">>, '+', ''],
decompress_topic([<<"foo">>, '+', <<"bar">>, '+', ''], ['+', '+'])
).
-endif.

View File

@ -0,0 +1,515 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This utility module provides a generic method for encoding
%% (and decoding) MQTT messages at rest.
%%
%% Note to developer: backward compatibility has to be maintained at
%% all times, for all releases.
-module(emqx_ds_msg_serializer).
%% API:
-export([serialize/2, deserialize/2, check_schema/1]).
%% internal exports:
-export([]).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("typerefl/include/types.hrl").
-include("../gen_src/DurableMessage.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-elvis([{elvis_style, atom_naming_convention, disable}]).
-dialyzer({nowarn_function, [serialize_asn1/1, deserialize_asn1/1]}).
%%================================================================================
%% Type declarations
%%================================================================================
%% FIXME: Proper reflection fails the dialyzer check due to a wrong
%% spec in typerefl
-type schema() :: term().
-reflect_type([schema/0]).
%%================================================================================
%% API functions
%%================================================================================
-spec check_schema(schema()) -> ok | {error, _}.
check_schema(v1) ->
ok;
check_schema(asn1) ->
ok;
check_schema(_) ->
{error, "Unknown schema type"}.
-spec serialize(schema(), emqx_types:message()) -> binary().
serialize(v1, Msg) ->
serialize_v1(Msg);
serialize(asn1, Msg) ->
serialize_asn1(Msg).
-spec deserialize(schema(), binary()) -> emqx_types:message().
deserialize(v1, Blob) ->
deserialize_v1(Blob);
deserialize(asn1, Blob) ->
deserialize_asn1(Blob).
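%% Illustrative usage sketch (not part of the original module): both
%% schemas are meant to be round-trip safe, so, assuming `Msg' is a
%% #message{} record,
%%   Blob = emqx_ds_msg_serializer:serialize(asn1, Msg),
%%   emqx_ds_msg_serializer:deserialize(asn1, Blob)
%% should return a message equivalent to `Msg' (modulo the canonical
%% form used by the eunit tests at the end of this module).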
%%================================================================================
%% Internal functions
%%================================================================================
%%--------------------------------------------------------------------------------
%% V1 (erlang:term_to_binary/binary_to_term). Simple, but not the most
%% space- and CPU-efficient encoding.
%% --------------------------------------------------------------------------------
serialize_v1(Msg) ->
term_to_binary(message_to_value_v1(Msg)).
message_to_value_v1(#message{
id = Id,
qos = Qos,
from = From,
flags = Flags,
headers = Headers,
topic = Topic,
payload = Payload,
timestamp = Timestamp,
extra = Extra
}) ->
{Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}.
deserialize_v1(Blob) ->
value_v1_to_message(binary_to_term(Blob)).
value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}) ->
#message{
id = Id,
qos = Qos,
from = From,
flags = Flags,
headers = Headers,
topic = Topic,
payload = Payload,
timestamp = Timestamp,
extra = Extra
}.
%%--------------------------------------------------------------------------------
%% Encoding based on ASN1.
%%--------------------------------------------------------------------------------
serialize_asn1(#message{
id = Id,
qos = Qos,
from = From0,
flags = Flags,
topic = Topic,
payload = Payload,
timestamp = Timestamp,
headers = Headers
}) ->
MiscFlags = maps:fold(
fun
(Key, Val, Acc) when Key =/= sys, Key =/= dup, Key =/= retain ->
[asn1_encode_misc(flag, Key, Val) | Acc];
(_, _, Acc) ->
Acc
end,
[],
Flags
),
{StdHeaders, StdProps, MiscHeaders} = asn1_encode_headers(Headers),
{ok, Bin} = 'DurableMessage':encode('DurableMessage', #'DurableMessage'{
id = Id,
from =
case is_atom(From0) of
true -> {atom, erlang:atom_to_binary(From0, utf8)};
false -> {binary, From0}
end,
topic = Topic,
payload = iolist_to_binary(Payload),
timestamp = Timestamp,
qos = Qos,
sys = maps:get(sys, Flags, false),
dup = maps:get(dup, Flags, false),
retain = maps:get(retain, Flags, false),
properties = StdProps,
headers = StdHeaders,
%% TODO: store client attrs?
misc = MiscFlags ++ MiscHeaders
}),
Bin.
deserialize_asn1(Blob) ->
{ok, #'DurableMessage'{
id = Id,
from = From0,
topic = Topic,
payload = Payload,
timestamp = Timestamp,
qos = Qos,
sys = Sys,
dup = Dup,
retain = Retain,
headers = StdHeaders,
properties = StdProperties,
misc = Misc
}} = 'DurableMessage':decode('DurableMessage', Blob),
From =
case From0 of
{atom, Bin} -> erlang:binary_to_atom(Bin, utf8);
{binary, Bin} -> Bin
end,
%% Decode flags:
Flags = #{sys => Sys, dup => Dup, retain => Retain},
asn1_deserialize_misc(Misc, #message{
id = Id,
qos = Qos,
from = From,
topic = Topic,
payload = Payload,
timestamp = Timestamp,
flags = Flags,
headers = asn1_decode_headers(StdHeaders, StdProperties)
}).
asn1_encode_headers(Headers) ->
PeerName =
case Headers of
#{peername := {IP1, Port}} -> encode_ip_port(16, IP1, Port);
_ -> asn1_NOVALUE
end,
PeerHost =
case Headers of
#{peerhost := IP2} -> encode_ip_port(0, IP2, 0);
_ -> asn1_NOVALUE
end,
ProtoVer = asn1_encode_proto_ver(Headers),
StdHeaders = #'StdHeaders'{
protoVer = ProtoVer,
peername = PeerName,
peerhost = PeerHost,
username =
case Headers of
#{username := U} when is_binary(U) -> U;
_ -> asn1_NOVALUE
end
},
{StdProps, MiscProps} = asn1_encode_properties(maps:get(properties, Headers, #{})),
MiscHeaders = maps:fold(
fun
(Header, _V, Acc) when
Header =:= properties; Header =:= username; Header =:= client_attrs
->
Acc;
(protocol, _V, Acc) when ProtoVer =/= asn1_NOVALUE ->
Acc;
(proto_ver, _V, Acc) when ProtoVer =/= asn1_NOVALUE ->
Acc;
(peername, _V, Acc) when PeerName =/= asn1_NOVALUE ->
Acc;
(peerhost, _V, Acc) when PeerHost =/= asn1_NOVALUE ->
Acc;
%% Add headers that could not be encoded using fixed schema:
(Key, Val, Acc) ->
[asn1_encode_misc(header, Key, Val) | Acc]
end,
[],
Headers
),
{StdHeaders, StdProps, MiscHeaders ++ MiscProps}.
asn1_encode_properties(Props) ->
UserProps = maps:get('User-Property', Props, []),
StdProperties = #'StdProperties'{
payloadFormatIndicator = asn1_std_prop('Payload-Format-Indicator', Props),
messageExpiryInterval = asn1_std_prop('Message-Expiry-Interval', Props),
responseTopic = asn1_std_prop('Response-Topic', Props),
correlationData = asn1_std_prop('Correlation-Data', Props),
contentType = asn1_std_prop('Content-Type', Props),
userProperty = [#'UserProperty'{key = K, value = V} || {K, V} <- UserProps]
},
MiscProperties = maps:fold(
fun
(K, V, Acc) when
K =/= 'Payload-Format-Indicator',
K =/= 'Message-Expiry-Interval',
K =/= 'Response-Topic',
K =/= 'Correlation-Data',
K =/= 'Content-Type',
K =/= 'User-Property'
->
[asn1_encode_misc(property, K, V) | Acc];
(_, _, Acc) ->
Acc
end,
[],
Props
),
{StdProperties, MiscProperties}.
asn1_encode_misc(header, Key, Val) ->
{header, #'MiscProperty'{
key = term_to_binary(Key), value = term_to_binary(Val)
}};
asn1_encode_misc(property, Key, Val) ->
{property, #'MiscProperty'{
key = term_to_binary(Key), value = term_to_binary(Val)
}};
asn1_encode_misc(flag, Key, Val) ->
{flag, #'MiscFlag'{
key = atom_to_binary(Key, utf8), value = Val
}}.
asn1_std_prop(Key, Map) ->
case Map of
#{Key := Val} -> Val;
_ -> asn1_NOVALUE
end.
asn1_decode_headers(
#'StdHeaders'{
protoVer = ProtoVer, peerhost = Peerhost, peername = Peername, username = Username
},
StdProperties
) ->
M0 = asn1_decode_properties(StdProperties),
M1 =
case ProtoVer of
asn1_NOVALUE -> M0;
{Protocol, Ver} -> M0#{protocol => Protocol, proto_ver => Ver}
end,
M2 = asn1_add_optional(peername, decode_ip_port(16, Peername), M1),
M3 =
case decode_ip_port(0, Peerhost) of
asn1_NOVALUE -> M2;
{PeerIP, _} -> M2#{peerhost => PeerIP}
end,
asn1_add_optional(username, Username, M3).
asn1_decode_properties(#'StdProperties'{
payloadFormatIndicator = PFI,
userProperty = UP,
messageExpiryInterval = MEI,
responseTopic = RT,
correlationData = CD,
contentType = CT
}) ->
M0 =
case [{K, V} || #'UserProperty'{key = K, value = V} <- UP] of
[] -> #{};
UserProps -> #{'User-Property' => UserProps}
end,
M1 = asn1_add_optional('Payload-Format-Indicator', PFI, M0),
M2 = asn1_add_optional('Message-Expiry-Interval', MEI, M1),
M3 = asn1_add_optional('Response-Topic', RT, M2),
M4 = asn1_add_optional('Correlation-Data', CD, M3),
M5 = asn1_add_optional('Content-Type', CT, M4),
case maps:size(M5) of
0 -> #{};
_ -> #{properties => M5}
end.
asn1_add_optional(_Key, asn1_NOVALUE, Acc) -> Acc;
asn1_add_optional(Key, Val, Acc) -> maps:put(Key, Val, Acc).
-define(IS_VER(V), is_integer(V), V >= 0, V =< 255).
asn1_encode_proto_ver(#{protocol := mqtt, proto_ver := V}) when ?IS_VER(V) ->
{mqtt, V};
asn1_encode_proto_ver(#{protocol := 'mqtt-sn', proto_ver := V}) when ?IS_VER(V) ->
{'mqtt-sn', V};
asn1_encode_proto_ver(#{protocol := coap, proto_ver := V}) when ?IS_VER(V) ->
{coap, V};
asn1_encode_proto_ver(_) ->
asn1_NOVALUE.
-undef(IS_VER).
asn1_deserialize_misc(asn1_NOVALUE, Message) ->
Message;
asn1_deserialize_misc(MiscData, Message0) ->
lists:foldl(
fun
({flag, #'MiscFlag'{key = Key, value = Val}}, Acc) ->
Flags = maps:put(binary_to_atom(Key, utf8), Val, Acc#message.flags),
Acc#message{flags = Flags};
({header, #'MiscProperty'{key = Key, value = Val}}, Acc) ->
Headers = maps:put(binary_to_term(Key), binary_to_term(Val), Acc#message.headers),
Acc#message{headers = Headers};
({property, #'MiscProperty'{key = Key, value = Val}}, Acc) ->
#message{headers = Headers0} = Acc,
Headers = maps:update_with(
properties,
fun(Props) ->
maps:put(binary_to_term(Key), binary_to_term(Val), Props)
end,
Headers0
),
Acc#message{headers = Headers};
({clientAttr, #'ClientAttr'{key = Key, value = Val}}, Acc) ->
#message{headers = Headers0} = Acc,
Headers = maps:update_with(
client_attrs,
fun(Props) ->
maps:put(Key, Val, Props)
end,
Headers0
),
Acc#message{headers = Headers};
({extra, #'MiscProperty'{key = Key, value = Val}}, Acc) ->
Extra = maps:put(binary_to_term(Key), binary_to_term(Val), Acc#message.extra),
Acc#message{extra = Extra}
end,
Message0,
MiscData
).
encode_ip_port(PortSize, {A0, A1, A2, A3}, Port) ->
<<A0:8, A1:8, A2:8, A3:8, Port:PortSize>>;
encode_ip_port(PortSize, {A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, AA, AB, AC, AD, AE, AF}, Port) ->
<<A0:8, A1:8, A2:8, A3:8, A4:8, A5:8, A6:8, A7:8, A8:8, A9:8, AA:8, AB:8, AC:8, AD:8, AE:8,
AF:8, Port:PortSize>>;
encode_ip_port(_, _, _) ->
asn1_NOVALUE.
decode_ip_port(PortSize, Blob) ->
case Blob of
<<A0:8, A1:8, A2:8, A3:8, Port:PortSize>> ->
{{A0, A1, A2, A3}, Port};
<<A0:8, A1:8, A2:8, A3:8, A4:8, A5:8, A6:8, A7:8, A8:8, A9:8, AA:8, AB:8, AC:8, AD:8, AE:8,
AF:8, Port:PortSize>> ->
{{A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, AA, AB, AC, AD, AE, AF}, Port};
_ ->
asn1_NOVALUE
end.
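%% Worked example (illustrative, not part of the original module):
%%   encode_ip_port(16, {127, 0, 0, 1}, 34560) =:= <<127, 0, 0, 1, 135, 0>>
%% since 34560 = 135 * 256, and
%%   decode_ip_port(16, <<127, 0, 0, 1, 135, 0>>) =:= {{127, 0, 0, 1}, 34560}.
%% IPv6 addresses take the 16-byte clause; anything else encodes to
%% asn1_NOVALUE.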
-ifdef(TEST).
test_messages() ->
[
#message{
id = <<"message_id_val">>,
qos = 2,
from = <<"from_val">>,
flags = #{sys => true, dup => true},
topic = <<"topic/value">>,
payload = [<<"foo">>, <<"bar">>],
timestamp = 42424242,
extra = #{}
},
#message{
id = <<0, 6, 28, 54, 12, 158, 221, 191, 244, 69, 0, 0, 13, 214, 0, 3>>,
qos = 0,
from = <<"MzE3MjU5NzA4NDY3MzcwNzg0NDYxNzI5NDg0NDk4NTM0NDA">>,
flags = #{dup => true, retain => true, sys => true},
headers = #{
peername => {{127, 0, 0, 1}, 34560},
protocol => mqtt,
username => <<"foobar">>,
proto_ver => 5,
peerhost => {1, 1, 1, 1},
properties =>
#{
'Content-Type' => <<"text/json">>,
'User-Property' => [{<<"foo">>, <<"bar">>}, {<<"baz">>, <<"quux">>}],
'Message-Expiry-Interval' => 10001,
'Payload-Format-Indicator' => 1
}
},
topic = <<"foo/bar">>,
payload = <<"foo">>,
timestamp = 1719868325813,
extra = #{}
},
#message{
id = <<>>,
from = undefined,
flags = #{other_flag => true},
headers = #{
properties =>
#{
'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 1 bsl 32 - 1,
'Response-Topic' => <<"foo/bar/baz">>,
'Correlation-Data' => <<"correlation data">>,
'Content-Type' => <<"text/json">>,
'User-Property' => [{<<"foo">>, <<"bar">>}, {<<"baz">>, <<"quux">>}],
junk => garbage,
{34, 33, 2} => more_garbage
},
junk => garbage
},
topic = <<"foo/bar">>,
payload = <<"foo">>,
timestamp = 171986,
extra = #{}
},
#message{
id = <<>>,
from = undefined,
headers = #{
protocol => "some_protocol",
proto_ver => 42,
peername => "some.fancy.peername:222",
peerhost => "some.fancy.peerhost"
},
topic = <<"foo/bar">>,
payload = <<"foo">>,
timestamp = 171986,
extra = #{}
}
].
v1_serialize_deserialize_test_() ->
[
assert_transcode(v1, Msg)
|| Msg <- test_messages()
].
asn1_serialize_deserialize_test_() ->
[
assert_transcode(asn1, Msg)
|| Msg <- test_messages()
].
assert_transcode(Schema, Msg) ->
fun() ->
Blob = serialize(Schema, Msg),
?debugFmt("encoded size (~p) = ~p~n", [Schema, size(Blob)]),
assert_eq(Msg, deserialize(Schema, Blob))
end.
assert_eq(Expect, Got) ->
?assertEqual(
emqx_ds_test_helpers:message_canonical_form(Expect),
emqx_ds_test_helpers:message_canonical_form(Got),
{Expect, Got}
).
-endif.

View File

@ -36,7 +36,7 @@
make_delete_iterator/5,
update_iterator/4,
next/6,
delete_next/6,
delete_next/7,
handle_event/4
]).
@ -156,7 +156,9 @@
-define(DIM_TOPIC, 1).
-define(DIM_TS, 2).
-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
-define(DS_LTS_COUNTERS, [
?DS_BITFIELD_LTS_SEEK_COUNTER, ?DS_BITFIELD_LTS_NEXT_COUNTER, ?DS_BITFIELD_LTS_COLLISION_COUNTER
]).
%% GVar used for idle detection:
-define(IDLE_DETECT, idle_detect).
@ -196,7 +198,7 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
case SPrev of
#s{trie = TriePrev} ->
ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev),
?tp(bitfield_lts_inherited_trie, #{}),
?tp(layout_inherited_lts_trie, #{}),
ok;
undefined ->
ok
@ -495,14 +497,19 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
rocksdb:iterator_close(ITHandle)
end.
delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) ->
delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now, IsCurrent) ->
%% Compute safe cutoff time.
%% It's the point in time where the last complete epoch ends, so we need to know
%% the current time to compute it.
init_counters(),
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
SafeCutoffTime = ?EPOCH(Schema, Now) bsl TSOffset,
try
delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize)
case delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) of
{ok, _It, 0, 0} when not IsCurrent ->
{ok, end_of_stream};
Result ->
Result
end
after
report_counters(Shard)
end.
@ -596,7 +603,7 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key
fun
('+') ->
any;
(TopicLevel) when is_binary(TopicLevel) ->
(TopicLevel) when is_binary(TopicLevel); TopicLevel =:= '' ->
{'=', hash_topic_level(TopicLevel)}
end,
Varying
@ -632,7 +639,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
Key1 ->
%% assert
true = Key1 > Key0,
inc_counter(?DS_LTS_SEEK_COUNTER),
inc_counter(?DS_BITFIELD_LTS_SEEK_COUNTER),
case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
{ok, Key, Val} ->
{N, It, Acc} = traverse_interval(
@ -658,7 +665,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
Acc = [{Key, Msg} | Acc0],
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1);
false ->
inc_counter(?DS_LTS_COLLISION_COUNTER),
inc_counter(?DS_BITFIELD_LTS_COLLISION_COUNTER),
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N)
end;
overflow ->
@ -670,7 +677,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
{0, It, Acc};
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) ->
inc_counter(?DS_LTS_NEXT_COUNTER),
inc_counter(?DS_BITFIELD_LTS_NEXT_COUNTER),
case rocksdb:iterator_move(ITHandle, next) of
{ok, Key, Val} ->
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N);
@ -690,7 +697,7 @@ delete_next_loop(LoopContext0) ->
iterated_over := AccIter0,
it_handle := ITHandle
} = LoopContext0,
inc_counter(?DS_LTS_SEEK_COUNTER),
inc_counter(?DS_BITFIELD_LTS_SEEK_COUNTER),
#{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0,
case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
overflow ->
@ -772,7 +779,7 @@ delete_traverse_interval1(LoopContext0) ->
iterated_over := AccIter,
storage_iter := It
} = LoopContext0,
inc_counter(?DS_LTS_NEXT_COUNTER),
inc_counter(?DS_BITFIELD_LTS_NEXT_COUNTER),
case rocksdb:iterator_move(ITHandle, next) of
{ok, Key, Val} ->
delete_traverse_interval(LoopContext0#{
@ -831,6 +838,8 @@ threshold_fun(0) ->
threshold_fun(_) ->
20.
hash_topic_level('') ->
hash_topic_level(<<>>);
hash_topic_level(TopicLevel) ->
<<Int:64, _/binary>> = erlang:md5(TopicLevel),
Int.
@ -896,7 +905,7 @@ restore_trie(TopicIndexBytes, DB, CF) ->
{ok, IT} = rocksdb:iterator(DB, CF, []),
try
Dump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
TrieOpts = #{persist_callback => PersistCallback, static_key_size => TopicIndexBytes},
TrieOpts = #{persist_callback => PersistCallback, static_key_bits => TopicIndexBytes * 8},
emqx_ds_lts:trie_restore(TrieOpts, Dump)
after
rocksdb:iterator_close(IT)
@ -933,9 +942,11 @@ init_counters() ->
ok.
report_counters(Shard) ->
emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)),
emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)),
emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)),
emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_BITFIELD_LTS_SEEK_COUNTER)),
emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_BITFIELD_LTS_NEXT_COUNTER)),
emqx_ds_builtin_metrics:inc_lts_collision_counter(
Shard, get(?DS_BITFIELD_LTS_COLLISION_COUNTER)
),
_ = [erase(I) || I <- ?DS_LTS_COUNTERS],
ok.

View File

@ -261,6 +261,11 @@
) ->
[_Stream].
-callback get_delete_streams(
shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time()
) ->
[_Stream].
-callback make_iterator(
shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
) ->
@ -282,9 +287,12 @@
DeleteIterator,
emqx_ds:delete_selector(),
pos_integer(),
emqx_ds:time()
emqx_ds:time(),
_IsCurrentGeneration :: boolean()
) ->
{ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
{ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}
| {ok, end_of_stream}
| emqx_ds:error(_).
-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
[CustomEvent].
@ -307,6 +315,8 @@
drop_shard(Shard) ->
ok = rocksdb:destroy(db_dir(Shard), []).
%% @doc This is a convenience wrapper that combines `prepare' and
%% `commit' operations.
-spec store_batch(
shard_id(),
[{emqx_ds:time(), emqx_types:message()}],
@ -323,6 +333,15 @@ store_batch(Shard, Messages, Options) ->
Error
end.
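%% A sketch of the combined flow (illustrative; the real wrapper may
%% differ in error handling):
%%   case prepare_batch(Shard, Messages, Options) of
%%       {ok, CookedBatch} -> commit_batch(Shard, CookedBatch, Options);
%%       ignore -> ok;
%%       Error -> Error
%%   end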
%% @doc Transform a batch of messages into a "cooked batch" that can
%% be stored in the transaction log or transferred over the network.
%%
%% Important: the caller MUST ensure that timestamps within the shard
%% form a strictly increasing monotonic sequence throughout the whole
%% lifetime of the shard.
%%
%% The underlying storage layout MAY use timestamp as a unique message
%% ID.
-spec prepare_batch(
shard_id(),
[{emqx_ds:time(), emqx_types:message()}],
@ -355,6 +374,10 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
prepare_batch(_Shard, [], _Options) ->
ignore.
%% @doc Commit cooked batch to the storage.
%%
%% The underlying storage layout must guarantee that this operation is
%% idempotent.
-spec commit_batch(
shard_id(),
cooked_batch(),
@ -511,15 +534,12 @@ delete_next(
) ->
case generation_get(Shard, GenId) of
#{module := Mod, data := GenData} ->
Current = generation_current(Shard),
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of
{ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
%% This is a past generation. Storage layer won't write
%% any more messages here. The iterator reached the end:
%% the stream has been fully replayed.
{ok, end_of_stream};
IsCurrent = GenId =:= generation_current(Shard),
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now, IsCurrent) of
{ok, GenIter, NumDeleted, _IteratedOver} ->
{ok, Iter#{?enc := GenIter}, NumDeleted};
EOS = {ok, end_of_stream} ->
EOS;
Error = {error, _} ->
Error
end;

View File

@ -39,7 +39,7 @@
make_delete_iterator/5,
update_iterator/4,
next/6,
delete_next/6
delete_next/7
]).
%% internal exports:
@ -169,7 +169,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) ->
{ok, It, lists:reverse(Messages)}
end.
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurrent) ->
#delete_it{
topic_filter = TopicFilter,
start_time = StartTime,
@ -198,7 +198,12 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
),
rocksdb:iterator_close(ITHandle),
It = It0#delete_it{last_seen_message_key = Key},
{ok, It, NumDeleted, NumIterated}.
case IsCurrent of
false when NumDeleted =:= 0, NumIterated =:= 0 ->
{ok, end_of_stream};
_ ->
{ok, It, NumDeleted, NumIterated}
end.
%%================================================================================
%% Internal functions

View File

@ -0,0 +1,749 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_skipstream_lts).
-behaviour(emqx_ds_storage_layer).
%% API:
-export([]).
%% behavior callbacks:
-export([
create/5,
open/5,
drop/5,
prepare_batch/4,
commit_batch/4,
get_streams/4,
get_delete_streams/4,
make_iterator/5,
make_delete_iterator/5,
update_iterator/4,
next/6,
delete_next/7
]).
%% internal exports:
-export([]).
-export_type([schema/0, s/0]).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_ds_metrics.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-elvis([{elvis_style, nesting_level, disable}]).
%%================================================================================
%% Type declarations
%%================================================================================
%% keys:
-define(cooked_payloads, 6).
-define(cooked_lts_ops, 7).
-define(lts_persist_ops, emqx_ds_storage_skipstream_lts_ops).
%% Width of the wildcard layer, in bits:
-define(wcb, 16).
-type wildcard_idx() :: 0..16#ffff.
%% Width of the timestamp, in bits:
-define(tsb, 64).
-define(max_ts, 16#ffffffffffffffff).
-type ts() :: 0..?max_ts.
-type wildcard_hash() :: binary().
%% Permanent state:
-type schema() ::
#{
wildcard_hash_bytes := pos_integer(),
topic_index_bytes := pos_integer(),
keep_message_id := boolean(),
serialization_schema := emqx_ds_msg_serializer:schema(),
with_guid := boolean()
}.
%% Runtime state:
-record(s, {
db :: rocksdb:db_handle(),
data_cf :: rocksdb:cf_handle(),
trie :: emqx_ds_lts:trie(),
trie_cf :: rocksdb:cf_handle(),
serialization_schema :: emqx_ds_msg_serializer:schema(),
hash_bytes :: pos_integer(),
with_guid :: boolean()
}).
-type s() :: #s{}.
-record(stream, {
static_index :: emqx_ds_lts:static_key()
}).
-record(it, {
static_index :: emqx_ds_lts:static_key(),
%% Minimal timestamp of the next message:
ts :: ts(),
%% Compressed topic filter:
compressed_tf :: binary()
}).
%% Level iterator:
-record(l, {
n :: non_neg_integer(),
handle :: rocksdb:itr_handle(),
hash :: binary()
}).
%%================================================================================
%% API functions
%%================================================================================
%%================================================================================
%% behavior callbacks
%%================================================================================
create(_ShardId, DBHandle, GenId, Schema0, SPrev) ->
Defaults = #{
wildcard_hash_bytes => 8,
topic_index_bytes => 8,
serialization_schema => asn1,
with_guid => false
},
Schema = maps:merge(Defaults, Schema0),
ok = emqx_ds_msg_serializer:check_schema(maps:get(serialization_schema, Schema)),
DataCFName = data_cf(GenId),
TrieCFName = trie_cf(GenId),
{ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
{ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
case SPrev of
#s{trie = TriePrev} ->
ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev),
?tp(layout_inherited_lts_trie, #{}),
ok;
undefined ->
ok
end,
{Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
open(_Shard, DBHandle, GenId, CFRefs, #{
topic_index_bytes := TIBytes,
wildcard_hash_bytes := WCBytes,
serialization_schema := SSchema,
with_guid := WithGuid
}) ->
{_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
{_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
Trie = restore_trie(TIBytes, DBHandle, TrieCF),
#s{
db = DBHandle,
data_cf = DataCF,
trie_cf = TrieCF,
trie = Trie,
hash_bytes = WCBytes,
serialization_schema = SSchema,
with_guid = WithGuid
}.
drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF, trie = Trie}) ->
emqx_ds_lts:destroy(Trie),
ok = rocksdb:drop_column_family(DBHandle, DataCF),
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
ok.
prepare_batch(
_ShardId,
S = #s{trie = Trie, hash_bytes = HashBytes},
Messages,
_Options
) ->
_ = erase(?lts_persist_ops),
Payloads =
lists:flatmap(
fun({Timestamp, Msg = #message{topic = Topic}}) ->
Tokens = words(Topic),
{Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
%% TODO: is it possible to create the index during the
%% commit phase to avoid transferring indexes through
%% the translog?
[
{mk_key(Static, 0, <<>>, Timestamp), serialize(S, Varying, Msg)}
| mk_index(HashBytes, Static, Timestamp, Varying)
]
end,
Messages
),
{ok, #{
?cooked_payloads => Payloads,
?cooked_lts_ops => pop_lts_persist_ops()
}}.
commit_batch(
_ShardId,
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads},
Options
) ->
{ok, Batch} = rocksdb:batch(),
try
%% Commit LTS trie to the storage:
lists:foreach(
fun({Key, Val}) ->
ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
end,
LtsOps
),
%% Apply LTS ops to the memory cache:
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
%% Commit payloads:
lists:foreach(
fun({Key, Val}) ->
ok = rocksdb:batch_put(Batch, DataCF, Key, Val)
end,
Payloads
),
Result = rocksdb:write_batch(DB, Batch, [
{disable_wal, not maps:get(durable, Options, true)}
]),
%% NOTE
%% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to
%% observe unless `{no_slowdown, true}` is set in the write options.
case Result of
ok ->
ok;
{error, {error, Reason}} ->
{error, unrecoverable, {rocksdb, Reason}}
end
after
rocksdb:release_batch(Batch)
end.
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
get_streams(Trie, TopicFilter).
get_delete_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
get_streams(Trie, TopicFilter).
make_iterator(_Shard, #s{trie = Trie}, #stream{static_index = StaticIdx}, TopicFilter, StartTime) ->
{ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
CompressedTF = emqx_ds_lts:compress_topic(StaticIdx, TopicStructure, TopicFilter),
{ok, #it{
static_index = StaticIdx,
ts = StartTime,
compressed_tf = emqx_topic:join(CompressedTF)
}}.
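%% Illustrative example (mirroring compress_topic_test in emqx_ds_lts):
%% if the learned topic structure for StaticIdx is
%% [<<"foo">>, '+', <<"bar">>] and the filter is
%% [<<"foo">>, <<"1">>, <<"bar">>], the compressed topic filter stored
%% in the iterator is just <<"1">>; only the varying levels are kept.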
make_delete_iterator(Shard, Data, Stream, TopicFilter, StartTime) ->
make_iterator(Shard, Data, Stream, TopicFilter, StartTime).
update_iterator(_Shard, _Data, OldIter, DSKey) ->
case match_ds_key(OldIter#it.static_index, DSKey) of
false ->
{error, unrecoverable, "Invalid datastream key"};
TS ->
{ok, OldIter#it{ts = TS}}
end.
next(ShardId = {_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) ->
init_counters(),
Iterators = init_iterators(S, It),
%% ?tp(notice, skipstream_init_iters, #{it => It, its => Iterators}),
try
case next_loop(Shard, S, It, Iterators, BatchSize, TMax) of
{ok, _, []} when not IsCurrent ->
{ok, end_of_stream};
Result ->
Result
end
after
free_iterators(Iterators),
collect_counters(ShardId)
end.
delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
case next(Shard, S, It0, BatchSize, Now, IsCurrent) of
{ok, It, KVs} ->
batch_delete(S, It, Selector, KVs);
Ret ->
Ret
end.
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================
%% Loop context:
-record(ctx, {
shard,
%% Generation runtime state
s,
%% RocksDB iterators:
iters,
%% Cached topic structure for the static index:
topic_structure,
%% Maximum time:
tmax,
%% Compressed topic filter, split into words:
filter
}).
get_streams(Trie, TopicFilter) ->
lists:map(
fun({Static, _Varying}) ->
#stream{static_index = Static}
end,
emqx_ds_lts:match_topics(Trie, TopicFilter)
).
%%%%%%%% Value (de)serialization %%%%%%%%%%
serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg0) ->
%% Replace original topic with the varying parts:
Msg = Msg0#message{
id =
case WithGuid of
true -> Msg0#message.id;
false -> <<>>
end,
topic = emqx_topic:join(Varying)
},
emqx_ds_msg_serializer:serialize(SSchema, Msg).
enrich(
#ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}},
DSKey,
Msg0
) ->
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))),
Msg0#message{
topic = Topic,
id =
case WithGuid of
true -> Msg0#message.id;
false -> fake_guid(Shard, DSKey)
end
}.
deserialize(
#s{serialization_schema = SSchema},
Blob
) ->
emqx_ds_msg_serializer:deserialize(SSchema, Blob).
fake_guid(_Shard, DSKey) ->
%% Both guid and MD5 are 16 bytes:
crypto:hash(md5, DSKey).
%%%%%%%% Deletion %%%%%%%%%%
batch_delete(#s{hash_bytes = HashBytes, db = DB, data_cf = CF}, It, Selector, KVs) ->
#it{static_index = Static, compressed_tf = CompressedTF} = It,
{Indices, _} = lists:foldl(
fun
('+', {Acc, WildcardIdx}) ->
{Acc, WildcardIdx + 1};
(LevelFilter, {Acc0, WildcardIdx}) ->
Acc = [{WildcardIdx, hash(HashBytes, LevelFilter)} | Acc0],
{Acc, WildcardIdx + 1}
end,
{[], 1},
words(CompressedTF)
),
KeyFamily = [{0, <<>>} | Indices],
{ok, Batch} = rocksdb:batch(),
try
Ndeleted = lists:foldl(
fun({MsgKey, Val}, Acc) ->
case Selector(Val) of
true ->
do_delete(CF, Batch, Static, KeyFamily, MsgKey),
Acc + 1;
false ->
Acc
end
end,
0,
KVs
),
case rocksdb:write_batch(DB, Batch, []) of
ok ->
{ok, It, Ndeleted, length(KVs)};
{error, {error, Reason}} ->
{error, unrecoverable, {rocksdb, Reason}}
end
after
rocksdb:release_batch(Batch)
end.
do_delete(CF, Batch, Static, KeyFamily, MsgKey) ->
TS = match_ds_key(Static, MsgKey),
lists:foreach(
fun({WildcardIdx, Hash}) ->
ok = rocksdb:batch_delete(Batch, CF, mk_key(Static, WildcardIdx, Hash, TS))
end,
KeyFamily
).
%%%%%%%% Iteration %%%%%%%%%%
init_iterators(S, #it{static_index = Static, compressed_tf = CompressedTF}) ->
do_init_iterators(S, Static, words(CompressedTF), 1).
do_init_iterators(S, Static, ['+' | TopicFilter], WildcardLevel) ->
%% Ignore wildcard levels in the topic filter:
do_init_iterators(S, Static, TopicFilter, WildcardLevel + 1);
do_init_iterators(S, Static, [Constraint | TopicFilter], WildcardLevel) ->
%% Create iterator for the index stream:
#s{hash_bytes = HashBytes, db = DB, data_cf = DataCF} = S,
Hash = hash(HashBytes, Constraint),
{ok, ItHandle} = rocksdb:iterator(DB, DataCF, get_key_range(Static, WildcardLevel, Hash)),
It = #l{
n = WildcardLevel,
handle = ItHandle,
hash = Hash
},
[It | do_init_iterators(S, Static, TopicFilter, WildcardLevel + 1)];
do_init_iterators(S, Static, [], _WildcardLevel) ->
%% Create an iterator for the data stream:
#s{db = DB, data_cf = DataCF} = S,
Hash = <<>>,
{ok, ItHandle} = rocksdb:iterator(DB, DataCF, get_key_range(Static, 0, Hash)),
[
#l{
n = 0,
handle = ItHandle,
hash = Hash
}
].
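%% Illustrative example: for a compressed topic filter `1/+' this
%% builds two iterators: an index iterator for wildcard level 1
%% bounded to hash(HashBytes, <<"1">>), and the level-0 data-stream
%% iterator; the `+' level does not need an index iterator of its own.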
next_loop(
Shard,
S = #s{trie = Trie},
It = #it{static_index = StaticIdx, ts = TS, compressed_tf = CompressedTF},
Iterators,
BatchSize,
TMax
) ->
TopicStructure =
case emqx_ds_lts:reverse_lookup(Trie, StaticIdx) of
{ok, Rev} ->
Rev;
undefined ->
throw(#{
msg => "LTS trie missing key",
key => StaticIdx
})
end,
Ctx = #ctx{
shard = Shard,
s = S,
iters = Iterators,
topic_structure = TopicStructure,
filter = words(CompressedTF),
tmax = TMax
},
next_loop(Ctx, It, BatchSize, {seek, TS}, []).
next_loop(_Ctx, It, 0, Op, Acc) ->
finalize_loop(It, Op, Acc);
next_loop(Ctx, It0, BatchSize, Op, Acc) ->
%% ?tp(notice, skipstream_loop, #{
%% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op
%% }),
#ctx{s = S, tmax = TMax, iters = Iterators} = Ctx,
#it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0,
case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of
none ->
%% ?tp(notice, skipstream_loop_result, #{r => none}),
inc_counter(?DS_SKIPSTREAM_LTS_EOS),
finalize_loop(It0, Op, Acc);
{seek, TS} when TS > TMax ->
%% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}),
inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
finalize_loop(It0, {seek, TS}, Acc);
{ok, TS, _Key, _Msg0} when TS > TMax ->
%% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}),
inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
finalize_loop(It0, {seek, TS}, Acc);
{seek, TS} ->
%% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
It = It0#it{ts = TS},
next_loop(Ctx, It, BatchSize, {seek, TS}, Acc);
{ok, TS, DSKey, Msg0} ->
%% ?tp(notice, skipstream_loop_result, #{r => ok, ts => TS, key => Key}),
Message = enrich(Ctx, DSKey, Msg0),
It = It0#it{ts = TS},
next_loop(Ctx, It, BatchSize - 1, next, [{DSKey, Message} | Acc])
end.
finalize_loop(It0, Op, Acc) ->
case Op of
next -> NextTS = It0#it.ts + 1;
{seek, NextTS} -> ok
end,
It = It0#it{ts = NextTS},
{ok, It, lists:reverse(Acc)}.
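%% Reading aid (a summary inferred from the code below, not part of
%% the original module): `next_step' walks the iterator list in order.
%% Each index iterator is bounded to a single {static index, wildcard
%% level, hash} prefix, so advancing it yields the next candidate
%% timestamp for that level. When a level agrees with the candidate
%% timestamp the search descends to the next iterator; the last one is
%% the data-stream iterator, whose value is deserialized and matched
%% against the compressed topic filter to weed out hash collisions.
%% When a level disagrees, `{seek, NextTS}' is returned and
%% `next_loop' restarts the scan from the first level at the larger
%% timestamp.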
next_step(
S, StaticIdx, CompressedTF, [#l{hash = Hash, handle = IH, n = N} | Iterators], ExpectedTS, Op
) ->
Result =
case Op of
next ->
inc_counter(?DS_SKIPSTREAM_LTS_NEXT),
rocksdb:iterator_move(IH, next);
{seek, TS} ->
inc_counter(?DS_SKIPSTREAM_LTS_SEEK),
rocksdb:iterator_move(IH, {seek, mk_key(StaticIdx, N, Hash, TS)})
end,
case Result of
{error, invalid_iterator} ->
none;
{ok, Key, Blob} ->
case match_key(StaticIdx, N, Hash, Key) of
false ->
%% This should not happen, since we set boundaries
%% to the iterators, and overflow to a different
%% key prefix should be caught by the previous
%% clause:
none;
NextTS when ExpectedTS =:= undefined; NextTS =:= ExpectedTS ->
%% We found a key that corresponds to the
%% timestamp we expect.
%% ?tp(notice, ?MODULE_STRING "_step_hit", #{
%% next_ts => NextTS, expected => ExpectedTS, n => N
%% }),
case Iterators of
[] ->
%% This is the data stream as well. Check the
%% message for hash collisions and return the
%% value:
Msg0 = deserialize(S, Blob),
case emqx_topic:match(Msg0#message.topic, CompressedTF) of
true ->
inc_counter(?DS_SKIPSTREAM_LTS_HIT),
{ok, NextTS, Key, Msg0};
false ->
%% Hash collision. Advance to the
%% next timestamp:
inc_counter(?DS_SKIPSTREAM_LTS_HASH_COLLISION),
{seek, NextTS + 1}
end;
_ ->
%% This is an index stream. Keep going:
next_step(S, StaticIdx, CompressedTF, Iterators, NextTS, {seek, NextTS})
end;
NextTS when NextTS > ExpectedTS, N > 0 ->
%% Next index level is not what we expect. Reset
%% search to the first wildcard index, but continue
%% from `NextTS'.
%%
%% Note: if `NextTS > ExpectedTS' and `N =:= 0',
%% it means the upper (replication) level is
%% broken and supplied us a NextTS that advanced
%% past the point in time that can be safely read.
%% We don't handle it here.
inc_counter(?DS_SKIPSTREAM_LTS_MISS),
{seek, NextTS}
end
end.
free_iterators(Its) ->
lists:foreach(
fun(#l{handle = IH}) ->
ok = rocksdb:iterator_close(IH)
end,
Its
).
%%%%%%%% Indexes %%%%%%%%%%
mk_index(HashBytes, Static, Timestamp, Varying) ->
mk_index(HashBytes, Static, Timestamp, 1, Varying, []).
mk_index(HashBytes, Static, Timestamp, N, [TopicLevel | Varying], Acc) ->
Op = {mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), <<>>},
mk_index(HashBytes, Static, Timestamp, N + 1, Varying, [Op | Acc]);
mk_index(_HashBytes, _Static, _Timestamp, _N, [], Acc) ->
Acc.
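%% Illustrative example (hypothetical values): with HashBytes = 8,
%% Static = <<"SSSSSSSS">>, Timestamp = 1 and Varying = [<<"dev1">>, <<"temp">>]
%% the result is, in reverse level order,
%%   [{mk_key(Static, 2, hash(8, <<"temp">>), 1), <<>>},
%%    {mk_key(Static, 1, hash(8, <<"dev1">>), 1), <<>>}],
%% i.e. one empty index entry per varying topic level.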
%%%%%%%% Keys %%%%%%%%%%
get_key_range(StaticIdx, WildcardIdx, Hash) ->
[
{iterate_lower_bound, mk_key(StaticIdx, WildcardIdx, Hash, 0)},
{iterate_upper_bound, mk_key(StaticIdx, WildcardIdx, Hash, ?max_ts)}
].
-spec match_ds_key(emqx_ds_lts:static_key(), binary()) -> ts() | false.
match_ds_key(StaticIdx, Key) ->
match_key(StaticIdx, 0, <<>>, Key).
-spec match_key(emqx_ds_lts:static_key(), wildcard_idx(), wildcard_hash(), binary()) ->
ts() | false.
match_key(StaticIdx, 0, <<>>, Key) ->
TSz = size(StaticIdx),
case Key of
<<StaticIdx:TSz/binary, 0:?wcb, Timestamp:?tsb>> ->
Timestamp;
_ ->
false
end;
match_key(StaticIdx, Idx, Hash, Key) when Idx > 0 ->
Tsz = size(StaticIdx),
Hsz = size(Hash),
case Key of
<<StaticIdx:Tsz/binary, Idx:?wcb, Hash:Hsz/binary, Timestamp:?tsb>> ->
Timestamp;
_ ->
false
end.
-spec mk_key(emqx_ds_lts:static_key(), wildcard_idx(), wildcard_hash(), ts()) -> binary().
mk_key(StaticIdx, 0, <<>>, Timestamp) ->
%% Data stream is identified by wildcard level = 0
<<StaticIdx/binary, 0:?wcb, Timestamp:?tsb>>;
mk_key(StaticIdx, N, Hash, Timestamp) when N > 0 ->
%% Index stream:
<<StaticIdx/binary, N:?wcb, Hash/binary, Timestamp:?tsb>>.
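%% Key layout example (illustrative, assuming the default 8-byte
%% static index and 8-byte wildcard hashes): a data-stream key is
%% <<Static:8/binary, 0:16, Timestamp:64>>, 18 bytes in total, while an
%% index key for wildcard level N is
%% <<Static:8/binary, N:16, Hash:8/binary, Timestamp:64>>, 26 bytes.
%% match_key/4 recovers the timestamp by matching the same layout.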
hash(HashBytes, '') ->
hash(HashBytes, <<>>);
hash(HashBytes, TopicLevel) ->
{Hash, _} = split_binary(erlang:md5(TopicLevel), HashBytes),
Hash.
%%%%%%%% LTS %%%%%%%%%%
%% TODO: don't hardcode the thresholds
threshold_fun(0) ->
100;
threshold_fun(_) ->
10.
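%% Interpretation (an assumption based on how emqx_ds_lts uses the
%% threshold function): the value is the number of distinct children a
%% trie node at the given depth may accumulate before further children
%% are generalized into a '+' wildcard. With the values above, up to
%% 100 distinct first topic levels and up to 10 distinct values per
%% deeper level get their own streams.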
-spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
restore_trie(StaticIdxBytes, DB, CF) ->
PersistCallback = fun(Key, Val) ->
push_lts_persist_op(Key, Val),
ok
end,
{ok, IT} = rocksdb:iterator(DB, CF, []),
try
Dump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
TrieOpts = #{
persist_callback => PersistCallback,
static_key_bytes => StaticIdxBytes,
reverse_lookups => true
},
emqx_ds_lts:trie_restore(TrieOpts, Dump)
after
rocksdb:iterator_close(IT)
end.
-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) ->
ok.
copy_previous_trie(DB, TrieCF, TriePrev) ->
{ok, Batch} = rocksdb:batch(),
lists:foreach(
fun({Key, Val}) ->
ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
end,
emqx_ds_lts:trie_dump(TriePrev, wildcard)
),
Result = rocksdb:write_batch(DB, Batch, []),
rocksdb:release_batch(Batch),
Result.
push_lts_persist_op(Key, Val) ->
case erlang:get(?lts_persist_ops) of
undefined ->
erlang:put(?lts_persist_ops, [{Key, Val}]);
L when is_list(L) ->
erlang:put(?lts_persist_ops, [{Key, Val} | L])
end.
pop_lts_persist_ops() ->
case erlang:erase(?lts_persist_ops) of
undefined ->
[];
L when is_list(L) ->
L
end.
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
[
{binary_to_term(KeyB), binary_to_term(ValB)}
| read_persisted_trie(IT, rocksdb:iterator_move(IT, next))
];
read_persisted_trie(_IT, {error, invalid_iterator}) ->
[].
%%%%%%%% Column families %%%%%%%%%%
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->
"emqx_ds_storage_skipstream_lts_data" ++ integer_to_list(GenId).
%% @doc Generate a column family ID for the trie
-spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
trie_cf(GenId) ->
"emqx_ds_storage_skipstream_lts_trie" ++ integer_to_list(GenId).
%%%%%%%% Topic encoding %%%%%%%%%%
words(<<>>) ->
[];
words(Bin) ->
emqx_topic:words(Bin).
%%%%%%%% Counters %%%%%%%%%%
-define(COUNTERS, [
?DS_SKIPSTREAM_LTS_SEEK,
?DS_SKIPSTREAM_LTS_NEXT,
?DS_SKIPSTREAM_LTS_HASH_COLLISION,
?DS_SKIPSTREAM_LTS_HIT,
?DS_SKIPSTREAM_LTS_MISS,
?DS_SKIPSTREAM_LTS_FUTURE,
?DS_SKIPSTREAM_LTS_EOS
]).
inc_counter(Counter) ->
N = get(Counter),
put(Counter, N + 1).
init_counters() ->
_ = [put(I, 0) || I <- ?COUNTERS],
ok.
collect_counters(Shard) ->
lists:foreach(
fun(Key) ->
emqx_ds_builtin_metrics:collect_shard_counter(Shard, Key, get(Key))
end,
?COUNTERS
).

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_bitfield_lts_SUITE).
-module(emqx_ds_storage_layout_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -23,23 +23,34 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(FUTURE, (1 bsl 64 - 1)).
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG, #{
-define(DB_CONFIG(CONFIG), #{
backend => builtin_local,
storage => {emqx_ds_storage_bitfield_lts, #{}},
storage => ?config(layout, CONFIG),
n_shards => 1
}).
-define(COMPACT_CONFIG, #{
backend => builtin_local,
storage =>
{emqx_ds_storage_bitfield_lts, #{
bits_per_wildcard_level => 8
}},
n_shards => 1,
replication_factor => 1
}).
all() ->
[
{group, bitfield_lts},
{group, skipstream_lts}
].
init_per_group(Group, Config) ->
LayoutConf =
case Group of
skipstream_lts ->
{emqx_ds_storage_skipstream_lts, #{with_guid => true}};
bitfield_lts ->
{emqx_ds_storage_bitfield_lts, #{}}
end,
[{layout, LayoutConf} | Config].
end_per_group(_Group, Config) ->
Config.
%% Smoke test of store function
t_store(_Config) ->
@ -53,7 +64,7 @@ t_store(_Config) ->
payload = Payload,
timestamp = PublishedAt
},
?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})).
?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
@ -61,15 +72,17 @@ t_iterate(_Config) ->
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch),
%% Iterate through individual topics:
[
begin
[{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
[{Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
ct:pal("Streams for ~p: {~p, ~p}", [Topic, Rank, Stream]),
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
ct:pal("Iterator for ~p: ~p", [Topic, It]),
{ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(
?SHARD, It, 100, emqx_ds:timestamp_us()
),
@ -91,10 +104,10 @@ t_delete(_Config) ->
Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch),
%% Iterate through topics:
StartTime = 0,
@ -109,23 +122,21 @@ t_delete(_Config) ->
Messages = [Msg || {_DSKey, Msg} <- replay(?SHARD, TopicFilter, StartTime)],
MessagesByTopic = maps:groups_from_list(fun emqx_message:topic/1, Messages),
?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
?assertEqual(20, length(Messages)),
ok.
?assertEqual(20, length(Messages)).
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
%% Smoke test that verifies that concrete topics are mapped to
%% individual streams, unless there's too many of them.
t_get_streams(_Config) ->
t_get_streams(Config) ->
%% Prepare data (without wildcards):
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch),
GetStream = fun(Topic) ->
StartTime = 0,
emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime)
@ -136,7 +147,7 @@ t_get_streams(_Config) ->
[A] = GetStream(<<"a">>),
%% Restart shard to make sure trie is persisted and restored:
ok = emqx_ds:close_db(?FUNCTION_NAME),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)),
%% Verify that there are no "ghost streams" for topics that don't
%% have any messages:
[] = GetStream(<<"bar/foo">>),
@ -148,11 +159,11 @@ t_get_streams(_Config) ->
NewBatch = [
begin
B = integer_to_binary(I),
{100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)}
make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)
end
|| I <- lists:seq(1, 200)
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, NewBatch),
%% Check that "foo/bar/baz" topic now appears in two streams:
%% "foo/bar/baz" and "foo/bar/+":
NewStreams = lists:sort(GetStream("foo/bar/baz")),
@ -168,7 +179,7 @@ t_get_streams(_Config) ->
?assert(lists:member(A, AllStreams)),
ok.
t_new_generation_inherit_trie(_Config) ->
t_new_generation_inherit_trie(Config) ->
%% This test checks that we inherit the previous generation's LTS when creating a new
%% generation.
?check_trace(
@ -176,25 +187,25 @@ t_new_generation_inherit_trie(_Config) ->
%% Create a bunch of topics to be learned in the first generation
TS1 = 500,
Batch1 = [
{TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))
|| I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch1),
%% Now we create a new generation with the same LTS module. It should inherit the
%% learned trie.
ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
%% Restart the shard, to verify that LTS is persisted.
ok = emqx_ds:close_db(?FUNCTION_NAME),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)),
%% Store a batch of messages with the same set of topics.
TS2 = 1_500,
Batch2 = [
{TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))
|| I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
%% We should get only two streams for wildcard query, for "foo" and for "bar".
?assertMatch(
[_Foo, _Bar],
@ -203,29 +214,30 @@ t_new_generation_inherit_trie(_Config) ->
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)),
?assertMatch([_], ?of_kind(layout_inherited_lts_trie, Trace)),
ok
end
),
ok.
t_replay(_Config) ->
t_replay(Config) ->
%% Create concrete topics:
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
Timestamps = lists:seq(1, 10_000, 100),
Values = lists:seq(1, 1_000, 100),
Batch1 = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
|| Topic <- Topics, PublishedAt <- Timestamps
make_message(Val, Topic, bin(Val))
|| Topic <- Topics, Val <- Values
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch1),
%% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
Batch2 = [
{TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
|| I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
make_message(Val, make_topic([wildcard, Prefix, suffix, Suffix]), bin(Val))
|| Prefix <- lists:seq(1, 200), Val <- Values, Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
timer:sleep(5_000),
%% Check various topic filters:
Messages = [M || {_TS, M} <- Batch1 ++ Batch2],
Messages = Batch1 ++ Batch2,
%% Missing topics (no ghost messages):
?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)),
%% Regular topics:
@ -238,7 +250,7 @@ t_replay(_Config) ->
?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
%% Restart the DB to make sure trie is persisted and restored:
ok = emqx_ds:close_db(?FUNCTION_NAME),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)),
%% Learned wildcard topics:
?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
@ -314,6 +326,9 @@ t_non_atomic_store_batch(_Config) ->
).
check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
?tp(notice, ?MODULE_STRING "_check", #{
shard => Shard, tf => TopicFilter, start_time => StartTime
}),
ExpectedFiltered = lists:filter(
fun(#message{topic = Topic, timestamp = TS}) ->
emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime
@ -325,17 +340,9 @@ check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
begin
Dump = dump_messages(Shard, TopicFilter, StartTime),
verify_dump(TopicFilter, StartTime, Dump),
Missing = ExpectedFiltered -- Dump,
Extras = Dump -- ExpectedFiltered,
?assertMatch(
#{missing := [], unexpected := []},
#{
missing => Missing,
unexpected => Extras,
topic_filter => TopicFilter,
start_time => StartTime
}
)
emqx_ds_test_helpers:assert_same_set(ExpectedFiltered, Dump, #{
topic_filter => TopicFilter, start_time => StartTime
})
end,
[]
),
@ -362,6 +369,7 @@ verify_dump(TopicFilter, StartTime, Dump) ->
dump_messages(Shard, TopicFilter, StartTime) ->
Streams = emqx_ds_storage_layer:get_streams(Shard, parse_topic(TopicFilter), StartTime),
ct:pal("Streams for ~p:~n ~p", [TopicFilter, Streams]),
lists:flatmap(
fun({_Rank, Stream}) ->
dump_stream(Shard, Stream, TopicFilter, StartTime)
@ -374,6 +382,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
{ok, Iterator} = emqx_ds_storage_layer:make_iterator(
Shard, Stream, parse_topic(TopicFilter), StartTime
),
ct:pal("Iterator for ~p at stream ~p:~n ~p", [TopicFilter, Stream, Iterator]),
Loop = fun
F(It, 0) ->
error({too_many_iterations, It});
@ -502,24 +511,31 @@ bin(X) ->
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{bitfield_lts, TCs},
{skipstream_lts, TCs}
].
suite() -> [{timetrap, {seconds, 20}}].
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
WorkDir = emqx_cth_suite:work_dir(Config),
Apps = emqx_cth_suite:start(
[emqx_ds_builtin_local],
#{work_dir => emqx_cth_suite:work_dir(Config)}
#{work_dir => WorkDir}
),
[{apps, Apps} | Config].
[{apps, Apps}, {work_dir, WorkDir} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
ok = emqx_cth_suite:stop(Apps),
emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
ok.
init_per_testcase(TC, Config) ->
ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)),
Config.
end_per_testcase(TC, _Config) ->
@ -558,7 +574,7 @@ delete(Shard, Iterators, Selector) ->
fun(Iterator0, {AccIterators, NAcc}) ->
case
emqx_ds_storage_layer:delete_next(
Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us()
Shard, Iterator0, Selector, 10, ?FUTURE
)
of
{ok, end_of_stream} ->
@ -591,7 +607,7 @@ replay(_Shard, []) ->
replay(Shard, Iterators) ->
{NewIterators0, Messages0} = lists:foldl(
fun(Iterator0, {AccIterators, AccMessages}) ->
case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of
case emqx_ds_storage_layer:next(Shard, Iterator0, 10, ?FUTURE) of
{ok, end_of_stream} ->
{AccIterators, AccMessages};
{ok, _Iterator1, []} ->

View File

@ -122,6 +122,7 @@ topic_messages(TestCase, ClientId, N) ->
fun() ->
NBin = integer_to_binary(N),
Msg = #message{
id = <<N:128>>,
from = ClientId,
topic = client_topic(TestCase, ClientId),
timestamp = N * 100,
@ -148,8 +149,7 @@ do_ds_topic_generation_stream(DB, Node, Shard, It0) ->
?ON(
Node,
begin
Now = emqx_ds_replication_layer:current_timestamp(DB, Shard),
emqx_ds_storage_layer:next(Shard, It0, 1, Now)
emqx_ds_storage_layer:next(Shard, It0, 1, _Now = 1 bsl 63)
end
)
of
@ -233,15 +233,60 @@ transitions(Node, DB) ->
end
).
%% Stream comparison
%% Message comparison
%% Try to eliminate any ambiguity in the message representation.
message_canonical_form(Msg0 = #message{}) ->
message_canonical_form(emqx_message:to_map(Msg0));
message_canonical_form(#{flags := Flags0, headers := Headers0, payload := Payload0} = Msg) ->
%% Remove flags that are false:
Flags = maps:filter(
fun(_Key, Val) -> Val end,
Flags0
),
Msg#{flags := Flags, payload := iolist_to_binary(Payload0)}.
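%% Illustrative example: #message{flags = #{dup => false}, payload = [<<"a">>, <<"b">>]}
%% and #message{flags = #{}, payload = <<"ab">>} have the same canonical
%% form, since false flags are dropped and the payload is flattened
%% into a binary.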
sublist(L) ->
PrintMax = 20,
case length(L) of
0 ->
[];
N when N > PrintMax ->
lists:sublist(L, 1, PrintMax) ++ ['...', N - PrintMax, 'more'];
_ ->
L
end.
message_set(L) ->
ordsets:from_list([message_canonical_form(I) || I <- L]).
message_set_subtract(A, B) ->
ordsets:subtract(message_set(A), message_set(B)).
assert_same_set(Expected, Got) ->
assert_same_set(Expected, Got, #{}).
assert_same_set(Expected, Got, Comment) ->
SE = message_set(Expected),
SG = message_set(Got),
case {ordsets:subtract(SE, SG), ordsets:subtract(SG, SE)} of
{[], []} ->
ok;
{Missing, Unexpected} ->
error(Comment#{
matching => sublist(ordsets:intersection(SE, SG)),
missing => sublist(Missing),
unexpected => sublist(Unexpected)
})
end.
message_eq(Fields, {_Key, Msg1 = #message{}}, Msg2) ->
message_eq(Fields, Msg1, Msg2);
message_eq(Fields, Msg1, {_Key, Msg2 = #message{}}) ->
message_eq(Fields, Msg1, Msg2);
message_eq(Fields, Msg1 = #message{}, Msg2 = #message{}) ->
maps:with(Fields, emqx_message:to_map(Msg1)) =:=
maps:with(Fields, emqx_message:to_map(Msg2)).
maps:with(Fields, message_canonical_form(Msg1)) =:=
maps:with(Fields, message_canonical_form(Msg2)).
%% Consuming streams and iterators
@ -304,6 +349,7 @@ ds_topic_stream(DB, ClientId, TopicBin, Node) ->
{DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
end
),
ct:pal("Streams for ~p, ~p @ ~p:~n ~p", [ClientId, TopicBin, Node, DSStreams]),
%% Sort streams by their rank Y, and chain them together:
emqx_utils_stream:chain([
ds_topic_generation_stream(DB, Node, ShardId, Topic, S)

View File

@ -512,9 +512,16 @@ emqx_collect(K = ?DS_BUFFER_BYTES, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_BUFFER_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])).
emqx_collect(K = ?DS_BITFIELD_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_BITFIELD_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_BITFIELD_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_SEEK, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_NEXT, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_HASH_COLLISION, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_HIT, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_MISS, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_FUTURE, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = ?DS_SKIPSTREAM_LTS_EOS, D) -> counter_metrics(?MG(K, D, [])).
%%--------------------------------------------------------------------
%% Indicators

View File

@ -0,0 +1,7 @@
Add a new version of the `wildcard_optimized` storage layout for durable storage.
Improvements:
- The new layout does not have inherent latency
- MQTT messages are serialized into a much more space-efficient format

View File

@ -47,7 +47,8 @@
emqx_exproto_v_1_connection_adapter_bhvr,
emqx_exproto_v_1_connection_unary_handler_client,
emqx_exhook_v_2_hook_provider_client,
emqx_exhook_v_2_hook_provider_bhvr
emqx_exhook_v_2_hook_provider_bhvr,
'DurableMessage'
]},
{plt_location, "."},
{plt_prefix, "emqx_dialyzer"},