diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 78a8b76e0..2ee2b4ac5 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -23,6 +23,7 @@ %% `git_subdir` dependency in other projects. {deps, [ {emqx_utils, {path, "../emqx_utils"}}, + {emqx_durable_storage, {path, "../emqx_durable_storage"}}, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 928539f46..47f1ae4b4 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -16,7 +16,8 @@ sasl, os_mon, lc, - hocon + hocon, + emqx_durable_storage ]}, {mod, {emqx_app, []}}, {env, []}, diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index cb72986e7..59a397836 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -39,6 +39,7 @@ start(_Type, _Args) -> ok = maybe_load_config(), ok = emqx_persistent_session:init_db_backend(), + _ = emqx_persistent_session_ds:init(), ok = maybe_start_quicer(), ok = emqx_bpapi:start(), ok = emqx_alarm_handler:load(), diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index fef93768b..859f6fc91 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -225,6 +225,7 @@ publish(Msg) when is_record(Msg, message) -> []; Msg1 = #message{topic = Topic} -> emqx_persistent_session:persist_message(Msg1), + _ = emqx_persistent_session_ds:persist_message(Msg1), route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl new file mode 100644 index 000000000..27b4f0950 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -0,0 +1,86 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds). + +-export([init/0]). + +-export([persist_message/1]). + +-export([ + serialize_message/1, + deserialize_message/1 +]). + +%% FIXME +-define(DS_SHARD, <<"local">>). + +-define(WHEN_ENABLED(DO), + case is_store_enabled() of + true -> DO; + false -> {skipped, disabled} + end +). + +%% + +init() -> + ?WHEN_ENABLED( + ok = emqx_ds:ensure_shard(?DS_SHARD, #{ + dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) + }) + ). + +%% + +-spec persist_message(emqx_types:message()) -> + ok | {skipped, _Reason} | {error, _TODO}. +persist_message(Msg) -> + ?WHEN_ENABLED( + case needs_persistence(Msg) andalso find_subscribers(Msg) of + [_ | _] -> + store_message(Msg); + % [] -> + % {skipped, no_subscribers}; + false -> + {skipped, needs_no_persistence} + end + ). + +needs_persistence(Msg) -> + not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). + +store_message(Msg) -> + ID = emqx_message:id(Msg), + Timestamp = emqx_guid:timestamp(ID), + Topic = emqx_topic:words(emqx_message:topic(Msg)), + emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)). + +find_subscribers(_Msg) -> + [node()]. + +%% + +serialize_message(Msg) -> + term_to_binary(emqx_message:to_map(Msg)). + +deserialize_message(Bin) -> + emqx_message:from_map(binary_to_term(Bin)). + +%% + +is_store_enabled() -> + emqx_config:get([persistent_session_store, ds]). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ce1af66af..fb413405c 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -319,6 +319,14 @@ fields("persistent_session_store") -> desc => ?DESC(persistent_session_store_enabled) } )}, + {"ds", + sc( + boolean(), + #{ + default => false, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"on_disc", sc( boolean(), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl new file mode 100644 index 000000000..b818e3fec --- /dev/null +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -0,0 +1,116 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_messages_SUITE). + +-include_lib("stdlib/include/assert.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(NOW, + (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}])) +). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_durable_storage), + ok = emqx_common_test_helpers:start_apps([], fun + (emqx) -> + emqx_common_test_helpers:boot_modules(all), + emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>), + emqx_app:set_config_loader(?MODULE); + (_) -> + ok + end), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([]), + application:stop(emqx_durable_storage), + ok. + +t_messages_persisted(_Config) -> + C1 = connect(<>, true, 30), + C2 = connect(<>, false, 60), + C3 = connect(<>, false, undefined), + C4 = connect(<>, false, 0), + + CP = connect(<>, true, undefined), + + {ok, _, [1]} = emqtt:subscribe(C1, <<"client/+/topic">>, qos1), + {ok, _, [0]} = emqtt:subscribe(C2, <<"client/+/topic">>, qos0), + {ok, _, [1]} = emqtt:subscribe(C2, <<"random/+">>, qos1), + {ok, _, [2]} = emqtt:subscribe(C3, <<"client/#">>, qos2), + {ok, _, [0]} = emqtt:subscribe(C4, <<"random/#">>, qos0), + + Messages = [ + M1 = {<<"client/1/topic">>, <<"1">>}, + M2 = {<<"client/2/topic">>, <<"2">>}, + M3 = {<<"client/3/topic/sub">>, <<"3">>}, + M4 = {<<"client/4">>, <<"4">>}, + M5 = {<<"random/5">>, <<"5">>}, + M6 = {<<"random/6/topic">>, <<"6">>}, + M7 = {<<"client/7/topic">>, <<"7">>}, + M8 = {<<"client/8/topic/sub">>, <<"8">>}, + M9 = {<<"random/9">>, <<"9">>}, + M10 = {<<"random/10">>, <<"10">>} + ], + + Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages], + + ct:pal("Results = ~p", [Results]), + + Persisted = consume(<<"local">>, {['#'], 0}), + + ct:pal("Persisted = ~p", [Persisted]), + + ?assertEqual( + % [M1, M2, M5, M7, M9, M10], + [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10], + [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] + ), + + ok. + +%% + +connect(ClientId, CleanStart, EI) -> + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {clean_start, CleanStart}, + {properties, + maps:from_list( + [{'Session-Expiry-Interval', EI} || is_integer(EI)] + )} + ]), + {ok, _} = emqtt:connect(Client), + Client. + +consume(Shard, Replay) -> + {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay), + consume(It). + +consume(It) -> + case emqx_ds_storage_layer:next(It) of + {value, Msg, NIt} -> + [emqx_persistent_session_ds:deserialize_message(Msg) | consume(NIt)]; + none -> + [] + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 230ca3f9f..9eccf8c16 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -16,6 +16,7 @@ -module(emqx_ds). %% API: +-export([ensure_shard/2]). %% Messages: -export([message_store/2, message_store/1, message_stats/0]). %% Iterator: @@ -79,6 +80,18 @@ %% API funcions %%================================================================================ +-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) -> + ok | {error, _Reason}. +ensure_shard(Shard, Options) -> + case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of + {ok, _Pid} -> + ok; + {error, {already_started, _Pid}} -> + ok; + {error, Reason} -> + {error, Reason} + end. + %%-------------------------------------------------------------------------------- %% Message %%-------------------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 5bb0423d5..57608e5cb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -175,7 +175,7 @@ cf :: rocksdb:cf_handle(), keymapper :: keymapper(), write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(), - read_options = [] :: emqx_ds_storage_layer:db_write_options() + read_options = [] :: emqx_ds_storage_layer:db_read_options() }). -record(it, { diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 43a399a1b..017423b02 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -6,7 +6,7 @@ -behaviour(gen_server). %% API: --export([start_link/1]). +-export([start_link/2]). -export([create_generation/3]). -export([store/5]). @@ -18,7 +18,8 @@ %% behaviour callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). --export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]). +-export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]). +-export_type([db_options/0, db_write_options/0, db_read_options/0]). -compile({inline, [meta_lookup/2]}). @@ -26,10 +27,16 @@ %% Type declarations %%================================================================================ -%% see rocksdb:db_options() -% -type options() :: proplists:proplist(). +-type options() :: #{ + dir => file:filename() +}. +%% see rocksdb:db_options() +-type db_options() :: proplists:proplist(). +%% see rocksdb:write_options() -type db_write_options() :: proplists:proplist(). +%% see rocksdb:read_options() +-type db_read_options() :: proplists:proplist(). -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. @@ -110,18 +117,16 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:shard()) -> {ok, pid()}. -start_link(Shard) -> - gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []). +-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. +start_link(Shard, Options) -> + gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> {ok, gen_id()} | {error, nonmonotonic}. create_generation(Shard, Since, Config = {_Module, _Options}) -> gen_server:call(?REF(Shard), {create_generation, Since, Config}). --spec store( - emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary() -) -> +-spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), @@ -181,9 +186,9 @@ discard_iterator(Shard, ReplayID) -> %% behaviour callbacks %%================================================================================ -init([Shard]) -> +init({Shard, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard), + {ok, S0} = open_db(Shard, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -265,16 +270,17 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}. -open_db(Shard) -> - Filename = binary_to_list(Shard), +-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Shard, Options) -> + DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} | emqx_ds_conf:db_options() ], + _ = filelib:ensure_dir(DBDir), ExistingCFs = - case rocksdb:list_column_families(Filename, DBOptions) of + case rocksdb:list_column_families(DBDir, DBOptions) of {ok, CFs} -> [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; % DB is not present. First start @@ -286,7 +292,7 @@ open_db(Shard) -> {?ITERATOR_CF, ?ITERATOR_CF_OPTS} | ExistingCFs ], - case rocksdb:open(Filename, DBOptions, ColumnFamilies) of + case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index ed745df5f..56c8c760a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, start_shard/1, stop_shard/1]). +-export([start_link/0, start_shard/2, stop_shard/1]). %% behaviour callbacks: -export([init/1]). @@ -25,9 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret(). -start_shard(Shard) -> - supervisor:start_child(?SUP, shard_child_spec(Shard)). +-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> + supervisor:startchild_ret(). +start_shard(Shard, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -51,11 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec(). -shard_child_spec(Shard) -> +-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> + supervisor:child_spec(). +shard_child_spec(Shard, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard]}, + start => {emqx_ds_storage_layer, start_link, [Shard, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index 7ea036536..944477306 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria]}, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index 46a1436bb..c5c227333 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -33,7 +33,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). %% Smoke test of store function t_store(_Config) -> @@ -263,7 +263,7 @@ end_per_suite(_Config) -> init_per_testcase(TC, Config) -> ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}), Config. end_per_testcase(TC, _Config) ->