diff --git a/apps/emqx_ds_builtin_local/README.md b/apps/emqx_ds_builtin_local/README.md
new file mode 100644
index 000000000..fec609493
--- /dev/null
+++ b/apps/emqx_ds_builtin_local/README.md
@@ -0,0 +1,32 @@
+# Local Backend for EMQX Durable Storage
+
+# Features
+
+This backend uses a local RocksDB database to store data.
+
+# Limitation
+
+This backend cannot be used in a clustered EMQX setup.
+
+# Documentation links
+
+TBD
+
+# Usage
+
+TBD
+
+# Configurations
+
+TBD
+
+# HTTP APIs
+
+TBD
+
+# Other
+
+TBD
+
+# Contributing
+Please see our [contributing.md](../../CONTRIBUTING.md).
diff --git a/apps/emqx_ds_builtin_local/rebar.config b/apps/emqx_ds_builtin_local/rebar.config
new file mode 100644
index 000000000..d70aa75e0
--- /dev/null
+++ b/apps/emqx_ds_builtin_local/rebar.config
@@ -0,0 +1,5 @@
+%% -*- mode:erlang -*-
+
+{deps, [
+    {emqx_durable_storage, {path, "../emqx_durable_storage"}}
+]}.
diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.app.src b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.app.src
new file mode 100644
index 000000000..e7531ae45
--- /dev/null
+++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.app.src
@@ -0,0 +1,11 @@
+%% -*- mode: erlang -*-
+{application, emqx_ds_builtin_local, [
+    {description, "A DS backend that stores all data locally and thus doesn't support clustering."},
+    % strict semver, bump manually!
+    {vsn, "0.1.0"},
+    {modules, []},
+    {registered, []},
+    {applications, [kernel, stdlib, rocksdb, emqx_durable_storage, emqx_utils]},
+    {mod, {emqx_ds_builtin_local_app, []}},
+    {env, []}
+]}.
diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl
new file mode 100644
index 000000000..d7e5972ab
--- /dev/null
+++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl
@@ -0,0 +1,372 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_builtin_local).
+
+-behaviour(emqx_ds).
+-behaviour(emqx_ds_buffer).
+
+%% API:
+-export([]).
+
+%% behavior callbacks:
+-export([
+    %% `emqx_ds':
+    open_db/2,
+    close_db/1,
+    add_generation/1,
+    update_db_config/2,
+    list_generations_with_lifetimes/1,
+    drop_generation/2,
+    drop_db/1,
+    store_batch/3,
+    get_streams/3,
+    get_delete_streams/3,
+    make_iterator/4,
+    make_delete_iterator/4,
+    update_iterator/3,
+    next/3,
+    delete_next/4,
+
+    %% `emqx_ds_buffer':
+    init_buffer/3,
+    flush_buffer/4,
+    shard_of_message/4
+]).
+
+-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
+
+-include_lib("emqx_utils/include/emqx_message.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(tag, 1).
+-define(shard, 2).
+-define(enc, 3). + +-define(IT, 61). +-define(DELETE_IT, 62). + +-type shard() :: binary(). + +-opaque iterator() :: + #{ + ?tag := ?IT, + ?shard := shard(), + ?enc := term() + }. + +-opaque delete_iterator() :: + #{ + ?tag := ?DELETE_IT, + ?shard := shard(), + ?enc := term() + }. + +-type db_opts() :: + #{ + backend := builtin_local, + storage := emqx_ds_storage_layer:prototype(), + n_shards := pos_integer() + }. + +-type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}. + +-define(stream(SHARD, INNER), [2, SHARD | INNER]). +-define(delete_stream(SHARD, INNER), [3, SHARD | INNER]). + +%%================================================================================ +%% API functions +%%================================================================================ + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +-spec open_db(emqx_ds:db(), db_opts()) -> ok | {error, _}. +open_db(DB, CreateOpts) -> + case emqx_ds_builtin_local_sup:start_db(DB, CreateOpts) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ok; + {error, Err} -> + {error, Err} + end. + +-spec close_db(emqx_ds:db()) -> ok. +close_db(DB) -> + emqx_ds_builtin_local_sup:stop_db(DB). + +-spec add_generation(emqx_ds:db()) -> ok | {error, _}. +add_generation(DB) -> + Shards = emqx_ds_builtin_local_meta:shards(DB), + Errors = lists:filtermap( + fun(Shard) -> + ShardId = {DB, Shard}, + case + emqx_ds_storage_layer:add_generation( + ShardId, emqx_ds_builtin_local_meta:ensure_monotonic_timestamp(ShardId) + ) + of + ok -> + false; + Error -> + {true, {Shard, Error}} + end + end, + Shards + ), + case Errors of + [] -> ok; + _ -> {error, Errors} + end. + +-spec update_db_config(emqx_ds:db(), db_opts()) -> ok | {error, _}. +update_db_config(DB, CreateOpts) -> + Opts = #{} = emqx_ds_builtin_local_meta:update_db_config(DB, CreateOpts), + lists:foreach( + fun(Shard) -> + ShardId = {DB, Shard}, + emqx_ds_storage_layer:update_config( + ShardId, emqx_ds_builtin_local_meta:ensure_monotonic_timestamp(ShardId), Opts + ) + end, + emqx_ds_builtin_local_meta:shards(DB) + ). + +-spec list_generations_with_lifetimes(emqx_ds:db()) -> + #{emqx_ds:generation_rank() => emqx_ds:generation_info()}. +list_generations_with_lifetimes(DB) -> + lists:foldl( + fun(Shard, Acc) -> + maps:fold( + fun(GenId, Data, Acc1) -> + Acc1#{{Shard, GenId} => Data} + end, + Acc, + emqx_ds_storage_layer:list_generations_with_lifetimes({DB, Shard}) + ) + end, + #{}, + emqx_ds_builtin_local_meta:shards(DB) + ). + +-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. +drop_generation(DB, {Shard, GenId}) -> + emqx_ds_storage_layer:drop_generation({DB, Shard}, GenId). + +-spec drop_db(emqx_ds:db()) -> ok | {error, _}. +drop_db(DB) -> + close_db(DB), + lists:foreach( + fun(Shard) -> + emqx_ds_storage_layer:drop_shard({DB, Shard}) + end, + emqx_ds_builtin_local_meta:shards(DB) + ), + emqx_ds_builtin_local_meta:drop_db(DB). + +-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + emqx_ds:store_batch_result(). +store_batch(DB, Messages, Opts) -> + try + emqx_ds_buffer:store_batch(DB, Messages, Opts) + catch + error:{Reason, _Call} when Reason == timeout; Reason == noproc -> + {error, recoverable, Reason} + end. + +-record(bs, {options :: term()}). +-type buffer_state() :: #bs{}. + +-spec init_buffer(emqx_ds:db(), shard(), _Options) -> {ok, buffer_state()}. 
+init_buffer(DB, Shard, Options) -> + ShardId = {DB, Shard}, + case current_timestamp(ShardId) of + undefined -> + Latest = erlang:system_time(microsecond), + emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest); + _Latest -> + ok + end, + {ok, #bs{options = Options}}. + +-spec flush_buffer(emqx_ds:db(), shard(), [emqx_types:message()], buffer_state()) -> + {buffer_state(), emqx_ds:store_batch_result()}. +flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) -> + {Latest, Batch} = assign_timestamps(current_timestamp({DB, Shard}), Messages), + Result = emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options), + emqx_ds_builtin_local_meta:set_current_timestamp({DB, Shard}, Latest), + {S0, Result}. + +assign_timestamps(Latest, Messages) -> + assign_timestamps(Latest, Messages, []). + +assign_timestamps(Latest, [MessageIn | Rest], Acc) -> + case emqx_message:timestamp(MessageIn, microsecond) of + TimestampUs when TimestampUs > Latest -> + Message = assign_timestamp(TimestampUs, MessageIn), + assign_timestamps(TimestampUs, Rest, [Message | Acc]); + _Earlier -> + Message = assign_timestamp(Latest + 1, MessageIn), + assign_timestamps(Latest + 1, Rest, [Message | Acc]) + end; +assign_timestamps(Latest, [], Acc) -> + {Latest, lists:reverse(Acc)}. + +assign_timestamp(TimestampUs, Message) -> + {TimestampUs, Message}. + +-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard(). +shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> + N = emqx_ds_builtin_local_meta:n_shards(DB), + Hash = + case SerializeBy of + clientid -> erlang:phash2(From, N); + topic -> erlang:phash2(Topic, N) + end, + integer_to_binary(Hash). + +-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [{emqx_ds:stream_rank(), emqx_ds:ds_specific_stream()}]. +get_streams(DB, TopicFilter, StartTime) -> + Shards = emqx_ds_builtin_local_meta:shards(DB), + lists:flatmap( + fun(Shard) -> + Streams = emqx_ds_storage_layer:get_streams( + {DB, Shard}, TopicFilter, timestamp_to_timeus(StartTime) + ), + lists:map( + fun({RankY, InnerStream}) -> + Rank = {Shard, RankY}, + {Rank, ?stream(Shard, InnerStream)} + end, + Streams + ) + end, + Shards + ). + +-spec make_iterator( + emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time() +) -> + emqx_ds:make_iterator_result(emqx_ds:ds_specific_iterator()). +make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) -> + ShardId = {DB, Shard}, + case + emqx_ds_storage_layer:make_iterator( + ShardId, InnerStream, TopicFilter, timestamp_to_timeus(StartTime) + ) + of + {ok, Iter} -> + {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; + Error = {error, _, _} -> + Error + end. + +-spec update_iterator(emqx_ds:db(), emqx_ds:ds_specific_iterator(), emqx_ds:message_key()) -> + emqx_ds:make_iterator_result(iterator()). +update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) -> + case emqx_ds_storage_layer:update_iterator({DB, Shard}, StorageIter0, Key) of + {ok, StorageIter} -> + {ok, Iter0#{?enc => StorageIter}}; + Err = {error, _, _} -> + Err + end. + +-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). 
+next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) -> + ShardId = {DB, Shard}, + T0 = erlang:monotonic_time(microsecond), + Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0), + case Result of + {ok, StorageIter, Batch} -> + Iter = Iter0#{?enc := StorageIter}, + {ok, Iter, Batch}; + Other -> + Other + end. + +-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [emqx_ds:ds_specific_delete_stream()]. +get_delete_streams(DB, TopicFilter, StartTime) -> + Shards = emqx_ds_builtin_local_meta:shards(DB), + lists:flatmap( + fun(Shard) -> + Streams = emqx_ds_storage_layer:get_delete_streams( + {DB, Shard}, TopicFilter, timestamp_to_timeus(StartTime) + ), + lists:map( + fun(InnerStream) -> + ?delete_stream(Shard, InnerStream) + end, + Streams + ) + end, + Shards + ). + +-spec make_delete_iterator( + emqx_ds:db(), emqx_ds:ds_specific_delete_stream(), emqx_ds:topic_filter(), emqx_ds:time() +) -> + emqx_ds:make_delete_iterator_result(delete_iterator()). +make_delete_iterator(DB, ?delete_stream(Shard, InnerStream), TopicFilter, StartTime) -> + ShardId = {DB, Shard}, + case + emqx_ds_storage_layer:make_delete_iterator( + ShardId, InnerStream, TopicFilter, timestamp_to_timeus(StartTime) + ) + of + {ok, Iter} -> + {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}}; + Error = {error, _, _} -> + Error + end. + +-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> + emqx_ds:delete_next_result(emqx_ds:delete_iterator()). +delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N) -> + ShardId = {DB, Shard}, + case + emqx_ds_storage_layer:delete_next( + ShardId, StorageIter0, Selector, N, current_timestamp(ShardId) + ) + of + {ok, StorageIter, Ndeleted} -> + {ok, Iter#{?enc => StorageIter}, Ndeleted}; + {ok, end_of_stream} -> + {ok, end_of_stream}; + Error -> + Error + end. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +current_timestamp(ShardId) -> + emqx_ds_builtin_local_meta:current_timestamp(ShardId). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +timestamp_to_timeus(TimestampMs) -> + TimestampMs * 1000. diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_app.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_app.erl new file mode 100644 index 000000000..1b64405d6 --- /dev/null +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_app.erl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_builtin_local_app). + +%% API: +-export([]). + +%% behavior callbacks: +-export([start/2]). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +start(_StartType, _StartArgs) -> + emqx_ds:register_backend(builtin_local, emqx_ds_builtin_local), + %% TODO: fixme + {ok, self()}. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl new file mode 100644 index 000000000..8776416e0 --- /dev/null +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl @@ -0,0 +1,219 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Supervisor that contains all the processes that belong to a +%% given builtin DS database. +-module(emqx_ds_builtin_local_db_sup). + +-behaviour(supervisor). + +%% API: +-export([ + start_db/2, + start_shard/1, + stop_shard/1, + terminate_storage/1, + restart_storage/1, + ensure_shard/1 +]). +-export([which_dbs/0, which_shards/1]). + +%% Debug: +-export([ + get_shard_workers/1 +]). + +%% behaviour callbacks: +-export([init/1]). + +%% internal exports: +-export([start_link_sup/2]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(via(REC), {via, gproc, {n, l, REC}}). + +-define(db_sup, ?MODULE). +-define(shards_sup, emqx_ds_builtin_local_db_shards_sup). +-define(shard_sup, emqx_ds_builtin_local_db_shard_sup). + +-record(?db_sup, {db}). +-record(?shards_sup, {db}). +-record(?shard_sup, {db, shard}). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec start_db(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) -> {ok, pid()}. +start_db(DB, Opts) -> + start_link_sup(#?db_sup{db = DB}, Opts). + +-spec start_shard(emqx_ds_storage_layer:shard_id()) -> + supervisor:startchild_ret(). 
+start_shard({DB, Shard}) -> + supervisor:start_child(?via(#?shards_sup{db = DB}), shard_spec(DB, Shard)). + +-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, not_found}. +stop_shard({DB, Shard}) -> + Sup = ?via(#?shards_sup{db = DB}), + case supervisor:terminate_child(Sup, Shard) of + ok -> + supervisor:delete_child(Sup, Shard); + {error, Reason} -> + {error, Reason} + end. + +-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}. +terminate_storage({DB, Shard}) -> + Sup = ?via(#?shard_sup{db = DB, shard = Shard}), + supervisor:terminate_child(Sup, {Shard, storage}). + +-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}. +restart_storage({DB, Shard}) -> + Sup = ?via(#?shard_sup{db = DB, shard = Shard}), + supervisor:restart_child(Sup, {Shard, storage}). + +-spec ensure_shard(emqx_ds_storage_layer:shard_id()) -> + ok | {error, _Reason}. +ensure_shard(Shard) -> + ensure_started(start_shard(Shard)). + +-spec which_shards(emqx_ds:db()) -> + [_Child]. +which_shards(DB) -> + supervisor:which_children(?via(#?shards_sup{db = DB})). + +%% @doc Return the list of builtin DS databases that are currently +%% active on the node. +-spec which_dbs() -> [emqx_ds:db()]. +which_dbs() -> + Key = {n, l, #?db_sup{_ = '_', db = '$1'}}, + gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]). + +%% @doc Get pids of all local shard servers for the given DB. +-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}. +get_shard_workers(DB) -> + Shards = supervisor:which_children(?via(#?shards_sup{db = DB})), + L = lists:flatmap( + fun + ({_Shard, Sup, _, _}) when is_pid(Sup) -> + [{Id, Pid} || {Id, Pid, _, _} <- supervisor:which_children(Sup), is_pid(Pid)]; + (_) -> + [] + end, + Shards + ), + maps:from_list(L). + +%%================================================================================ +%% behaviour callbacks +%%================================================================================ + +init({#?db_sup{db = DB}, DefaultOpts}) -> + %% Spec for the top-level supervisor for the database: + logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_metrics:init_for_db(DB), + Opts = emqx_ds_builtin_local_meta:open_db(DB, DefaultOpts), + Children = [ + sup_spec(#?shards_sup{db = DB}, Opts) + ], + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, Children}}; +init({#?shards_sup{db = DB}, _Opts}) -> + %% Spec for the supervisor that manages the supervisors for + %% each local shard of the DB: + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 1 + }, + Children = [shard_spec(DB, Shard) || Shard <- emqx_ds_builtin_local_meta:shards(DB)], + {ok, {SupFlags, Children}}; +init({#?shard_sup{db = DB, shard = Shard}, _}) -> + SupFlags = #{ + strategy => rest_for_one, + intensity => 10, + period => 100 + }, + Opts = emqx_ds_builtin_local_meta:db_config(DB), + Children = [ + shard_storage_spec(DB, Shard, Opts), + shard_buffer_spec(DB, Shard, Opts) + ], + {ok, {SupFlags, Children}}. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +start_link_sup(Id, Options) -> + supervisor:start_link(?via(Id), ?MODULE, {Id, Options}). 
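+
+%% Note on naming (illustrative; `my_db' is a hypothetical DB name):
+%% every supervisor in this tree is registered via gproc under its
+%% defining record, so `?via(#?shards_sup{db = my_db})' expands to
+%%   {via, gproc, {n, l, #?shards_sup{db = my_db}}}
+%% which is the name used by `start_shard/1' and `which_shards/1' above.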
+ +%%================================================================================ +%% Internal functions +%%================================================================================ + +sup_spec(Id, Options) -> + #{ + id => element(1, Id), + start => {?MODULE, start_link_sup, [Id, Options]}, + type => supervisor, + shutdown => infinity + }. + +shard_spec(DB, Shard) -> + #{ + id => Shard, + start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]}, + shutdown => infinity, + restart => permanent, + type => supervisor + }. + +shard_storage_spec(DB, Shard, Opts) -> + #{ + id => {Shard, storage}, + start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Opts]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. + +shard_buffer_spec(DB, Shard, Options) -> + #{ + id => {Shard, buffer}, + start => {emqx_ds_buffer, start_link, [emqx_ds_builtin_local, Options, DB, Shard]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. + +ensure_started(Res) -> + case Res of + {ok, _Pid} -> + ok; + {error, {already_started, _Pid}} -> + ok; + {error, Reason} -> + {error, Reason} + end. diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl new file mode 100644 index 000000000..dbc68cd2c --- /dev/null +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl @@ -0,0 +1,204 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_builtin_local_meta). + +-behaviour(gen_server). + +%% API: +-export([ + start_link/0, + open_db/2, + drop_db/1, + n_shards/1, + shards/1, + db_config/1, + update_db_config/2, + + current_timestamp/1, + set_current_timestamp/2, + ensure_monotonic_timestamp/1 +]). + +%% behavior callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +%% internal exports: +-export([]). + +-export_type([]). + +-include_lib("stdlib/include/ms_transform.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(META_TAB, emqx_ds_builtin_local_metadata_tab). +-record(?META_TAB, { + db :: emqx_ds:db(), + db_props :: emqx_ds_builtin_local:db_opts() +}). + +%% We save timestamp of the last written message to a mnesia table. +%% The saved value is restored when the node restarts. This is needed +%% to create a timestamp that is truly monotonic even in presence of +%% node restarts. +-define(TS_TAB, emqx_ds_builtin_local_timestamp_tab). +-record(?TS_TAB, { + id :: emqx_ds_storage_layer:shard_id(), + latest :: integer() +}). 
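+
+%% Illustrative use of this table through the API below (the DB name
+%% `my_db' is hypothetical): `ensure_monotonic_timestamp/1' bumps the
+%% persisted counter, so successive calls return strictly increasing
+%% values even across node restarts, e.g.
+%%   T1 = emqx_ds_builtin_local_meta:ensure_monotonic_timestamp({my_db, <<"0">>}),
+%%   T2 = emqx_ds_builtin_local_meta:ensure_monotonic_timestamp({my_db, <<"0">>}),
+%%   true = T2 > T1.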
+ +%%================================================================================ +%% API functions +%%================================================================================ + +-define(SERVER, ?MODULE). + +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +-spec open_db(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) -> + emqx_ds_builtin_local:db_opts(). +open_db(DB, CreateOpts = #{backend := builtin_local, storage := _, n_shards := _}) -> + transaction( + fun() -> + case mnesia:wread({?META_TAB, DB}) of + [] -> + mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), + CreateOpts; + [#?META_TAB{db_props = Opts}] -> + Opts + end + end + ). + +-spec drop_db(emqx_ds:db()) -> ok. +drop_db(DB) -> + transaction( + fun() -> + MS = ets:fun2ms(fun(#?TS_TAB{id = ID}) when element(1, ID) =:= DB -> + ID + end), + Timestamps = mnesia:select(?TS_TAB, MS, write), + [mnesia:delete({?TS_TAB, I}) || I <- Timestamps], + mnesia:delete({?META_TAB, DB}) + end + ). + +-spec update_db_config(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) -> + emqx_ds_builtin_local:db_opts(). +update_db_config(DB, Opts) -> + transaction( + fun() -> + mnesia:write(#?META_TAB{db = DB, db_props = Opts}), + Opts + end + ). + +-spec n_shards(emqx_ds:db()) -> pos_integer(). +n_shards(DB) -> + #{n_shards := NShards} = db_config(DB), + NShards. + +-spec shards(emqx_ds:db()) -> [emqx_ds_builtin_local:shard()]. +shards(DB) -> + NShards = n_shards(DB), + [integer_to_binary(Shard) || Shard <- lists:seq(0, NShards - 1)]. + +-spec db_config(emqx_ds:db()) -> emqx_ds_builtin_local:db_opts(). +db_config(DB) -> + case mnesia:dirty_read(?META_TAB, DB) of + [#?META_TAB{db_props = Props}] -> + Props; + [] -> + error({no_such_db, DB}) + end. + +-spec set_current_timestamp(emqx_ds_storage_layer:shard_id(), emqx_ds:time()) -> ok. +set_current_timestamp(ShardId, Time) -> + mria:dirty_write(?TS_TAB, #?TS_TAB{id = ShardId, latest = Time}). + +-spec current_timestamp(emqx_ds_storage_layer:shard_id()) -> emqx_ds:time() | undefined. +current_timestamp(ShardId) -> + case mnesia:dirty_read(?TS_TAB, ShardId) of + [#?TS_TAB{latest = Latest}] -> + Latest; + [] -> + undefined + end. + +-spec ensure_monotonic_timestamp(emqx_ds_storage_layer:shard_id()) -> emqx_ds:time(). +ensure_monotonic_timestamp(ShardId) -> + mria:dirty_update_counter({?TS_TAB, ShardId}, 1). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +-record(s, {}). +-define(timer_update, timer_update). + +init([]) -> + process_flag(trap_exit, true), + ensure_tables(), + S = #s{}, + {ok, S}. + +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + ok. 
+ +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ + +ensure_tables() -> + ok = mria:create_table(?META_TAB, [ + {local_content, true}, + {type, ordered_set}, + {storage, disc_copies}, + {record_name, ?META_TAB}, + {attributes, record_info(fields, ?META_TAB)} + ]), + ok = mria:create_table(?TS_TAB, [ + {local_content, true}, + {type, set}, + {storage, disc_copies}, + {record_name, ?TS_TAB}, + {attributes, record_info(fields, ?TS_TAB)} + ]). + +transaction(Fun) -> + case mria:transaction(mria:local_content_shard(), Fun) of + {atomic, Result} -> + Result; + {aborted, Reason} -> + {error, Reason} + end. diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_sup.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_sup.erl new file mode 100644 index 000000000..5994588ec --- /dev/null +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_sup.erl @@ -0,0 +1,133 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This supervisor manages the global worker processes needed for +%% the functioning of builtin local databases, and all builtin local +%% databases that attach to it. +-module(emqx_ds_builtin_local_sup). + +-behaviour(supervisor). + +%% API: +-export([start_db/2, stop_db/1]). + +%% behavior callbacks: +-export([init/1]). + +%% internal exports: +-export([start_top/0, start_databases_sup/0]). + +-export_type([]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(top, ?MODULE). +-define(databases, emqx_ds_builtin_local_db_sup). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec start_db(emqx_ds:db(), emqx_ds_builtin_local:db_opts()) -> + supervisor:startchild_ret(). +start_db(DB, Opts) -> + ensure_top(), + ChildSpec = #{ + id => DB, + start => {?databases, start_db, [DB, Opts]}, + type => supervisor, + shutdown => infinity + }, + supervisor:start_child(?databases, ChildSpec). + +-spec stop_db(emqx_ds:db()) -> ok. +stop_db(DB) -> + case whereis(?databases) of + Pid when is_pid(Pid) -> + _ = supervisor:terminate_child(?databases, DB), + _ = supervisor:delete_child(?databases, DB), + ok; + undefined -> + ok + end. 
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%% There are two layers of supervision:
+%%
+%% 1. top supervisor for the builtin backend. It contains the global
+%% worker processes (like the metadata server), and the `?databases'
+%% supervisor.
+%%
+%% 2. `?databases': a `one_for_one' supervisor where each child is a
+%% `db' supervisor that contains processes that represent the DB.
+%% Children are attached dynamically to this one.
+init(?top) ->
+    %% Children:
+    MetricsWorker = emqx_ds_builtin_metrics:child_spec(),
+    MetadataServer = #{
+        id => metadata_server,
+        start => {emqx_ds_builtin_local_meta, start_link, []},
+        restart => permanent,
+        type => worker,
+        shutdown => 5000
+    },
+    DBsSup = #{
+        id => ?databases,
+        start => {?MODULE, start_databases_sup, []},
+        restart => permanent,
+        type => supervisor,
+        shutdown => infinity
+    },
+    %%
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 1,
+        period => 1,
+        auto_shutdown => never
+    },
+    {ok, {SupFlags, [MetricsWorker, MetadataServer, DBsSup]}};
+init(?databases) ->
+    %% Children are added dynamically:
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 1
+    },
+    {ok, {SupFlags, []}}.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+-spec start_top() -> {ok, pid()}.
+start_top() ->
+    supervisor:start_link({local, ?top}, ?MODULE, ?top).
+
+start_databases_sup() ->
+    supervisor:start_link({local, ?databases}, ?MODULE, ?databases).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+ensure_top() ->
+    {ok, _} = emqx_ds_sup:attach_backend(builtin_local, {?MODULE, start_top, []}),
+    ok.
diff --git a/apps/emqx_ds_builtin_local/test/emqx_ds_builtin_local_SUITE.erl b/apps/emqx_ds_builtin_local/test/emqx_ds_builtin_local_SUITE.erl
new file mode 100644
index 000000000..67db21b4b
--- /dev/null
+++ b/apps/emqx_ds_builtin_local/test/emqx_ds_builtin_local_SUITE.erl
@@ -0,0 +1,346 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_builtin_local_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(N_SHARDS, 1).
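+
+%% Illustrative end-to-end usage of this backend through the public
+%% `emqx_ds' API (a sketch; the DB name `my_db' is hypothetical):
+%%
+%%   ok = emqx_ds:open_db(my_db, opts([])),
+%%   ok = emqx_ds:store_batch(my_db, [message(<<"t/1">>, <<"hello">>, 0)]),
+%%   [{_Rank, Stream}] = emqx_ds:get_streams(my_db, ['#'], 0),
+%%   {ok, It0} = emqx_ds:make_iterator(my_db, Stream, ['#'], 0),
+%%   {ok, _It, _Batch} = emqx_ds:next(my_db, It0, 100).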
+ +opts(_Config) -> + #{ + backend => builtin_local, + storage => {emqx_ds_storage_reference, #{}}, + n_shards => ?N_SHARDS + }. + +t_drop_generation_with_never_used_iterator(Config) -> + %% This test checks how the iterator behaves when: + %% 1) it's created at generation 1 and not consumed from. + %% 2) generation 2 is created and 1 dropped. + %% 3) iteration begins. + %% In this case, the iterator won't see any messages and the stream will end. + + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), + + TopicFilter = emqx_topic:words(<<"foo/+">>), + StartTime = 0, + Msgs0 = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo/baz">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)), + + [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime), + + ok = emqx_ds:add_generation(DB), + ok = emqx_ds:drop_generation(DB, GenId0), + + Now = emqx_message:timestamp_now(), + Msgs1 = [ + message(<<"foo/bar">>, <<"3">>, Now + 100), + message(<<"foo/baz">>, <<"4">>, Now + 101) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), + + ?assertError( + {error, unrecoverable, generation_not_found}, + emqx_ds_test_helpers:consume_iter(DB, Iter0) + ), + + %% New iterator for the new stream will only see the later messages. + [{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + ?assertNotEqual(Stream0, Stream1), + {ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime), + + {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}), + ?assertNotEqual(end_of_stream, Iter), + ?assertEqual(Msgs1, Batch), + + ok. + +t_drop_generation_with_used_once_iterator(Config) -> + %% This test checks how the iterator behaves when: + %% 1) it's created at generation 1 and consumes at least 1 message. + %% 2) generation 2 is created and 1 dropped. + %% 3) iteration continues. + %% In this case, the iterator should see no more messages and the stream will end. + + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), + + TopicFilter = emqx_topic:words(<<"foo/+">>), + StartTime = 0, + Msgs0 = + [Msg0 | _] = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo/baz">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)), + + [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime), + {ok, Iter1, Batch1} = emqx_ds:next(DB, Iter0, 1), + ?assertNotEqual(end_of_stream, Iter1), + ?assertEqual([Msg0], [Msg || {_Key, Msg} <- Batch1]), + + ok = emqx_ds:add_generation(DB), + ok = emqx_ds:drop_generation(DB, GenId0), + + Now = emqx_message:timestamp_now(), + Msgs1 = [ + message(<<"foo/bar">>, <<"3">>, Now + 100), + message(<<"foo/baz">>, <<"4">>, Now + 101) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), + + ?assertError( + {error, unrecoverable, generation_not_found}, + emqx_ds_test_helpers:consume_iter(DB, Iter1) + ). + +t_drop_generation_update_iterator(Config) -> + %% This checks the behavior of `emqx_ds:update_iterator' after the generation + %% underlying the iterator has been dropped. 
+ + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), + + TopicFilter = emqx_topic:words(<<"foo/+">>), + StartTime = 0, + Msgs0 = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo/baz">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)), + + [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime), + {ok, Iter1, _Batch1} = emqx_ds:next(DB, Iter0, 1), + {ok, _Iter2, [{Key2, _Msg}]} = emqx_ds:next(DB, Iter1, 1), + + ok = emqx_ds:add_generation(DB), + ok = emqx_ds:drop_generation(DB, GenId0), + + ?assertEqual( + {error, unrecoverable, generation_not_found}, + emqx_ds:update_iterator(DB, Iter1, Key2) + ). + +t_make_iterator_stale_stream(Config) -> + %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying + %% the stream has been dropped. + + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), + + TopicFilter = emqx_topic:words(<<"foo/+">>), + StartTime = 0, + Msgs0 = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo/baz">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)), + + [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + + ok = emqx_ds:add_generation(DB), + ok = emqx_ds:drop_generation(DB, GenId0), + + ?assertEqual( + {error, unrecoverable, generation_not_found}, + emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime) + ), + + ok. + +t_get_streams_concurrently_with_drop_generation(Config) -> + %% This checks that we can get all streams while a generation is dropped + %% mid-iteration. + + DB = ?FUNCTION_NAME, + ?check_trace( + #{timetrap => 5_000}, + begin + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + + [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), + ok = emqx_ds:add_generation(DB), + ok = emqx_ds:add_generation(DB), + + %% All streams + TopicFilter = emqx_topic:words(<<"foo/+">>), + StartTime = 0, + ?assertMatch([_, _, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)), + + ?force_ordering( + #{?snk_kind := dropped_gen}, + #{?snk_kind := get_streams_get_gen} + ), + + spawn_link(fun() -> + {ok, _} = ?block_until(#{?snk_kind := get_streams_all_gens}), + ok = emqx_ds:drop_generation(DB, GenId0), + ?tp(dropped_gen, #{}) + end), + + ?assertMatch([_, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)), + + ok + end, + [] + ). + +%% This testcase verifies the behavior of `store_batch' operation +%% when the underlying code experiences recoverable or unrecoverable +%% problems. 
+t_store_batch_fail(Config) ->
+    ?check_trace(
+        #{timetrap => 15_000},
+        try
+            meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
+            DB = ?FUNCTION_NAME,
+            ?assertMatch(ok, emqx_ds:open_db(DB, (opts(Config))#{n_shards => 2})),
+            %% Success:
+            Batch1 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
+            %% Inject unrecoverable error:
+            meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
+                {error, unrecoverable, mock}
+            end),
+            Batch2 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1)
+            ],
+            ?assertMatch(
+                {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
+            ),
+            %% Inject a recoverable error:
+            meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
+                {error, recoverable, mock}
+            end),
+            Batch3 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
+                message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
+                message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
+                message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
+            ],
+            %% Note: due to idempotency issues the number of retries
+            %% is currently set to 0:
+            ?assertMatch(
+                {error, recoverable, mock},
+                emqx_ds:store_batch(DB, Batch3, #{sync => true})
+            ),
+            meck:unload(emqx_ds_storage_layer),
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
+            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
+        after
+            meck:unload()
+        end,
+        [
+            {"message ordering", fun(StoredMessages, _Trace) ->
+                [{_, MessagesFromStream1}, {_, MessagesFromStream2}] = StoredMessages,
+                emqx_ds_test_helpers:diff_messages(
+                    [payload],
+                    [
+                        #message{payload = <<"1">>},
+                        #message{payload = <<"2">>},
+                        #message{payload = <<"5">>},
+                        #message{payload = <<"7">>}
+                    ],
+                    MessagesFromStream1
+                ),
+                emqx_ds_test_helpers:diff_messages(
+                    [payload],
+                    [
+                        #message{payload = <<"6">>},
+                        #message{payload = <<"8">>}
+                    ],
+                    MessagesFromStream2
+                )
+            end}
+        ]
+    ).
+
+message(ClientId, Topic, Payload, PublishedAt) ->
+    Msg = message(Topic, Payload, PublishedAt),
+    Msg#message{from = ClientId}.
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.
+
+delete(DB, It, Selector, BatchSize) ->
+    delete(DB, It, Selector, BatchSize, 0).
+
+delete(DB, It0, Selector, BatchSize, Acc) ->
+    case emqx_ds:delete_next(DB, It0, Selector, BatchSize) of
+        {ok, It, 0} ->
+            {ok, It, Acc};
+        {ok, It, NumDeleted} ->
+            delete(DB, It, Selector, BatchSize, Acc + NumDeleted);
+        {ok, end_of_stream} ->
+            {ok, end_of_stream, Acc};
+        Ret ->
+            Ret
+    end.
+
+%% CT callbacks
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
+    Apps = emqx_cth_suite:start(
+        [mria, emqx_ds_builtin_local],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.
+
+init_per_testcase(_TC, Config) ->
+    application:ensure_all_started(emqx_durable_storage),
+    Config.
+
+end_per_testcase(_TC, _Config) ->
+    snabbkaffe:stop(),
+    ok = application:stop(emqx_durable_storage),
+    mria:stop(),
+    _ = mnesia:delete_schema([node()]),
+    ok.
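The `emqx_ds_buffer' module added below extracts the egress buffering logic into a standalone behaviour: it accumulates messages per shard and hands them to the backend through three callbacks (`init_buffer/3', `flush_buffer/4', `shard_of_message/4'), which `emqx_ds_builtin_local' implements above. A minimal callback module could look roughly as follows (a sketch, not part of this change; `my_ds_buffer_example' and `my_storage:store_batch/2' are hypothetical, the latter assumed to persist a batch synchronously and return `ok' or an `{error, recoverable | unrecoverable, _}' tuple):

    -module(my_ds_buffer_example).

    -behaviour(emqx_ds_buffer).

    -include_lib("emqx_utils/include/emqx_message.hrl").

    -export([init_buffer/3, flush_buffer/4, shard_of_message/4]).

    %% Keep the options around; this sketch needs no other state.
    init_buffer(_DB, _Shard, Options) ->
        {ok, Options}.

    %% Hand the accumulated batch over to the storage and return the new
    %% buffer state together with the result of the write.
    flush_buffer(DB, Shard, Messages, State) ->
        {State, my_storage:store_batch({DB, Shard}, Messages)}.

    %% Route a message to one of two shards, either by client ID or by topic.
    shard_of_message(_DB, #message{from = From}, clientid, _Options) ->
        integer_to_binary(erlang:phash2(From, 2));
    shard_of_message(_DB, #message{topic = Topic}, topic, _Options) ->
        integer_to_binary(erlang:phash2(Topic, 2)).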
diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl new file mode 100644 index 000000000..f6f6c6241 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -0,0 +1,423 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Buffer servers are responsible for collecting batches from the +%% local processes, sharding and repackaging them. +-module(emqx_ds_buffer). + +-behaviour(gen_server). + +%% API: +-export([start_link/4, store_batch/3]). + +%% behavior callbacks: +-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +%% internal exports: +-export([]). + +-export_type([]). + +-include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). +-define(flush, flush). + +-define(cbm(DB), {?MODULE, DB}). + +-record(enqueue_req, { + messages :: [emqx_types:message()], + sync :: boolean(), + atomic :: boolean(), + n_messages :: non_neg_integer(), + payload_bytes :: non_neg_integer() +}). + +-callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}. + +-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) -> + {State, ok | {error, recoverable | unrecoverable, _}}. + +-callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) -> + _Shard. + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec start_link(module(), _CallbackOptions, emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + {ok, pid()}. +start_link(CallbackModule, CallbackOptions, DB, Shard) -> + gen_server:start_link( + ?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], [] + ). + +-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + emqx_ds:store_batch_result(). +store_batch(DB, Messages, Opts) -> + Sync = maps:get(sync, Opts, true), + Atomic = maps:get(atomic, Opts, false), + %% Usually we expect all messages in the batch to go into the + %% single shard, so this function is optimized for the happy case. 
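+    %% Descriptive note: `shards_of_batch/2' (defined below) returns one
+    %% entry per shard with the message count and total payload bytes;
+    %% for the builtin local backend, where shards are binaries, an
+    %% illustrative result could be [{<<"0">>, {2, 38}}, {<<"1">>, {1, 10}}].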
+ case shards_of_batch(DB, Messages) of + [{Shard, {NMsgs, NBytes}}] -> + %% Happy case: + enqueue_call_or_cast( + ?via(DB, Shard), + #enqueue_req{ + messages = Messages, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + } + ); + [_, _ | _] when Atomic -> + %% It's impossible to commit a batch to multiple shards + %% atomically + {error, unrecoverable, atomic_commit_to_multiple_shards}; + _Shards -> + %% Use a slower implementation for the unlikely case: + repackage_messages(DB, Messages, Sync) + end. + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +-record(s, { + callback_module :: module(), + callback_state :: term(), + db :: emqx_ds:db(), + shard :: emqx_ds_replication_layer:shard_id(), + metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), + n_retries = 0 :: non_neg_integer(), + %% FIXME: Currently max_retries is always 0, because replication + %% layer doesn't guarantee idempotency. Retrying would create + %% duplicate messages. + max_retries = 0 :: non_neg_integer(), + n = 0 :: non_neg_integer(), + n_bytes = 0 :: non_neg_integer(), + tref :: undefined | reference(), + queue :: queue:queue(emqx_types:message()), + pending_replies = [] :: [gen_server:from()] +}). + +init([CBM, CBMOptions, DB, Shard]) -> + process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), + logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}), + MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), + ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), + {ok, CallbackS} = CBM:init_buffer(DB, Shard, CBMOptions), + S = #s{ + callback_module = CBM, + callback_state = CallbackS, + db = DB, + shard = Shard, + metrics_id = MetricsId, + queue = queue:new() + }, + persistent_term:put(?cbm(DB), {CBM, CBMOptions}), + {ok, S}. + +format_status(Status) -> + maps:map( + fun + (state, #s{db = DB, shard = Shard, queue = Q}) -> + #{ + db => DB, + shard => Shard, + queue => queue:len(Q) + }; + (_, Val) -> + Val + end, + Status + ). + +handle_call( + #enqueue_req{ + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + From, + S0 = #s{pending_replies = Replies0} +) -> + S = S0#s{pending_replies = [From | Replies0]}, + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast( + #enqueue_req{ + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + S +) -> + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(?flush, S) -> + {noreply, flush(S)}; +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #s{db = DB}) -> + persistent_term:erase(?cbm(DB)), + ok. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ + +enqueue( + Sync, + Atomic, + Msgs, + BatchSize, + BatchBytes, + S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} +) -> + %% At this point we don't split the batches, even when they aren't + %% atomic. 
It wouldn't win us anything in terms of memory, and
+    %% EMQX currently feeds data to DS in very small batches, so
+    %% granularity should be fine enough.
+    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
+    NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
+    NMsgs = NMsgs0 + BatchSize,
+    NBytes = NBytes0 + BatchBytes,
+    case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
+        true ->
+            %% Adding this batch would cause buffer to overflow. Flush
+            %% it now, and retry:
+            S1 = flush(S0),
+            enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
+        false ->
+            %% The buffer can accommodate the batch; enqueue it in its
+            %% entirety:
+            Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
+            S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
+            case NMsgs >= NMax orelse NBytes >= NBytesMax of
+                true ->
+                    flush(S1);
+                false ->
+                    ensure_timer(S1)
+            end
+    end.
+
+shard_of_message(DB, Message, ShardBy) ->
+    {CBM, Options} = persistent_term:get(?cbm(DB)),
+    CBM:shard_of_message(DB, Message, ShardBy, Options).
+
+-define(COOLDOWN_MIN, 1000).
+-define(COOLDOWN_MAX, 5000).
+
+flush(S) ->
+    do_flush(cancel_timer(S)).
+
+do_flush(S0 = #s{n = 0}) ->
+    S0;
+do_flush(
+    S0 = #s{
+        callback_module = CBM,
+        callback_state = CallbackS0,
+        queue = Q,
+        pending_replies = Replies,
+        db = DB,
+        shard = Shard,
+        metrics_id = Metrics,
+        n_retries = Retries,
+        max_retries = MaxRetries
+    }
+) ->
+    Messages = queue:to_list(Q),
+    T0 = erlang:monotonic_time(microsecond),
+    {CallbackS, Result} = CBM:flush_buffer(DB, Shard, Messages, CallbackS0),
+    S = S0#s{callback_state = CallbackS},
+    T1 = erlang:monotonic_time(microsecond),
+    emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0),
+    case Result of
+        ok ->
+            emqx_ds_builtin_metrics:inc_egress_batches(Metrics),
+            emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
+            emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
+            ?tp(
+                emqx_ds_replication_layer_egress_flush,
+                #{db => DB, shard => Shard, batch => Messages}
+            ),
+            lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
+            erlang:garbage_collect(),
+            S#s{
+                callback_state = CallbackS,
+                n = 0,
+                n_bytes = 0,
+                queue = queue:new(),
+                pending_replies = []
+            };
+        {timeout, ServerId} when Retries < MaxRetries ->
+            %% Note: this is a hot loop, so we report error messages
+            %% with `debug' level to avoid wiping the logs. Instead,
+            %% error detection must rely on the metrics. Debug
+            %% logging can be enabled for the particular egress server
+            %% via logger domain.
+            ?tp(
+                debug,
+                emqx_ds_replication_layer_egress_flush_retry,
+                #{db => DB, shard => Shard, reason => timeout, server_id => ServerId}
+            ),
+            %% Retry sending the batch:
+            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
+            erlang:garbage_collect(),
+            %% We block the gen_server until the next retry.
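+            %% (Illustrative: with the cooldown defaults above, the sleep
+            %% below lasts roughly between one and five seconds.)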
+ BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), + timer:sleep(BlockTime), + S#s{n_retries = Retries + 1}; + Err -> + ?tp( + debug, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, error => Err} + ), + emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), + Reply = + case Err of + {error, _, _} -> Err; + {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}}; + _ -> {error, unrecoverable, Err} + end, + lists:foreach( + fun(From) -> gen_server:reply(From, Reply) end, Replies + ), + erlang:garbage_collect(), + S#s{ + n = 0, + n_bytes = 0, + queue = queue:new(), + pending_replies = [], + n_retries = 0 + } + end. + +-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> + [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}] +when + NMessages :: non_neg_integer(), + NBytes :: non_neg_integer(). +shards_of_batch(DB, Messages) -> + maps:to_list( + lists:foldl( + fun(Message, Acc) -> + %% TODO: sharding strategy must be part of the DS DB schema: + Shard = shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S}) -> + {N + 1, S + Size} + end, + {1, Size}, + Acc + ) + end, + #{}, + Messages + ) + ). + +repackage_messages(DB, Messages, Sync) -> + Batches = lists:foldl( + fun(Message, Acc) -> + Shard = shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S, Msgs}) -> + {N + 1, S + Size, [Message | Msgs]} + end, + {1, Size, [Message]}, + Acc + ) + end, + #{}, + Messages + ), + maps:fold( + fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + Err = enqueue_call_or_cast( + ?via(DB, Shard), + #enqueue_req{ + messages = lists:reverse(RevMessages), + sync = Sync, + atomic = false, + n_messages = NMsgs, + payload_bytes = ByteSize + } + ), + compose_errors(ErrAcc, Err) + end, + ok, + Batches + ). + +enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) -> + gen_server:call(To, Req, infinity); +enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) -> + gen_server:cast(To, Req). + +compose_errors(ErrAcc, ok) -> + ErrAcc; +compose_errors(ok, Err) -> + Err; +compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> + {error, unrecoverable, Err}; +compose_errors(ErrAcc, _Err) -> + ErrAcc. + +ensure_timer(S = #s{tref = undefined}) -> + Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), + Tref = erlang:send_after(Interval, self(), ?flush), + S#s{tref = Tref}; +ensure_timer(S) -> + S. + +cancel_timer(S = #s{tref = undefined}) -> + S; +cancel_timer(S = #s{tref = TRef}) -> + _ = erlang:cancel_timer(TRef), + S#s{tref = undefined}. + +%% @doc Return approximate size of the MQTT message (it doesn't take +%% all things into account, for example headers and extras) +payload_size(#message{payload = P, topic = T}) -> + size(P) + size(T). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 004096431..bf820e0bf 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -26,16 +26,13 @@ -define(SHARD, shard(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, #{ - backend => builtin, + backend => builtin_local, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 1, - n_sites => 1, - replication_factor => 1, - replication_options => #{} + n_shards => 1 }). 
-define(COMPACT_CONFIG, #{ - backend => builtin, + backend => builtin_local, storage => {emqx_ds_storage_bitfield_lts, #{ bits_per_wildcard_level => 8 @@ -138,8 +135,8 @@ t_get_streams(_Config) -> [FooBarBaz] = GetStream(<<"foo/bar/baz">>), [A] = GetStream(<<"a">>), %% Restart shard to make sure trie is persisted and restored: - ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME), - {ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}), + ok = emqx_ds:close_db(?FUNCTION_NAME), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), %% Verify that there are no "ghost streams" for topics that don't %% have any messages: [] = GetStream(<<"bar/foo">>), @@ -241,8 +238,8 @@ t_replay(_Config) -> ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)), %% Restart the DB to make sure trie is persisted and restored: - ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME), - {ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}), + ok = emqx_ds:close_db(?FUNCTION_NAME), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), %% Learned wildcard topics: ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])), ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)), @@ -512,7 +509,7 @@ suite() -> [{timetrap, {seconds, 20}}]. init_per_suite(Config) -> emqx_common_test_helpers:clear_screen(), Apps = emqx_cth_suite:start( - [emqx_durable_storage], + [emqx_ds_builtin_local], #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{apps, Apps} | Config].