feat(ds): Add `builtin_local' backend

This commit is contained in:
ieQu1 2024-06-13 15:31:40 +02:00
parent a8ea0ae4e5
commit 279619fc80
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
11 changed files with 1792 additions and 11 deletions

View File

@ -0,0 +1,32 @@
# Local Backend for EMQX Durable Storage
# Features
This backend uses a local RocksDB database to store data.
# Limitation
This backend cannot be used in a clustered EMQX setup.
# Documentation links
TBD
# Usage
TBD
# Configurations
TBD
# HTTP APIs
TBD
# Other
TBD
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).

View File

@ -0,0 +1,5 @@
%% -*- mode:erlang -*-
%% Build dependencies: the durable storage application is referenced as a
%% path dependency located in the sibling directory.
{deps, [
{emqx_durable_storage, {path, "../emqx_durable_storage"}}
]}.

View File

@ -0,0 +1,11 @@
%% -*- mode: erlang -*-
%% Application resource file for the `builtin_local' durable storage backend.
{application, emqx_ds_builtin_local, [
{description, "A DS backend that stores all data locally and thus doesn't support clustering."},
% strict semver, bump manually!
{vsn, "0.1.0"},
{modules, []},
{registered, []},
%% Applications this one depends on:
{applications, [kernel, stdlib, rocksdb, emqx_durable_storage, emqx_utils]},
%% Application callback module and its start arguments:
{mod, {emqx_ds_builtin_local_app, []}},
{env, []}
]}.

View File

@ -0,0 +1,372 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_builtin_local).
-behaviour(emqx_ds).
-behaviour(emqx_ds_buffer).
%% API:
-export([]).
%% behavior callbacks:
-export([
%% `emqx_ds':
open_db/2,
close_db/1,
add_generation/1,
update_db_config/2,
list_generations_with_lifetimes/1,
drop_generation/2,
drop_db/1,
store_batch/3,
get_streams/3,
get_delete_streams/3,
make_iterator/4,
make_delete_iterator/4,
update_iterator/3,
next/3,
delete_next/4,
%% `emqx_ds_buffer':
init_buffer/3,
flush_buffer/4,
shard_of_message/4
]).
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
-include_lib("emqx_utils/include/emqx_message.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
%% Integer keys used in the iterator maps (see `iterator()' and
%% `delete_iterator()' below):
-define(tag, 1).
-define(shard, 2).
-define(enc, 3).
%% Values stored under the `?tag' key; they tell regular iterators and
%% delete iterators apart:
-define(IT, 61).
-define(DELETE_IT, 62).
%% Shard identifier within a database.
-type shard() :: binary().
%% Iterator handed out by this backend: the shard it belongs to plus the
%% iterator of the storage layer kept under `?enc'.
-opaque iterator() ::
#{
?tag := ?IT,
?shard := shard(),
?enc := term()
}.
%% Same layout as `iterator()', tagged with `?DELETE_IT'.
-opaque delete_iterator() ::
#{
?tag := ?DELETE_IT,
?shard := shard(),
?enc := term()
}.
%% Options accepted when opening a database with this backend.
-type db_opts() ::
#{
backend := builtin_local,
storage := emqx_ds_storage_layer:prototype(),
n_shards := pos_integer()
}.
-type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
%% Stream wrappers: a numeric tag and the shard prepended to the stream
%% term returned by the storage layer.
-define(stream(SHARD, INNER), [2, SHARD | INNER]).
-define(delete_stream(SHARD, INNER), [3, SHARD | INNER]).
%%================================================================================
%% API functions
%%================================================================================
%%================================================================================
%% behavior callbacks
%%================================================================================
%% @doc Start the supervision tree of `DB'. A database that is already
%% running counts as success; any other start error is returned as is.
-spec open_db(emqx_ds:db(), db_opts()) -> ok | {error, _}.
open_db(DB, CreateOpts) ->
    case emqx_ds_builtin_local_sup:start_db(DB, CreateOpts) of
        {error, {already_started, _Pid}} ->
            ok;
        {error, _Reason} = Failure ->
            Failure;
        {ok, _Pid} ->
            ok
    end.
%% @doc Close `DB' by delegating to `emqx_ds_builtin_local_sup:stop_db/1'.
-spec close_db(emqx_ds:db()) -> ok.
close_db(DB) ->
emqx_ds_builtin_local_sup:stop_db(DB).
%% @doc Add a new generation to every shard of `DB'.
%%
%% Returns `ok' when all shards succeed, otherwise `{error, Failures}' where
%% `Failures' is a list of `{Shard, Result}' pairs, one per shard whose
%% result was not `ok'.
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
add_generation(DB) ->
    Failures = [
        {Shard, Outcome}
     || Shard <- emqx_ds_builtin_local_meta:shards(DB),
        Outcome <- [add_shard_generation({DB, Shard})],
        Outcome =/= ok
    ],
    case Failures of
        [] -> ok;
        _ -> {error, Failures}
    end.

%% Add a generation to a single shard, using a freshly bumped shard timestamp.
add_shard_generation(ShardId) ->
    Timestamp = emqx_ds_builtin_local_meta:ensure_monotonic_timestamp(ShardId),
    emqx_ds_storage_layer:add_generation(ShardId, Timestamp).
%% @doc Persist the new configuration of `DB' in the metadata store and
%% push it to the storage layer of every shard.
%%
%% The result of the metadata update is asserted to be a map.
-spec update_db_config(emqx_ds:db(), db_opts()) -> ok | {error, _}.
update_db_config(DB, CreateOpts) ->
    #{} = NewOpts = emqx_ds_builtin_local_meta:update_db_config(DB, CreateOpts),
    ApplyToShard = fun(Shard) ->
        ShardId = {DB, Shard},
        Timestamp = emqx_ds_builtin_local_meta:ensure_monotonic_timestamp(ShardId),
        emqx_ds_storage_layer:update_config(ShardId, Timestamp, NewOpts)
    end,
    lists:foreach(ApplyToShard, emqx_ds_builtin_local_meta:shards(DB)).
%% @doc Collect the generations of all shards of `DB' into a single map
%% keyed by `{Shard, GenId}'.
-spec list_generations_with_lifetimes(emqx_ds:db()) ->
    #{emqx_ds:generation_rank() => emqx_ds:generation_info()}.
list_generations_with_lifetimes(DB) ->
    CollectShard = fun(Shard, Collected) ->
        Generations = emqx_ds_storage_layer:list_generations_with_lifetimes({DB, Shard}),
        maps:fold(
            fun(GenId, Info, ShardAcc) -> ShardAcc#{{Shard, GenId} => Info} end,
            Collected,
            Generations
        )
    end,
    lists:foldl(CollectShard, #{}, emqx_ds_builtin_local_meta:shards(DB)).
%% @doc Drop generation `GenId' of shard `Shard' by delegating to the
%% storage layer.
-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
drop_generation(DB, {Shard, GenId}) ->
emqx_ds_storage_layer:drop_generation({DB, Shard}, GenId).
%% @doc Close `DB', drop the storage of each of its shards and finally
%% remove its metadata.
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
drop_db(DB) ->
    close_db(DB),
    DropShard = fun(Shard) -> emqx_ds_storage_layer:drop_shard({DB, Shard}) end,
    %% The shard list is read from the metadata, so it must be fetched
    %% before the metadata itself is removed.
    lists:foreach(DropShard, emqx_ds_builtin_local_meta:shards(DB)),
    emqx_ds_builtin_local_meta:drop_db(DB).
%% @doc Store a batch of messages through `emqx_ds_buffer'.
%%
%% If the call fails with a `{timeout, _}' or `{noproc, _}' reason, the
%% failure is reported to the caller as `{error, recoverable, Reason}'
%% instead of crashing it. Every other exception is propagated unchanged.
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
    emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) ->
    try
        emqx_ds_buffer:store_batch(DB, Messages, Opts)
    catch
        %% `gen_server:call' signals `timeout'/`noproc' by *exiting* the
        %% caller with `{Reason, Call}', so the `exit' class is handled
        %% alongside `error'.
        %% NOTE(review): presumes the buffer is reached via a gen_server
        %% call; confirm against `emqx_ds_buffer'.
        Class:{Reason, _Call} when
            (Class =:= error orelse Class =:= exit),
            (Reason =:= timeout orelse Reason =:= noproc)
        ->
            {error, recoverable, Reason}
    end.
-record(bs, {options :: term()}).
-type buffer_state() :: #bs{}.

%% @doc `emqx_ds_buffer' callback: create the buffer state for a shard.
%%
%% When the shard has no stored timestamp yet, it is seeded with the
%% current system time in microseconds.
-spec init_buffer(emqx_ds:db(), shard(), _Options) -> {ok, buffer_state()}.
init_buffer(DB, Shard, Options) ->
    ShardId = {DB, Shard},
    maybe_seed_timestamp(ShardId, current_timestamp(ShardId)),
    {ok, #bs{options = Options}}.

%% Store an initial timestamp for the shard unless one is already present.
maybe_seed_timestamp(ShardId, undefined) ->
    Now = erlang:system_time(microsecond),
    emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Now);
maybe_seed_timestamp(_ShardId, _Latest) ->
    ok.
%% @doc `emqx_ds_buffer' callback: stamp the buffered messages, hand them
%% to the storage layer and record the latest assigned timestamp.
%%
%% The buffer state is returned unchanged together with the storage result.
-spec flush_buffer(emqx_ds:db(), shard(), [emqx_types:message()], buffer_state()) ->
    {buffer_state(), emqx_ds:store_batch_result()}.
flush_buffer(DB, Shard, Messages, State = #bs{options = Options}) ->
    ShardId = {DB, Shard},
    {NewLatest, Stamped} = assign_timestamps(current_timestamp(ShardId), Messages),
    StoreResult = emqx_ds_storage_layer:store_batch(ShardId, Stamped, Options),
    emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, NewLatest),
    {State, StoreResult}.
%% Pair every message with a strictly increasing microsecond timestamp.
%%
%% A message keeps its own timestamp when that is greater than the latest
%% one seen so far; otherwise it gets `Latest + 1'. Returns the last
%% assigned timestamp together with the stamped messages in input order.
assign_timestamps(Latest0, Messages) ->
    {Stamped, Latest} = lists:mapfoldl(
        fun(MessageIn, Previous) ->
            Assigned =
                case emqx_message:timestamp(MessageIn, microsecond) of
                    TimestampUs when TimestampUs > Previous -> TimestampUs;
                    _Earlier -> Previous + 1
                end,
            {assign_timestamp(Assigned, MessageIn), Assigned}
        end,
        Latest0,
        Messages
    ),
    {Latest, Stamped}.

%% Attach the timestamp to the message as a `{TimestampUs, Message}' pair.
assign_timestamp(TimestampUs, Message) ->
    {TimestampUs, Message}.
%% @doc `emqx_ds_buffer' callback: pick the shard of a message by hashing
%% either its sender (`clientid') or its topic (`topic') into the range
%% `0..NShards-1'. The shard is the decimal hash value as a binary.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
    NShards = emqx_ds_builtin_local_meta:n_shards(DB),
    HashKey =
        case SerializeBy of
            clientid -> From;
            topic -> Topic
        end,
    integer_to_binary(erlang:phash2(HashKey, NShards)).
%% @doc List the streams of all shards of `DB' that match `TopicFilter'.
%%
%% Each stream of the storage layer is wrapped with `?stream' and ranked
%% as `{Shard, RankY}'.
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
    [{emqx_ds:stream_rank(), emqx_ds:ds_specific_stream()}].
get_streams(DB, TopicFilter, StartTime) ->
    StreamsOfShard = fun(Shard) ->
        Wrap = fun({RankY, InnerStream}) ->
            {{Shard, RankY}, ?stream(Shard, InnerStream)}
        end,
        lists:map(
            Wrap,
            emqx_ds_storage_layer:get_streams(
                {DB, Shard}, TopicFilter, timestamp_to_timeus(StartTime)
            )
        )
    end,
    lists:flatmap(StreamsOfShard, emqx_ds_builtin_local_meta:shards(DB)).
%% @doc Create an iterator for a stream previously returned by
%% `get_streams/3'. The iterator of the storage layer is wrapped together
%% with the shard it belongs to; storage errors are returned unchanged.
-spec make_iterator(
    emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time()
) ->
    emqx_ds:make_iterator_result(emqx_ds:ds_specific_iterator()).
make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
    StartTimeUs = timestamp_to_timeus(StartTime),
    case emqx_ds_storage_layer:make_iterator({DB, Shard}, InnerStream, TopicFilter, StartTimeUs) of
        {error, _Class, _Reason} = Failure ->
            Failure;
        {ok, StorageIter} ->
            {ok, #{?tag => ?IT, ?shard => Shard, ?enc => StorageIter}}
    end.
%% @doc Update the wrapped storage iterator with `Key' via the storage
%% layer, keeping the tag and the shard of the outer iterator.
-spec update_iterator(emqx_ds:db(), emqx_ds:ds_specific_iterator(), emqx_ds:message_key()) ->
    emqx_ds:make_iterator_result(iterator()).
update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) ->
    ShardId = {DB, Shard},
    case emqx_ds_storage_layer:update_iterator(ShardId, StorageIter0, Key) of
        {error, _Class, _Reason} = Failure ->
            Failure;
        {ok, StorageIter} ->
            {ok, Iter0#{?enc => StorageIter}}
    end.
%% @doc Fetch up to `N' messages from the iterator.
%%
%% The duration of the storage layer call is measured with the monotonic
%% clock and reported to `emqx_ds_builtin_metrics'. On success the outer
%% iterator is returned with the advanced storage iterator; every other
%% result is passed through untouched.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
    ShardId = {DB, Shard},
    Started = erlang:monotonic_time(microsecond),
    Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)),
    Finished = erlang:monotonic_time(microsecond),
    emqx_ds_builtin_metrics:observe_next_time(DB, Finished - Started),
    case Result of
        {ok, StorageIter, Batch} ->
            {ok, Iter0#{?enc := StorageIter}, Batch};
        Other ->
            Other
    end.
%% @doc List the delete streams of all shards of `DB' that match
%% `TopicFilter', each wrapped with `?delete_stream'.
-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
    [emqx_ds:ds_specific_delete_stream()].
get_delete_streams(DB, TopicFilter, StartTime) ->
    DeleteStreamsOfShard = fun(Shard) ->
        Wrap = fun(InnerStream) -> ?delete_stream(Shard, InnerStream) end,
        lists:map(
            Wrap,
            emqx_ds_storage_layer:get_delete_streams(
                {DB, Shard}, TopicFilter, timestamp_to_timeus(StartTime)
            )
        )
    end,
    lists:flatmap(DeleteStreamsOfShard, emqx_ds_builtin_local_meta:shards(DB)).
%% @doc Create a delete iterator for a stream previously returned by
%% `get_delete_streams/3'. Storage errors are returned unchanged.
-spec make_delete_iterator(
    emqx_ds:db(), emqx_ds:ds_specific_delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()
) ->
    emqx_ds:make_delete_iterator_result(delete_iterator()).
make_delete_iterator(DB, ?delete_stream(Shard, InnerStream), TopicFilter, StartTime) ->
    StartTimeUs = timestamp_to_timeus(StartTime),
    case
        emqx_ds_storage_layer:make_delete_iterator(
            {DB, Shard}, InnerStream, TopicFilter, StartTimeUs
        )
    of
        {error, _Class, _Reason} = Failure ->
            Failure;
        {ok, StorageIter} ->
            {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => StorageIter}}
    end.
%% @doc Delete up to `N' messages selected by `Selector', starting from
%% the delete iterator.
%%
%% On progress the outer iterator is returned with the advanced storage
%% iterator and the number of deleted messages. Every other result of the
%% storage layer, including `{ok, end_of_stream}', is passed through as is.
-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
    emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N) ->
    ShardId = {DB, Shard},
    Now = current_timestamp(ShardId),
    case emqx_ds_storage_layer:delete_next(ShardId, StorageIter0, Selector, N, Now) of
        {ok, StorageIter, NDeleted} ->
            {ok, Iter#{?enc => StorageIter}, NDeleted};
        Other ->
            Other
    end.
%%================================================================================
%% Internal exports
%%================================================================================
%% Read the timestamp currently stored for the shard in the metadata
%% module; the result is whatever
%% `emqx_ds_builtin_local_meta:current_timestamp/1' returns.
current_timestamp(ShardId) ->
emqx_ds_builtin_local_meta:current_timestamp(ShardId).
%%================================================================================
%% Internal functions
%%================================================================================
%% Convert a millisecond timestamp to microseconds.
timestamp_to_timeus(TimestampMs) ->
    TimestampUs = TimestampMs * 1000,
    TimestampUs.

View File

@ -0,0 +1,39 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_builtin_local_app).
%% API:
-export([]).
%% behavior callbacks:
-export([start/2]).
%%================================================================================
%% behavior callbacks
%%================================================================================
%% @doc Application start callback: registers module `emqx_ds_builtin_local'
%% with `emqx_ds' as the implementation of the `builtin_local' backend.
%%
%% NOTE(review): no supervisor is started here; the pid of the calling
%% process is returned in its place (see the TODO below).
start(_StartType, _StartArgs) ->
emqx_ds:register_backend(builtin_local, emqx_ds_builtin_local),
%% TODO: fixme
{ok, self()}.
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================

View File

@ -0,0 +1,219 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Supervisor that contains all the processes that belong to a
%% given builtin DS database.
-module(emqx_ds_builtin_local_db_sup).
-behaviour(supervisor).
%% API:
-export([
start_db/2,
start_shard/1,
stop_shard/1,
terminate_storage/1,
restart_storage/1,
ensure_shard/1
]).
-export([which_dbs/0, which_shards/1]).
%% Debug:
-export([
get_shard_workers/1
]).
%% behaviour callbacks:
-export([init/1]).
%% internal exports:
-export([start_link_sup/2]).
%%================================================================================
%% Type declarations
%%================================================================================
%% Process name of a supervisor: a `gproc' name of kind `{n, l, REC}'
%% used through the `via' mechanism.
-define(via(REC), {via, gproc, {n, l, REC}}).
%% Record names of the three supervisor kinds; `init/1' dispatches on them.
-define(db_sup, ?MODULE).
-define(shards_sup, emqx_ds_builtin_local_db_shards_sup).
-define(shard_sup, emqx_ds_builtin_local_db_shard_sup).
%% Identifiers of the supervisors; they also serve as their `gproc' names.
-record(?db_sup, {db}).
-record(?shards_sup, {db}).
-record(?shard_sup, {db, shard}).
%%================================================================================
%% API functions
%%================================================================================
%% @doc Start the top-level supervisor of the database `DB', registered
%% under `#?db_sup{db = DB}'; `Opts' are passed on to `init/1'.
-spec start_db(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) -> {ok, pid()}.
start_db(DB, Opts) ->
start_link_sup(#?db_sup{db = DB}, Opts).
%% @doc Start the supervisor of one shard as a child of the shards
%% supervisor of its database.
-spec start_shard(emqx_ds_storage_layer:shard_id()) ->
    supervisor:startchild_ret().
start_shard({DB, Shard}) ->
    ShardsSup = ?via(#?shards_sup{db = DB}),
    ChildSpec = shard_spec(DB, Shard),
    supervisor:start_child(ShardsSup, ChildSpec).
%% @doc Terminate the supervisor of a shard and, if that succeeded, remove
%% its child specification from the shards supervisor.
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, not_found}.
stop_shard({DB, Shard}) ->
    ShardsSup = ?via(#?shards_sup{db = DB}),
    case supervisor:terminate_child(ShardsSup, Shard) of
        {error, _Reason} = Failure ->
            Failure;
        ok ->
            supervisor:delete_child(ShardsSup, Shard)
    end.
%% @doc Terminate the `{Shard, storage}' child of the shard supervisor.
-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
terminate_storage({DB, Shard}) ->
    supervisor:terminate_child(
        ?via(#?shard_sup{db = DB, shard = Shard}),
        {Shard, storage}
    ).
%% @doc Restart the `{Shard, storage}' child of the shard supervisor.
-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}.
restart_storage({DB, Shard}) ->
    supervisor:restart_child(
        ?via(#?shard_sup{db = DB, shard = Shard}),
        {Shard, storage}
    ).
%% @doc Start the shard supervisor unless it is already running; an
%% `already_started' result counts as success.
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
ok | {error, _Reason}.
ensure_shard(Shard) ->
ensure_started(start_shard(Shard)).
%% @doc Return the children of the shards supervisor of `DB', in the
%% format produced by `supervisor:which_children/1'.
-spec which_shards(emqx_ds:db()) ->
[_Child].
which_shards(DB) ->
supervisor:which_children(?via(#?shards_sup{db = DB})).
%% @doc List the builtin DS databases currently active on this node, found
%% by selecting all local `gproc' names of the form `#?db_sup{}'.
-spec which_dbs() -> [emqx_ds:db()].
which_dbs() ->
    NamePattern = {n, l, #?db_sup{_ = '_', db = '$1'}},
    MatchSpec = [{{NamePattern, '_', '_'}, [], ['$1']}],
    gproc:select({local, names}, MatchSpec).
%% @doc Debug helper: map the child id of every running worker of every
%% running shard supervisor of `DB' to its pid. Children that are not
%% running (no pid) are skipped.
-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}.
get_shard_workers(DB) ->
    maps:from_list([
        {Id, Pid}
     || {_Shard, Sup, _, _} <- supervisor:which_children(?via(#?shards_sup{db = DB})),
        is_pid(Sup),
        {Id, Pid, _, _} <- supervisor:which_children(Sup),
        is_pid(Pid)
    ]).
%%================================================================================
%% behaviour callbacks
%%================================================================================
%% Supervisor callback shared by the three supervisor kinds of a database;
%% the kind is selected by the record passed as the identifier.
init({#?db_sup{db = DB}, DefaultOpts}) ->
    %% Top-level supervisor of the database: initialises the metrics,
    %% opens the metadata and starts the shards supervisor.
    logger:notice("Starting DS DB ~p", [DB]),
    emqx_ds_builtin_metrics:init_for_db(DB),
    Opts = emqx_ds_builtin_local_meta:open_db(DB, DefaultOpts),
    ShardsSup = sup_spec(#?shards_sup{db = DB}, Opts),
    {ok, {#{strategy => one_for_all, intensity => 0, period => 1}, [ShardsSup]}};
init({#?shards_sup{db = DB}, _Opts}) ->
    %% Supervisor of the per-shard supervisors, one child per shard
    %% listed in the metadata.
    ShardSups = lists:map(
        fun(Shard) -> shard_spec(DB, Shard) end,
        emqx_ds_builtin_local_meta:shards(DB)
    ),
    {ok, {#{strategy => one_for_one, intensity => 10, period => 1}, ShardSups}};
init({#?shard_sup{db = DB, shard = Shard}, _}) ->
    %% Supervisor of a single shard: the storage worker followed by the
    %% buffer worker, both configured from the stored DB options.
    Opts = emqx_ds_builtin_local_meta:db_config(DB),
    Workers = [
        shard_storage_spec(DB, Shard, Opts),
        shard_buffer_spec(DB, Shard, Opts)
    ],
    {ok, {#{strategy => rest_for_one, intensity => 10, period => 100}, Workers}}.
%%================================================================================
%% Internal exports
%%================================================================================
%% Start a supervisor of this module registered under the `gproc' name
%% derived from `Id'; `{Id, Options}' becomes the argument of `init/1'.
start_link_sup(Id, Options) ->
supervisor:start_link(?via(Id), ?MODULE, {Id, Options}).
%%================================================================================
%% Internal functions
%%================================================================================
%% Child specification of a nested supervisor of this module. The child id
%% is the record name of `Id'.
sup_spec(Id, Options) ->
    Kind = element(1, Id),
    #{
        id => Kind,
        type => supervisor,
        shutdown => infinity,
        start => {?MODULE, start_link_sup, [Id, Options]}
    }.
%% Child specification of the supervisor of one shard; the shard itself
%% is the child id.
shard_spec(DB, Shard) ->
    SupId = #?shard_sup{db = DB, shard = Shard},
    #{
        id => Shard,
        type => supervisor,
        restart => permanent,
        shutdown => infinity,
        start => {?MODULE, start_link_sup, [SupId, []]}
    }.
%% Child specification of the storage layer worker of a shard.
shard_storage_spec(DB, Shard, Opts) ->
    ShardId = {DB, Shard},
    #{
        id => {Shard, storage},
        type => worker,
        restart => permanent,
        shutdown => 5_000,
        start => {emqx_ds_storage_layer, start_link, [ShardId, Opts]}
    }.
%% Child specification of the buffer worker of a shard, which uses
%% `emqx_ds_builtin_local' as its callback module.
shard_buffer_spec(DB, Shard, Options) ->
    StartArgs = [emqx_ds_builtin_local, Options, DB, Shard],
    #{
        id => {Shard, buffer},
        type => worker,
        restart => permanent,
        shutdown => 5_000,
        start => {emqx_ds_buffer, start_link, StartArgs}
    }.
%% Normalise the result of starting a child: both a fresh start and
%% `already_started' yield `ok'; any other error is returned as is.
ensure_started(Res) ->
    case Res of
        {error, {already_started, _Pid}} ->
            ok;
        {error, _Reason} = Failure ->
            Failure;
        {ok, _Pid} ->
            ok
    end.

View File

@ -0,0 +1,204 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_builtin_local_meta).
-behaviour(gen_server).
%% API:
-export([
start_link/0,
open_db/2,
drop_db/1,
n_shards/1,
shards/1,
db_config/1,
update_db_config/2,
current_timestamp/1,
set_current_timestamp/2,
ensure_monotonic_timestamp/1
]).
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([]).
-export_type([]).
-include_lib("stdlib/include/ms_transform.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
%% Table holding one record per database: the database name and the
%% options it was opened with.
-define(META_TAB, emqx_ds_builtin_local_metadata_tab).
-record(?META_TAB, {
db :: emqx_ds:db(),
db_props :: emqx_ds_builtin_local:db_opts()
}).
%% We save timestamp of the last written message to a mnesia table.
%% The saved value is restored when the node restarts. This is needed
%% to create a timestamp that is truly monotonic even in presence of
%% node restarts.
-define(TS_TAB, emqx_ds_builtin_local_timestamp_tab).
-record(?TS_TAB, {
id :: emqx_ds_storage_layer:shard_id(),
latest :: integer()
}).
%%================================================================================
%% API functions
%%================================================================================
%% The server is registered locally under the module name.
-define(SERVER, ?MODULE).
%% @doc Start the metadata server and link it to the caller.
-spec start_link() -> {ok, pid()}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Return the options stored for `DB', storing `CreateOpts' first if
%% the database has no metadata record yet. Options that are already
%% stored take precedence over `CreateOpts'.
-spec open_db(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) ->
    emqx_ds_builtin_local:db_opts().
open_db(DB, CreateOpts = #{backend := builtin_local, storage := _, n_shards := _}) ->
    ReadOrCreate = fun() ->
        case mnesia:wread({?META_TAB, DB}) of
            [#?META_TAB{db_props = StoredOpts}] ->
                StoredOpts;
            [] ->
                mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
                CreateOpts
        end
    end,
    transaction(ReadOrCreate).
%% @doc Remove, in a single transaction, the timestamp records of all
%% shards of `DB' and the metadata record of the database itself.
-spec drop_db(emqx_ds:db()) -> ok.
drop_db(DB) ->
    DropAll = fun() ->
        %% Select the ids of the timestamp records whose shard id starts
        %% with this DB:
        MatchSpec = ets:fun2ms(fun(#?TS_TAB{id = ID}) when element(1, ID) =:= DB ->
            ID
        end),
        lists:foreach(
            fun(ShardId) -> mnesia:delete({?TS_TAB, ShardId}) end,
            mnesia:select(?TS_TAB, MatchSpec, write)
        ),
        mnesia:delete({?META_TAB, DB})
    end,
    transaction(DropAll).
%% @doc Overwrite the options stored for `DB' with `Opts' and return them.
-spec update_db_config(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) ->
    emqx_ds_builtin_local:db_opts().
update_db_config(DB, Opts) ->
    Store = fun() ->
        Record = #?META_TAB{db = DB, db_props = Opts},
        mnesia:write(Record),
        Opts
    end,
    transaction(Store).
%% @doc Number of shards of `DB', taken from the `n_shards' key of its
%% stored configuration.
-spec n_shards(emqx_ds:db()) -> pos_integer().
n_shards(DB) ->
#{n_shards := NShards} = db_config(DB),
NShards.
%% @doc Shard identifiers of `DB': the integers `0..NShards-1' rendered as
%% decimal binaries.
-spec shards(emqx_ds:db()) -> [emqx_ds_builtin_local:shard()].
shards(DB) ->
    LastShard = n_shards(DB) - 1,
    lists:map(fun erlang:integer_to_binary/1, lists:seq(0, LastShard)).
%% @doc Dirty-read the options stored for `DB'.
%% Raises `error({no_such_db, DB})' when the database has no record.
-spec db_config(emqx_ds:db()) -> emqx_ds_builtin_local:db_opts().
db_config(DB) ->
    case mnesia:dirty_read(?META_TAB, DB) of
        [] ->
            error({no_such_db, DB});
        [#?META_TAB{db_props = Props}] ->
            Props
    end.
%% @doc Store `Time' as the latest timestamp of the shard, using a dirty
%% write.
-spec set_current_timestamp(emqx_ds_storage_layer:shard_id(), emqx_ds:time()) -> ok.
set_current_timestamp(ShardId, Time) ->
mria:dirty_write(?TS_TAB, #?TS_TAB{id = ShardId, latest = Time}).
%% @doc Dirty-read the latest timestamp stored for the shard, or
%% `undefined' when none has been stored yet.
-spec current_timestamp(emqx_ds_storage_layer:shard_id()) -> emqx_ds:time() | undefined.
current_timestamp(ShardId) ->
    case mnesia:dirty_read(?TS_TAB, ShardId) of
        [] ->
            undefined;
        [#?TS_TAB{latest = Latest}] ->
            Latest
    end.
%% @doc Bump the stored timestamp of the shard by 1 using a dirty counter
%% update and return the result of that update.
%% NOTE(review): presumably the returned value is the new timestamp, as
%% with `mnesia:dirty_update_counter/2' -- confirm against `mria'.
-spec ensure_monotonic_timestamp(emqx_ds_storage_layer:shard_id()) -> emqx_ds:time().
ensure_monotonic_timestamp(ShardId) ->
mria:dirty_update_counter({?TS_TAB, ShardId}, 1).
%%================================================================================
%% behavior callbacks
%%================================================================================
%% Server state; it carries no data.
-record(s, {}).
%% NOTE(review): this macro is not referenced anywhere in the visible part
%% of the module.
-define(timer_update, timer_update).
%% Trap exits and make sure both metadata tables exist before the server
%% reports that it is ready.
init([]) ->
process_flag(trap_exit, true),
ensure_tables(),
S = #s{},
{ok, S}.
%% No calls are supported: every request is answered with an error.
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
%% Casts are ignored.
handle_cast(_Cast, S) ->
{noreply, S}.
%% Other messages are ignored as well.
handle_info(_Info, S) ->
{noreply, S}.
%% Nothing to clean up.
terminate(_Reason, _S) ->
ok.
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================
%% Create the metadata table and the timestamp table. Both are
%% `local_content' tables stored as `disc_copies'; they differ only in
%% the table type and the record they hold.
ensure_tables() ->
    ok = create_local_table(?META_TAB, ordered_set, record_info(fields, ?META_TAB)),
    ok = create_local_table(?TS_TAB, set, record_info(fields, ?TS_TAB)).

%% Create one table whose record name equals the table name.
create_local_table(Tab, Type, Fields) ->
    mria:create_table(Tab, [
        {local_content, true},
        {type, Type},
        {storage, disc_copies},
        {record_name, Tab},
        {attributes, Fields}
    ]).
%% Run `Fun' as a transaction on the local-content shard. Returns the
%% result of `Fun' when the transaction commits and `{error, Reason}'
%% when it is aborted.
transaction(Fun) ->
    Shard = mria:local_content_shard(),
    case mria:transaction(Shard, Fun) of
        {aborted, Reason} ->
            {error, Reason};
        {atomic, Result} ->
            Result
    end.

View File

@ -0,0 +1,133 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This supervisor manages the global worker processes needed for
%% the functioning of builtin local databases, and all builtin local
%% databases that attach to it.
-module(emqx_ds_builtin_local_sup).
-behaviour(supervisor).
%% API:
-export([start_db/2, stop_db/1]).
%% behavior callbacks:
-export([init/1]).
%% internal exports:
-export([start_top/0, start_databases_sup/0]).
-export_type([]).
%%================================================================================
%% Type declarations
%%================================================================================
%% Registered name (and `init/1' argument) of the top supervisor of the
%% backend:
-define(top, ?MODULE).
%% Registered name (and `init/1' argument) of the supervisor the databases
%% attach to; it is also the module that starts a database:
-define(databases, emqx_ds_builtin_local_db_sup).
%%================================================================================
%% API functions
%%================================================================================
%% @doc Make sure the backend supervisor is attached, then start the
%% supervisor of `DB' as a child of the `?databases' supervisor.
-spec start_db(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) ->
    supervisor:startchild_ret().
start_db(DB, Opts) ->
    ensure_top(),
    supervisor:start_child(?databases, #{
        id => DB,
        type => supervisor,
        shutdown => infinity,
        start => {?databases, start_db, [DB, Opts]}
    }).
%% @doc Stop the supervisor of `DB' and remove its child specification.
%%
%% Always returns `ok': nothing is done when the `?databases' supervisor
%% is not running, and the results of terminating and deleting the child
%% are deliberately ignored.
-spec stop_db(emqx_ds:db()) -> ok.
stop_db(DB) ->
    case whereis(?databases) of
        undefined ->
            ok;
        Pid when is_pid(Pid) ->
            _ = supervisor:terminate_child(?databases, DB),
            _ = supervisor:delete_child(?databases, DB),
            ok
    end.
%%================================================================================
%% behavior callbacks
%%================================================================================
%% There are two layers of supervision:
%%
%% 1. The top supervisor of the builtin backend. It contains the global
%% worker processes (like the metadata server) and the `?databases'
%% supervisor.
%%
%% 2. `?databases': a `one_for_one' supervisor where each child is a
%% `db' supervisor that contains the processes representing one DB.
%% Children are attached to this one dynamically.
init(?top) ->
    Children = [
        emqx_ds_builtin_metrics:child_spec(),
        #{
            id => metadata_server,
            type => worker,
            restart => permanent,
            shutdown => 5000,
            start => {emqx_ds_builtin_local_meta, start_link, []}
        },
        #{
            id => ?databases,
            type => supervisor,
            restart => permanent,
            shutdown => infinity,
            start => {?MODULE, start_databases_sup, []}
        }
    ],
    SupFlags = #{
        strategy => one_for_all,
        intensity => 1,
        period => 1,
        auto_shutdown => never
    },
    {ok, {SupFlags, Children}};
init(?databases) ->
    %% No static children: databases are added through `start_db/2'.
    {ok, {#{strategy => one_for_one, intensity => 10, period => 1}, []}}.
%%================================================================================
%% Internal exports
%%================================================================================
%% Start the top supervisor of the backend, registered locally as `?top'.
-spec start_top() -> {ok, pid()}.
start_top() ->
supervisor:start_link({local, ?top}, ?MODULE, ?top).
%% Start the supervisor of the databases, registered locally as
%% `?databases'.
start_databases_sup() ->
supervisor:start_link({local, ?databases}, ?MODULE, ?databases).
%%================================================================================
%% Internal functions
%%================================================================================
%% Ask `emqx_ds_sup' to attach the top supervisor of this backend, passing
%% `start_top/0' as its start function. Crashes with a `badmatch' unless
%% the result is `{ok, _}'.
ensure_top() ->
{ok, _} = emqx_ds_sup:attach_backend(builtin_local, {?MODULE, start_top, []}),
ok.

View File

@ -0,0 +1,346 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_builtin_local_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(N_SHARDS, 1).
%% DB options used by the testcases: `builtin_local' backend with the
%% reference storage layout and `?N_SHARDS' shards.
opts(_Config) ->
    Storage = {emqx_ds_storage_reference, #{}},
    #{
        n_shards => ?N_SHARDS,
        storage => Storage,
        backend => builtin_local
    }.
t_drop_generation_with_never_used_iterator(Config) ->
    %% Scenario:
    %% 1) an iterator is created in generation 1, but never consumed from;
    %% 2) generation 2 is added and generation 1 is dropped;
    %% 3) iteration begins.
    %% Consuming the stale iterator is expected to fail with
    %% `generation_not_found', while a fresh iterator over the new
    %% stream sees only the messages stored after the drop.
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
    [OldGen] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),

    Filter = emqx_topic:words(<<"foo/+">>),
    OldMsgs = [
        message(<<"foo/bar">>, <<"1">>, 0),
        message(<<"foo/baz">>, <<"2">>, 1)
    ],
    ?assertMatch(ok, emqx_ds:store_batch(DB, OldMsgs)),

    [{_, OldStream}] = emqx_ds:get_streams(DB, Filter, 0),
    {ok, StaleIter} = emqx_ds:make_iterator(DB, OldStream, Filter, 0),

    %% Rotate generations:
    ok = emqx_ds:add_generation(DB),
    ok = emqx_ds:drop_generation(DB, OldGen),

    T = emqx_message:timestamp_now(),
    NewMsgs = [
        message(<<"foo/bar">>, <<"3">>, T + 100),
        message(<<"foo/baz">>, <<"4">>, T + 101)
    ],
    ?assertMatch(ok, emqx_ds:store_batch(DB, NewMsgs)),

    %% The iterator created before the drop is unusable:
    ?assertError(
        {error, unrecoverable, generation_not_found},
        emqx_ds_test_helpers:consume_iter(DB, StaleIter)
    ),

    %% New iterator for the new stream will only see the later messages.
    [{_, NewStream}] = emqx_ds:get_streams(DB, Filter, 0),
    ?assertNotEqual(OldStream, NewStream),
    {ok, FreshIter} = emqx_ds:make_iterator(DB, NewStream, Filter, 0),

    {ok, LastIter, Consumed} =
        emqx_ds_test_helpers:consume_iter(DB, FreshIter, #{batch_size => 1}),
    ?assertNotEqual(end_of_stream, LastIter),
    ?assertEqual(NewMsgs, Consumed),
    ok.
t_drop_generation_with_used_once_iterator(Config) ->
    %% Scenario:
    %% 1) an iterator is created in generation 1 and consumes one message;
    %% 2) generation 2 is added and generation 1 is dropped;
    %% 3) iteration continues.
    %% Continuing with the iterator is expected to fail with
    %% `generation_not_found'.
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
    [OldGen] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),

    Filter = emqx_topic:words(<<"foo/+">>),
    First = message(<<"foo/bar">>, <<"1">>, 0),
    Second = message(<<"foo/baz">>, <<"2">>, 1),
    ?assertMatch(ok, emqx_ds:store_batch(DB, [First, Second])),

    [{_, Stream}] = emqx_ds:get_streams(DB, Filter, 0),
    {ok, It0} = emqx_ds:make_iterator(DB, Stream, Filter, 0),

    %% Consume exactly one message before the generation goes away:
    {ok, It1, Fetched} = emqx_ds:next(DB, It0, 1),
    ?assertNotEqual(end_of_stream, It1),
    ?assertEqual([First], [M || {_K, M} <- Fetched]),

    ok = emqx_ds:add_generation(DB),
    ok = emqx_ds:drop_generation(DB, OldGen),

    T = emqx_message:timestamp_now(),
    NewMsgs = [
        message(<<"foo/bar">>, <<"3">>, T + 100),
        message(<<"foo/baz">>, <<"4">>, T + 101)
    ],
    ?assertMatch(ok, emqx_ds:store_batch(DB, NewMsgs)),

    ?assertError(
        {error, unrecoverable, generation_not_found},
        emqx_ds_test_helpers:consume_iter(DB, It1)
    ).
t_drop_generation_update_iterator(Config) ->
    %% Verify what `emqx_ds:update_iterator' returns once the
    %% generation that the iterator points at has been dropped.
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
    [OldGen] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),

    Filter = emqx_topic:words(<<"foo/+">>),
    Msgs = [
        message(<<"foo/bar">>, <<"1">>, 0),
        message(<<"foo/baz">>, <<"2">>, 1)
    ],
    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),

    [{_, Stream}] = emqx_ds:get_streams(DB, Filter, 0),
    {ok, It0} = emqx_ds:make_iterator(DB, Stream, Filter, 0),
    %% Read both messages one by one; keep the key of the second one:
    {ok, It1, _FirstBatch} = emqx_ds:next(DB, It0, 1),
    {ok, _It2, [{SecondKey, _Msg}]} = emqx_ds:next(DB, It1, 1),

    ok = emqx_ds:add_generation(DB),
    ok = emqx_ds:drop_generation(DB, OldGen),

    ?assertEqual(
        {error, unrecoverable, generation_not_found},
        emqx_ds:update_iterator(DB, It1, SecondKey)
    ).
t_make_iterator_stale_stream(Config) ->
    %% Verify what `emqx_ds:make_iterator' returns when it is given a
    %% stream whose generation has been dropped in the meantime.
    DB = ?FUNCTION_NAME,
    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
    [OldGen] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),

    Filter = emqx_topic:words(<<"foo/+">>),
    Msgs = [
        message(<<"foo/bar">>, <<"1">>, 0),
        message(<<"foo/baz">>, <<"2">>, 1)
    ],
    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),

    %% Obtain the stream first, then invalidate it:
    [{_, StaleStream}] = emqx_ds:get_streams(DB, Filter, 0),
    ok = emqx_ds:add_generation(DB),
    ok = emqx_ds:drop_generation(DB, OldGen),

    ?assertEqual(
        {error, unrecoverable, generation_not_found},
        emqx_ds:make_iterator(DB, StaleStream, Filter, 0)
    ),
    ok.
t_get_streams_concurrently_with_drop_generation(Config) ->
%% This checks that we can get all streams while a generation is dropped
%% mid-iteration.
%%
%% The DB is set up with three generations (hence three streams). A
%% helper process drops the first generation while `get_streams' is
%% running, and the call is then expected to return the two remaining
%% streams.
DB = ?FUNCTION_NAME,
?check_trace(
#{timetrap => 5_000},
begin
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
[GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
ok = emqx_ds:add_generation(DB),
ok = emqx_ds:add_generation(DB),
%% All streams
TopicFilter = emqx_topic:words(<<"foo/+">>),
StartTime = 0,
?assertMatch([_, _, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
%% Ordering injection between the `dropped_gen' and
%% `get_streams_get_gen' trace points.
%% NOTE(review): `get_streams_get_gen' and `get_streams_all_gens' are
%% presumably emitted by the `get_streams' implementation, which is
%% not visible here -- confirm.
?force_ordering(
#{?snk_kind := dropped_gen},
#{?snk_kind := get_streams_get_gen}
),
%% Helper: wait for `get_streams_all_gens', drop the first
%% generation, then emit `dropped_gen'.
spawn_link(fun() ->
{ok, _} = ?block_until(#{?snk_kind := get_streams_all_gens}),
ok = emqx_ds:drop_generation(DB, GenId0),
?tp(dropped_gen, #{})
end),
%% Only two streams are expected once the first generation is gone:
?assertMatch([_, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
ok
end,
%% No additional checks are run on the collected trace:
[]
).
%% This testcase verifies the behavior of `store_batch' operation
%% when the underlying code experiences recoverable or unrecoverable
%% problems.
%% Steps: store a batch normally, then mock
%% `emqx_ds_storage_layer:store_batch' to return an unrecoverable and
%% then a recoverable error, and finally unload the mock and store the
%% last batch again. The run stage returns the sorted result of
%% `consume_per_stream', which the "message ordering" check inspects.
t_store_batch_fail(Config) ->
?check_trace(
#{timetrap => 15_000},
try
meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
DB = ?FUNCTION_NAME,
%% This testcase overrides the default number of shards:
?assertMatch(ok, emqx_ds:open_db(DB, (opts(Config))#{n_shards => 2})),
%% Success:
Batch1 = [
message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
],
?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
%% Inject unrecoverable error:
meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
{error, unrecoverable, mock}
end),
Batch2 = [
message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1),
message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1)
],
?assertMatch(
{error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
),
%% Inject a recoverable error:
meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
{error, recoverable, mock}
end),
Batch3 = [
message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
],
%% Note: due to idempotency issues the number of retries
%% is currently set to 0:
?assertMatch(
{error, recoverable, mock},
emqx_ds:store_batch(DB, Batch3, #{sync => true})
),
%% Remove the mock; storing the same batch is now expected to succeed:
meck:unload(emqx_ds_storage_layer),
?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
after
%% Unload the mocks even if the body above has failed:
meck:unload()
end,
[
{"message ordering", fun(StoredMessages, _Trace) ->
%% Exactly two streams are expected. Payloads 3 and 4 (the batch
%% rejected with the unrecoverable error) must be absent.
[{_, MessagesFromStream1}, {_, MessagesFromStream2}] = StoredMessages,
emqx_ds_test_helpers:diff_messages(
[payload],
[
#message{payload = <<"1">>},
#message{payload = <<"2">>},
#message{payload = <<"5">>},
#message{payload = <<"7">>}
],
MessagesFromStream1
),
emqx_ds_test_helpers:diff_messages(
[payload],
[
#message{payload = <<"6">>},
#message{payload = <<"8">>}
],
MessagesFromStream2
)
end}
]
).
%% Build a test message the same way as `message/3' does, and set its
%% `from' field to `ClientId'.
message(ClientId, Topic, Payload, PublishedAt) ->
    (message(Topic, Payload, PublishedAt))#message{from = ClientId}.
%% Build a test message with the given topic, payload and timestamp.
%% The message id is generated by `emqx_guid:gen/0'.
message(Topic, Payload, PublishedAt) ->
    Id = emqx_guid:gen(),
    #message{
        id = Id,
        timestamp = PublishedAt,
        payload = Payload,
        topic = Topic
    }.
%% Repeatedly call `emqx_ds:delete_next/4' until a call deletes nothing
%% or the stream ends.
%%
%% Returns `{ok, Iterator | end_of_stream, TotalDeleted}'. Any other
%% return value of `emqx_ds:delete_next/4' is passed through unchanged.
delete(DB, It, Selector, BatchSize) ->
    delete(DB, It, Selector, BatchSize, 0).

%% Worker loop of `delete/4'. `Acc' is the number of messages deleted
%% so far.
delete(DB, It0, Selector, BatchSize, Acc) ->
    case emqx_ds:delete_next(DB, It0, Selector, BatchSize) of
        {ok, It, 0} ->
            %% Nothing was deleted by this call: stop.
            {ok, It, Acc};
        {ok, It, NumDeleted} ->
            %% Keep the argument order of the function head: the
            %% selector comes before the batch size.
            delete(DB, It, Selector, BatchSize, Acc + NumDeleted);
        {ok, end_of_stream} ->
            {ok, end_of_stream, Acc};
        Ret ->
            Ret
    end.
%% CT callbacks
%% List of testcases, as returned by `emqx_common_test_helpers:all/1'
%% for this suite module.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
%% Start `mria' and `emqx_ds_builtin_local' using the suite's private
%% directory as the work dir. The value returned by
%% `emqx_cth_suite:start/2' is stored under `apps' for `end_per_suite/1'.
init_per_suite(Config) ->
    emqx_common_test_helpers:clear_screen(),
    WorkDir = ?config(priv_dir, Config),
    Started = emqx_cth_suite:start(
        [mria, emqx_ds_builtin_local],
        #{work_dir => WorkDir}
    ),
    [{apps, Started} | Config].
%% Stop the applications recorded under `apps' by `init_per_suite/1'.
end_per_suite(Config) ->
    Started = ?config(apps, Config),
    ok = emqx_cth_suite:stop(Started).
%% Make sure `emqx_durable_storage' is running before each testcase
%% (`end_per_testcase/2' stops it). The result of the start attempt is
%% not checked.
init_per_testcase(_TC, Config) ->
    _ = application:ensure_all_started(emqx_durable_storage),
    Config.
%% Per-testcase cleanup: stop snabbkaffe, `emqx_durable_storage' and
%% mria, then delete the mnesia schema of the local node. The result
%% of the schema deletion is ignored.
end_per_testcase(_TC, _Config) ->
    snabbkaffe:stop(),
    ok = application:stop(emqx_durable_storage),
    mria:stop(),
    ThisNode = node(),
    _ = mnesia:delete_schema([ThisNode]),
    ok.

View File

@ -0,0 +1,423 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Buffer servers are responsible for collecting batches from the
%% local processes, sharding and repackaging them.
-module(emqx_ds_buffer).
-behaviour(gen_server).
%% API:
-export([start_link/4, store_batch/3]).
%% behavior callbacks:
-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([]).
-export_type([]).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
-define(flush, flush).
-define(cbm(DB), {?MODULE, DB}).
%% Request sent to a buffer server (as a call when `sync' is `true',
%% as a cast otherwise) asking it to enqueue a batch of messages.
-record(enqueue_req, {
%% Messages to append to the buffer:
messages :: [emqx_types:message()],
%% `true' when the sender waits for the reply that is sent after the
%% flush:
sync :: boolean(),
%% Atomicity flag taken from the caller's options:
atomic :: boolean(),
%% Number of messages in the batch, precomputed by the sender:
n_messages :: non_neg_integer(),
%% Approximate size of the batch in bytes, precomputed by the sender
%% (see `payload_size/1'):
payload_bytes :: non_neg_integer()
}).
%% Called from `init/1' of the buffer server. The returned state is
%% passed to `flush_buffer/4'.
-callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}.
%% Called when the buffer is flushed, with the list of buffered
%% messages. Returns the updated callback state together with the
%% result of the flush.
-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) ->
{State, ok | {error, recoverable | unrecoverable, _}}.
%% Called in the context of the process that stores a batch to decide
%% which shard a message goes to.
-callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) ->
_Shard.
%%================================================================================
%% API functions
%%================================================================================
%% @doc Start the buffer server for the given shard of the DB. The
%% process is registered via gproc under a name derived from `DB' and
%% `Shard'.
-spec start_link(module(), _CallbackOptions, emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    {ok, pid()}.
start_link(CallbackModule, CallbackOptions, DB, Shard) ->
    Name = ?via(DB, Shard),
    InitArgs = [CallbackModule, CallbackOptions, DB, Shard],
    gen_server:start_link(Name, ?MODULE, InitArgs, []).
%% @doc Hand a batch of messages over to the buffer server(s) of the
%% shard(s) that the messages belong to.
%%
%% Options: `sync' (default `true') selects call vs. cast; `atomic'
%% (default `false') makes the function refuse batches that span more
%% than one shard.
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
    emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) ->
    Sync = maps:get(sync, Opts, true),
    Atomic = maps:get(atomic, Opts, false),
    dispatch_batch(shards_of_batch(DB, Messages), DB, Messages, Sync, Atomic).

%% Route the batch according to the set of shards it touches.
dispatch_batch([{Shard, {NMsgs, NBytes}}], DB, Messages, Sync, Atomic) ->
    %% Usually all messages of the batch belong to a single shard; this
    %% is the case the function is optimized for: the batch is
    %% forwarded as is.
    Req = #enqueue_req{
        messages = Messages,
        sync = Sync,
        atomic = Atomic,
        n_messages = NMsgs,
        payload_bytes = NBytes
    },
    enqueue_call_or_cast(?via(DB, Shard), Req);
dispatch_batch([_, _ | _], _DB, _Messages, _Sync, Atomic) when Atomic ->
    %% A batch cannot be committed to several shards atomically:
    {error, unrecoverable, atomic_commit_to_multiple_shards};
dispatch_batch(_Shards, DB, Messages, Sync, _Atomic) ->
    %% Unlikely case, handled by a slower implementation:
    repackage_messages(DB, Messages, Sync).
%%================================================================================
%% behavior callbacks
%%================================================================================
%% State of the buffer server.
-record(s, {
%% Callback module and its state, as returned by `init_buffer/3' and
%% updated by `flush_buffer/4':
callback_module :: module(),
callback_state :: term(),
db :: emqx_ds:db(),
shard :: emqx_ds_replication_layer:shard_id(),
metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
%% Number of retries made so far:
n_retries = 0 :: non_neg_integer(),
%% FIXME: Currently max_retries is always 0, because replication
%% layer doesn't guarantee idempotency. Retrying would create
%% duplicate messages.
max_retries = 0 :: non_neg_integer(),
%% Number of buffered messages and their approximate size in bytes:
n = 0 :: non_neg_integer(),
n_bytes = 0 :: non_neg_integer(),
%% Reference of the pending flush timer, if any:
tref :: undefined | reference(),
%% Buffered messages:
queue :: queue:queue(emqx_types:message()),
%% Callers of synchronous requests that are replied to on flush:
pending_replies = [] :: [gen_server:from()]
}).
%% gen_server callback. Sets up process flags and logger metadata,
%% initializes the shard metrics and the callback module, and publishes
%% the callback module and its options in a persistent term keyed by
%% the DB, where `shard_of_message/3' looks them up.
init([Mod, ModOptions, DB, Shard]) ->
    process_flag(trap_exit, true),
    process_flag(message_queue_data, off_heap),
    logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}),
    Metrics = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
    ok = emqx_ds_builtin_metrics:init_for_shard(Metrics),
    {ok, ModState} = Mod:init_buffer(DB, Shard, ModOptions),
    persistent_term:put(?cbm(DB), {Mod, ModOptions}),
    {ok, #s{
        callback_module = Mod,
        callback_state = ModState,
        db = DB,
        shard = Shard,
        metrics_id = Metrics,
        queue = queue:new()
    }}.
%% gen_server callback. Replaces the server state in the status map
%% with a short summary (DB, shard and queue length), so the buffered
%% messages are not included.
format_status(Status) ->
    maps:map(fun format_status_field/2, Status).

%% Summarize the `state' entry; other entries are returned unchanged.
format_status_field(state, #s{db = DB, shard = Shard, queue = Q}) ->
    #{
        db => DB,
        shard => Shard,
        queue => queue:len(Q)
    };
format_status_field(_Key, Val) ->
    Val.
%% gen_server callback. Synchronous enqueue requests are not replied
%% to immediately: the caller is added to `pending_replies' and gets
%% the reply when the buffer holding its messages is flushed.
handle_call(
    #enqueue_req{
        messages = Msgs,
        sync = Sync,
        atomic = Atomic,
        n_messages = NMsgs,
        payload_bytes = NBytes
    },
    From,
    S0
) ->
    %% If this batch does not fit into the buffer, `enqueue/6' flushes
    %% the previously buffered messages before accepting it, and the
    %% flush replies to everyone in `pending_replies'. Hence the old
    %% contents must be flushed _before_ `From' is registered:
    %% otherwise the caller would receive the result of a flush that
    %% didn't include its messages, and would never learn the fate of
    %% its own batch.
    S1 = flush_before_enqueue(NMsgs, NBytes, S0),
    S = S1#s{pending_replies = [From | S1#s.pending_replies]},
    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
handle_call(_Call, _From, S) ->
    {reply, {error, unknown_call}, S}.

%% Flush a non-empty buffer when adding a batch of the given size would
%% reach the limits. The condition mirrors the one used by `enqueue/6'
%% and must be kept in sync with it.
%%
%% Note: this leaves the buffer empty as long as the flush doesn't
%% take the retry path, which is the case while `max_retries' is 0.
flush_before_enqueue(_BatchSize, _BatchBytes, S = #s{n = 0}) ->
    S;
flush_before_enqueue(BatchSize, BatchBytes, S = #s{n = NMsgs0, n_bytes = NBytes0}) ->
    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
    NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
    case NMsgs0 + BatchSize >= NMax orelse NBytes0 + BatchBytes >= NBytesMax of
        true ->
            flush(S);
        false ->
            S
    end.
%% gen_server callback. Asynchronous enqueue requests: the messages
%% are buffered and nobody is registered for a reply. Any other cast
%% is ignored.
handle_cast(Req = #enqueue_req{}, S) ->
    #enqueue_req{
        messages = Msgs,
        sync = Sync,
        atomic = Atomic,
        n_messages = NMsgs,
        payload_bytes = NBytes
    } = Req,
    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
handle_cast(_Cast, S) ->
    {noreply, S}.
%% gen_server callback. The `?flush' message (sent by the timer armed
%% in `ensure_timer/1') flushes the buffer; anything else is ignored.
handle_info(Info, S) ->
    case Info of
        ?flush ->
            {noreply, flush(S)};
        _ ->
            {noreply, S}
    end.
%% gen_server callback. Removes the persistent term that `init/1'
%% created for the DB.
terminate(_Reason, #s{db = DB}) ->
    Key = ?cbm(DB),
    _ = persistent_term:erase(Key),
    ok.
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================
%% Add a batch to the buffer and return the new state.
%%
%% The limits are read from the `egress_batch_size' and
%% `egress_batch_bytes' application variables on every call. When the
%% batch would push a non-empty buffer up to (or over) a limit, the
%% buffer is flushed first and the enqueue is retried.
enqueue(
    Sync,
    Atomic,
    Msgs,
    BatchSize,
    BatchBytes,
    S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
) ->
    %% At this point we don't split the batches, even when they aren't
    %% atomic. It wouldn't win us anything in terms of memory, and
    %% EMQX currently feeds data to DS in very small batches, so
    %% granularity should be fine enough.
    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
    NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
    NMsgs = NMsgs0 + BatchSize,
    NBytes = NBytes0 + BatchBytes,
    LimitReached = NMsgs >= NMax orelse NBytes >= NBytesMax,
    if
        LimitReached, NMsgs0 > 0 ->
            %% Adding this batch would cause buffer to overflow. Flush
            %% it now, and retry:
            enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, flush(S0));
        true ->
            %% Either the batch fits, or the buffer holds no messages.
            %% The batch is enqueued in its entirety:
            S1 = S0#s{
                n = NMsgs,
                n_bytes = NBytes,
                queue = lists:foldl(fun queue:in/2, Q0, Msgs)
            },
            case LimitReached of
                true ->
                    flush(S1);
                false ->
                    ensure_timer(S1)
            end
    end.
%% Ask the callback module of the DB which shard the message belongs
%% to. The module and its options are read from the persistent term
%% written by `init/1'.
shard_of_message(DB, Message, ShardBy) ->
    {Mod, ModOptions} = persistent_term:get(?cbm(DB)),
    Mod:shard_of_message(DB, Message, ShardBy, ModOptions).
-define(COOLDOWN_MIN, 1000).
-define(COOLDOWN_MAX, 5000).
%% Cancel the pending flush timer (if any) and flush the buffer.
flush(S0) ->
    S = cancel_timer(S0),
    do_flush(S).
%% Pass the buffered messages to the callback module and process the
%% result:
%%
%% - `ok': pending callers get `ok', the buffer is emptied.
%%
%% - `{timeout, _}' while there are retries left: the buffer is kept
%%   and the flush is rescheduled.
%%
%% - anything else: pending callers get the error, the buffer is
%%   emptied (i.e. the messages are dropped).
%%
%% Must be called with the flush timer cancelled (see `flush/1').
do_flush(S0 = #s{n = 0}) ->
    %% Nothing to flush:
    S0;
do_flush(
    S0 = #s{
        callback_module = CBM,
        callback_state = CallbackS0,
        queue = Q,
        pending_replies = Replies,
        db = DB,
        shard = Shard,
        metrics_id = Metrics,
        n_retries = Retries,
        max_retries = MaxRetries
    }
) ->
    Messages = queue:to_list(Q),
    T0 = erlang:monotonic_time(microsecond),
    {CallbackS, Result} = CBM:flush_buffer(DB, Shard, Messages, CallbackS0),
    %% The callback state is updated regardless of the outcome:
    S = S0#s{callback_state = CallbackS},
    T1 = erlang:monotonic_time(microsecond),
    emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0),
    case Result of
        ok ->
            emqx_ds_builtin_metrics:inc_egress_batches(Metrics),
            emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
            emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
            ?tp(
                emqx_ds_replication_layer_egress_flush,
                #{db => DB, shard => Shard, batch => Messages}
            ),
            lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
            erlang:garbage_collect(),
            S#s{
                n = 0,
                n_bytes = 0,
                queue = queue:new(),
                pending_replies = [],
                %% The next batch starts with a fresh retry budget:
                n_retries = 0
            };
        {timeout, ServerId} when Retries < MaxRetries ->
            %% Note: this is a hot loop, so we report error messages
            %% with `debug' level to avoid wiping the logs. Instead,
            %% error detection must rely on the metrics. Debug
            %% logging can be enabled for the particular egress server
            %% via logger domain.
            ?tp(
                debug,
                emqx_ds_replication_layer_egress_flush_retry,
                #{db => DB, shard => Shard, reason => timeout, server_id => ServerId}
            ),
            %% Retry sending the batch:
            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
            erlang:garbage_collect(),
            %% We block the gen_server until the next retry.
            BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
            timer:sleep(BlockTime),
            %% The batch stays in the buffer. Re-arm the flush timer,
            %% otherwise nothing would trigger the retry until the next
            %% enqueue request arrives:
            ensure_timer(S#s{n_retries = Retries + 1});
        Err ->
            ?tp(
                debug,
                emqx_ds_replication_layer_egress_flush_failed,
                #{db => DB, shard => Shard, error => Err}
            ),
            emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
            %% Normalize the error into the `{error, Class, Reason}' form:
            Reply =
                case Err of
                    {error, _, _} -> Err;
                    {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}};
                    _ -> {error, unrecoverable, Err}
                end,
            lists:foreach(
                fun(From) -> gen_server:reply(From, Reply) end, Replies
            ),
            erlang:garbage_collect(),
            S#s{
                n = 0,
                n_bytes = 0,
                queue = queue:new(),
                pending_replies = [],
                n_retries = 0
            }
    end.
%% For every shard touched by the batch, count the messages that go to
%% it and sum up their approximate sizes.
-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
    [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}]
when
    NMessages :: non_neg_integer(),
    NBytes :: non_neg_integer().
shards_of_batch(DB, Messages) ->
    Tally = fun(Message, Acc) ->
        %% TODO: sharding strategy must be part of the DS DB schema:
        Shard = shard_of_message(DB, Message, clientid),
        Size = payload_size(Message),
        {Count, Bytes} = maps:get(Shard, Acc, {0, 0}),
        Acc#{Shard => {Count + 1, Bytes + Size}}
    end,
    PerShard = lists:foldl(Tally, #{}, Messages),
    maps:to_list(PerShard).
%% Split the batch into per-shard batches (keeping the relative order
%% of the messages within each shard) and enqueue them one by one as
%% non-atomic requests. The individual results are merged with
%% `compose_errors/2'.
repackage_messages(DB, Messages, Sync) ->
    Group = fun(Message, Acc) ->
        Shard = shard_of_message(DB, Message, clientid),
        Size = payload_size(Message),
        {Count, Bytes, Rev} = maps:get(Shard, Acc, {0, 0, []}),
        %% Messages are accumulated in reverse order:
        Acc#{Shard => {Count + 1, Bytes + Size, [Message | Rev]}}
    end,
    Send = fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
        Req = #enqueue_req{
            messages = lists:reverse(RevMessages),
            sync = Sync,
            atomic = false,
            n_messages = NMsgs,
            payload_bytes = ByteSize
        },
        compose_errors(ErrAcc, enqueue_call_or_cast(?via(DB, Shard), Req))
    end,
    maps:fold(Send, ok, lists:foldl(Group, #{}, Messages)).
%% Deliver the request to the buffer server: as a call without a
%% timeout when the request is synchronous, as a cast otherwise.
enqueue_call_or_cast(To, Req = #enqueue_req{sync = Sync}) when is_boolean(Sync) ->
    case Sync of
        true ->
            gen_server:call(To, Req, infinity);
        false ->
            gen_server:cast(To, Req)
    end.
%% Merge the result of one more operation into the accumulated result:
%%
%% - `ok' never changes the accumulator;
%% - the first error replaces an `ok' accumulator;
%% - an unrecoverable error replaces an accumulated recoverable one;
%% - in any other case the accumulated error is kept.
compose_errors(ErrAcc, Result) ->
    case {ErrAcc, Result} of
        {_, ok} ->
            ErrAcc;
        {ok, _} ->
            Result;
        {{error, recoverable, _}, {error, unrecoverable, _}} ->
            Result;
        _ ->
            ErrAcc
    end.
%% Arm the flush timer unless one is already pending. The interval is
%% read from the `egress_flush_interval' application variable.
ensure_timer(S = #s{tref = undefined}) ->
    FlushAfter = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
    S#s{tref = erlang:send_after(FlushAfter, self(), ?flush)};
ensure_timer(S) ->
    %% The timer is already pending:
    S.
%% Cancel the flush timer, if any, and clear the reference in the
%% state.
cancel_timer(S = #s{tref = TRef}) ->
    case TRef of
        undefined ->
            S;
        _ ->
            _ = erlang:cancel_timer(TRef),
            S#s{tref = undefined}
    end.
%% @doc Return approximate size of the MQTT message in bytes: the sum
%% of the sizes of its payload and topic (it doesn't take all things
%% into account, for example headers and extras).
%%
%% `byte_size/1' is used rather than the generic `size/1': both fields
%% are expected to be binaries, and the type-specific BIF is the
%% recommended one.
payload_size(#message{payload = P, topic = T}) ->
    byte_size(P) + byte_size(T).

View File

@ -26,16 +26,13 @@
-define(SHARD, shard(?FUNCTION_NAME)). -define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG, #{ -define(DEFAULT_CONFIG, #{
backend => builtin, backend => builtin_local,
storage => {emqx_ds_storage_bitfield_lts, #{}}, storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 1, n_shards => 1
n_sites => 1,
replication_factor => 1,
replication_options => #{}
}). }).
-define(COMPACT_CONFIG, #{ -define(COMPACT_CONFIG, #{
backend => builtin, backend => builtin_local,
storage => storage =>
{emqx_ds_storage_bitfield_lts, #{ {emqx_ds_storage_bitfield_lts, #{
bits_per_wildcard_level => 8 bits_per_wildcard_level => 8
@ -138,8 +135,8 @@ t_get_streams(_Config) ->
[FooBarBaz] = GetStream(<<"foo/bar/baz">>), [FooBarBaz] = GetStream(<<"foo/bar/baz">>),
[A] = GetStream(<<"a">>), [A] = GetStream(<<"a">>),
%% Restart shard to make sure trie is persisted and restored: %% Restart shard to make sure trie is persisted and restored:
ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME), ok = emqx_ds:close_db(?FUNCTION_NAME),
{ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}), ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
%% Verify that there are no "ghost streams" for topics that don't %% Verify that there are no "ghost streams" for topics that don't
%% have any messages: %% have any messages:
[] = GetStream(<<"bar/foo">>), [] = GetStream(<<"bar/foo">>),
@ -241,8 +238,8 @@ t_replay(_Config) ->
?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
%% Restart the DB to make sure trie is persisted and restored: %% Restart the DB to make sure trie is persisted and restored:
ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME), ok = emqx_ds:close_db(?FUNCTION_NAME),
{ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}), ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
%% Learned wildcard topics: %% Learned wildcard topics:
?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])), ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)), ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
@ -512,7 +509,7 @@ suite() -> [{timetrap, {seconds, 20}}].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(), emqx_common_test_helpers:clear_screen(),
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[emqx_durable_storage], [emqx_ds_builtin_local],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
[{apps, Apps} | Config]. [{apps, Apps} | Config].