diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 98a2d36fa..6ff0de648 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -24,6 +24,8 @@ {deps, [ {emqx_utils, {path, "../emqx_utils"}}, {emqx_durable_storage, {path, "../emqx_durable_storage"}}, + {emqx_ds_builtin_local, {path, "../emqx_ds_builtin_local"}}, + {emqx_ds_backends, {path, "../emqx_ds_backends"}}, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, diff --git a/apps/emqx_ds_backends/README.md b/apps/emqx_ds_backends/README.md new file mode 100644 index 000000000..02986e0e1 --- /dev/null +++ b/apps/emqx_ds_backends/README.md @@ -0,0 +1,32 @@ +# EMQX Durable Storage Backends + +This is a placeholder OTP application that depends on all durable storage backends available in the release. +Starting it will ensure that all backends are properly loaded and registered. + +Consumers of `emqx_durable_storage` API should depend on this application instead of the parent `emqx_durable_storage`. + +# Features + +N/A + +# Limitation + +N/A + +# Documentation links + +N/A + +# Usage + +Any business application that creates DS databases should add this application as a dependency. + +# Configurations + +None + +# Other +N/A + +# Contributing +Please see our [contributing.md](../../CONTRIBUTING.md). diff --git a/apps/emqx_ds_backends/rebar.config b/apps/emqx_ds_backends/rebar.config new file mode 100644 index 000000000..7af4ea8e3 --- /dev/null +++ b/apps/emqx_ds_backends/rebar.config @@ -0,0 +1,5 @@ +%% -*- mode:erlang -*- +{deps, [ + {emqx_utils, {path, "../emqx_utils"}}, + {emqx_durable_storage, {path, "../emqx_durable_storage"}} +]}. diff --git a/apps/emqx_ds_backends/src/emqx_ds_backends.app.src b/apps/emqx_ds_backends/src/emqx_ds_backends.app.src new file mode 100644 index 000000000..5215124e4 --- /dev/null +++ b/apps/emqx_ds_backends/src/emqx_ds_backends.app.src @@ -0,0 +1,11 @@ +%% -*- mode: erlang -*- +{application, emqx_ds_backends, [ + {description, "A placeholder application that depends on all available DS backends"}, + % strict semver, bump manually! + {vsn, "0.1.0"}, + {modules, []}, + {registered, []}, + {applications, [kernel, stdlib, emqx_durable_storage, emqx_ds_builtin_local]}, + {optional_applications, [emqx_ds_builtin_raft]}, + {env, []} +]}. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl similarity index 60% rename from apps/emqx_durable_storage/test/emqx_ds_SUITE.erl rename to apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl index eb14456cb..11ea1417f 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_ds_SUITE). +-module(emqx_ds_backends_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -26,52 +26,27 @@ -define(N_SHARDS, 1). -opts() -> - #{ - backend => builtin, - storage => {emqx_ds_storage_reference, #{}}, - n_shards => ?N_SHARDS, - n_sites => 1, - replication_factor => 3, - replication_options => #{} - }. +opts(Config) -> + proplists:get_value(ds_conf, Config). %% A simple smoke test that verifies that opening/closing the DB %% doesn't crash, and not much else -t_00_smoke_open_drop(_Config) -> +t_00_smoke_open_drop(Config) -> DB = 'DB', - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), - %% Check metadata: - %% We have only one site: - [Site] = emqx_ds_replication_layer_meta:sites(), - %% Check all shards: - Shards = emqx_ds_replication_layer_meta:shards(DB), - %% Since there is only one site all shards should be allocated - %% to this site: - MyShards = emqx_ds_replication_layer_meta:my_shards(DB), - ?assertEqual(?N_SHARDS, length(Shards)), - lists:foreach( - fun(Shard) -> - ?assertEqual( - [Site], emqx_ds_replication_layer_meta:replica_set(DB, Shard) - ) - end, - Shards - ), - ?assertEqual(lists:sort(Shards), lists:sort(MyShards)), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), %% Reopen the DB and make sure the operation is idempotent: - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), %% Close the DB: ?assertMatch(ok, emqx_ds:drop_db(DB)). %% A simple smoke test that verifies that storing the messages doesn't %% crash -t_01_smoke_store(_Config) -> +t_01_smoke_store(Config) -> ?check_trace( #{timetrap => 10_000}, begin DB = default, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), Msg = message(<<"foo/bar">>, <<"foo">>, 0), ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])) end, @@ -80,9 +55,9 @@ t_01_smoke_store(_Config) -> %% A simple smoke test that verifies that getting the list of streams %% doesn't crash and that iterators can be opened. -t_02_smoke_get_streams_start_iter(_Config) -> +t_02_smoke_get_streams_start_iter(Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), StartTime = 0, TopicFilter = ['#'], [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), @@ -91,9 +66,9 @@ t_02_smoke_get_streams_start_iter(_Config) -> %% A simple smoke test that verifies that it's possible to iterate %% over messages. -t_03_smoke_iterate(_Config) -> +t_03_smoke_iterate(Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), StartTime = 0, TopicFilter = ['#'], Msgs = [ @@ -101,7 +76,7 @@ t_03_smoke_iterate(_Config) -> message(<<"foo">>, <<"2">>, 1), message(<<"bar/bar">>, <<"3">>, 2) ], - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs, #{sync => true})), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0), @@ -112,9 +87,9 @@ t_03_smoke_iterate(_Config) -> %% to the external resources, such as clients' sessions, and they %% should always be able to continue replaying the topics from where %% they are left off. -t_04_restart(_Config) -> +t_04_restart(Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), TopicFilter = ['#'], StartTime = 0, Msgs = [ @@ -122,22 +97,22 @@ t_04_restart(_Config) -> message(<<"foo">>, <<"2">>, 1), message(<<"bar/bar">>, <<"3">>, 2) ], - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs, #{sync => true})), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), %% Restart the application: ?tp(warning, emqx_ds_SUITE_restart_app, #{}), ok = application:stop(emqx_durable_storage), {ok, _} = application:ensure_all_started(emqx_durable_storage), - ok = emqx_ds:open_db(DB, opts()), + ok = emqx_ds:open_db(DB, opts(Config)), %% The old iterator should be still operational: {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0), ?assertEqual(Msgs, Batch, {Iter0, Iter}). %% Check that we can create iterators directly from DS keys. -t_05_update_iterator(_Config) -> +t_05_update_iterator(Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), TopicFilter = ['#'], StartTime = 0, Msgs = [ @@ -158,104 +133,42 @@ t_05_update_iterator(_Config) -> ?assertEqual(Msgs, [Msg0 | Batch], #{from_key => Iter1, final_iter => Iter}), ok. -t_06_update_config(_Config) -> +t_06_smoke_add_generation(Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), - TopicFilter = ['#'], + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + ?assertMatch( + [{_, _}], + maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)) + ), + ?assertMatch(ok, emqx_ds:add_generation(DB)), + ?assertMatch( + [{_, _}, {_, _}], + maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)) + ). - DataSet = update_data_set(), - - ToMsgs = fun(Datas) -> - lists:map( - fun({Topic, Payload}) -> - message(Topic, Payload, emqx_message:timestamp_now()) - end, - Datas - ) - end, - - {_, StartTimes, MsgsList} = - lists:foldl( - fun - (Datas, {true, TimeAcc, MsgAcc}) -> - Msgs = ToMsgs(Datas), - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - {false, TimeAcc, [Msgs | MsgAcc]}; - (Datas, {Any, TimeAcc, MsgAcc}) -> - timer:sleep(500), - ?assertMatch(ok, emqx_ds:update_db_config(DB, opts())), - timer:sleep(500), - StartTime = emqx_message:timestamp_now(), - Msgs = ToMsgs(Datas), - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]} - end, - {true, [emqx_message:timestamp_now()], []}, - DataSet - ), - - Checker = fun({StartTime, Msgs0}, Acc) -> - Msgs = Acc ++ Msgs0, - Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime), - ?assertEqual(Msgs, Batch, StartTime), - Msgs - end, - lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). - -t_07_add_generation(_Config) -> +t_07_smoke_update_config(Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), - TopicFilter = ['#'], - - DataSet = update_data_set(), - - ToMsgs = fun(Datas) -> - lists:map( - fun({Topic, Payload}) -> - message(Topic, Payload, emqx_message:timestamp_now()) - end, - Datas - ) - end, - - {_, StartTimes, MsgsList} = - lists:foldl( - fun - (Datas, {true, TimeAcc, MsgAcc}) -> - Msgs = ToMsgs(Datas), - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - {false, TimeAcc, [Msgs | MsgAcc]}; - (Datas, {Any, TimeAcc, MsgAcc}) -> - timer:sleep(500), - ?assertMatch(ok, emqx_ds:add_generation(DB)), - timer:sleep(500), - StartTime = emqx_message:timestamp_now(), - Msgs = ToMsgs(Datas), - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]} - end, - {true, [emqx_message:timestamp_now()], []}, - DataSet - ), - - Checker = fun({StartTime, Msgs0}, Acc) -> - Msgs = Acc ++ Msgs0, - Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime), - ?assertEqual(Msgs, Batch, StartTime), - Msgs - end, - lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + ?assertMatch( + [{_, _}], + maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)) + ), + ?assertMatch(ok, emqx_ds:update_db_config(DB, opts(Config))), + ?assertMatch( + [{_, _}, {_, _}], + maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)) + ). %% Verifies the basic usage of `list_generations_with_lifetimes' and `drop_generation'... %% 1) Cannot drop current generation. %% 2) All existing generations are returned by `list_generation_with_lifetimes'. %% 3) Dropping a generation removes it from the list. %% 4) Dropped generations stay dropped even after restarting the application. -t_08_smoke_list_drop_generation(_Config) -> +t_08_smoke_list_drop_generation(Config) -> DB = ?FUNCTION_NAME, ?check_trace( begin - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), %% Exactly one generation at first. Generations0 = emqx_ds:list_generations_with_lifetimes(DB), ?assertMatch( @@ -295,7 +208,7 @@ t_08_smoke_list_drop_generation(_Config) -> %% Should persist surviving generation list ok = application:stop(emqx_durable_storage), {ok, _} = application:ensure_all_started(emqx_durable_storage), - ok = emqx_ds:open_db(DB, opts()), + ok = emqx_ds:open_db(DB, opts(Config)), Generations3 = emqx_ds:list_generations_with_lifetimes(DB), ?assertMatch( @@ -310,12 +223,12 @@ t_08_smoke_list_drop_generation(_Config) -> ), ok. -t_09_atomic_store_batch(_Config) -> +t_09_atomic_store_batch(Config) -> DB = ?FUNCTION_NAME, ?check_trace( begin application:set_env(emqx_durable_storage, egress_batch_size, 1), - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), Msgs = [ message(<<"1">>, <<"1">>, 0), message(<<"2">>, <<"2">>, 1), @@ -335,12 +248,12 @@ t_09_atomic_store_batch(_Config) -> ), ok. -t_10_non_atomic_store_batch(_Config) -> +t_10_non_atomic_store_batch(Config) -> DB = ?FUNCTION_NAME, ?check_trace( begin application:set_env(emqx_durable_storage, egress_batch_size, 1), - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), Msgs = [ message(<<"1">>, <<"1">>, 0), message(<<"2">>, <<"2">>, 1), @@ -369,11 +282,11 @@ t_10_non_atomic_store_batch(_Config) -> ), ok. -t_smoke_delete_next(_Config) -> +t_smoke_delete_next(Config) -> DB = ?FUNCTION_NAME, ?check_trace( begin - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), StartTime = 0, TopicFilter = [<<"foo">>, '#'], Msgs = @@ -410,7 +323,7 @@ t_smoke_delete_next(_Config) -> ), ok. -t_drop_generation_with_never_used_iterator(_Config) -> +t_drop_generation_with_never_used_iterator(Config) -> %% This test checks how the iterator behaves when: %% 1) it's created at generation 1 and not consumed from. %% 2) generation 2 is created and 1 dropped. @@ -418,7 +331,7 @@ t_drop_generation_with_never_used_iterator(_Config) -> %% In this case, the iterator won't see any messages and the stream will end. DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), TopicFilter = emqx_topic:words(<<"foo/+">>), @@ -458,7 +371,7 @@ t_drop_generation_with_never_used_iterator(_Config) -> ok. -t_drop_generation_with_used_once_iterator(_Config) -> +t_drop_generation_with_used_once_iterator(Config) -> %% This test checks how the iterator behaves when: %% 1) it's created at generation 1 and consumes at least 1 message. %% 2) generation 2 is created and 1 dropped. @@ -466,7 +379,7 @@ t_drop_generation_with_used_once_iterator(_Config) -> %% In this case, the iterator should see no more messages and the stream will end. DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), TopicFilter = emqx_topic:words(<<"foo/+">>), @@ -499,12 +412,12 @@ t_drop_generation_with_used_once_iterator(_Config) -> emqx_ds_test_helpers:consume_iter(DB, Iter1) ). -t_drop_generation_update_iterator(_Config) -> +t_drop_generation_update_iterator(Config) -> %% This checks the behavior of `emqx_ds:update_iterator' after the generation %% underlying the iterator has been dropped. DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), TopicFilter = emqx_topic:words(<<"foo/+">>), @@ -528,12 +441,12 @@ t_drop_generation_update_iterator(_Config) -> emqx_ds:update_iterator(DB, Iter1, Key2) ). -t_make_iterator_stale_stream(_Config) -> +t_make_iterator_stale_stream(Config) -> %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying %% the stream has been dropped. DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), TopicFilter = emqx_topic:words(<<"foo/+">>), @@ -556,7 +469,7 @@ t_make_iterator_stale_stream(_Config) -> ok. -t_get_streams_concurrently_with_drop_generation(_Config) -> +t_get_streams_concurrently_with_drop_generation(Config) -> %% This checks that we can get all streams while a generation is dropped %% mid-iteration. @@ -564,7 +477,7 @@ t_get_streams_concurrently_with_drop_generation(_Config) -> ?check_trace( #{timetrap => 5_000}, begin - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)), ok = emqx_ds:add_generation(DB), @@ -593,171 +506,6 @@ t_get_streams_concurrently_with_drop_generation(_Config) -> [] ). -t_error_mapping_replication_layer(_Config) -> - %% This checks that the replication layer maps recoverable errors correctly. - - ok = emqx_ds_test_helpers:mock_rpc(), - ok = snabbkaffe:start_trace(), - - DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), - [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB), - - TopicFilter = emqx_topic:words(<<"foo/#">>), - Msgs = [ - message(<<"C1">>, <<"foo/bar">>, <<"1">>, 0), - message(<<"C1">>, <<"foo/baz">>, <<"2">>, 1), - message(<<"C2">>, <<"foo/foo">>, <<"3">>, 2), - message(<<"C3">>, <<"foo/xyz">>, <<"4">>, 3), - message(<<"C4">>, <<"foo/bar">>, <<"5">>, 4), - message(<<"C5">>, <<"foo/oof">>, <<"6">>, 5) - ], - - ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - - ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}), - ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}), - - Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0), - Iterators0 = lists:map( - fun({_Rank, S}) -> - {ok, Iter} = emqx_ds:make_iterator(DB, S, TopicFilter, 0), - Iter - end, - Streams0 - ), - - %% Disrupt the link to the second shard. - ok = emqx_ds_test_helpers:mock_rpc_result( - fun(_Node, emqx_ds_replication_layer, _Function, Args) -> - case Args of - [DB, Shard1 | _] -> passthrough; - [DB, Shard2 | _] -> unavailable - end - end - ), - - %% Result of `emqx_ds:get_streams/3` will just contain partial results, not an error. - Streams1 = emqx_ds:get_streams(DB, TopicFilter, 0), - ?assert( - length(Streams1) > 0 andalso length(Streams1) =< length(Streams0), - Streams1 - ), - - %% At least one of `emqx_ds:make_iterator/4` will end in an error. - Results1 = lists:map( - fun({_Rank, S}) -> - case emqx_ds:make_iterator(DB, S, TopicFilter, 0) of - Ok = {ok, _Iter} -> - Ok; - Error = {error, recoverable, {erpc, _}} -> - Error; - Other -> - ct:fail({unexpected_result, Other}) - end - end, - Streams0 - ), - ?assert( - length([error || {error, _, _} <- Results1]) > 0, - Results1 - ), - - %% At least one of `emqx_ds:next/3` over initial set of iterators will end in an error. - Results2 = lists:map( - fun(Iter) -> - case emqx_ds:next(DB, Iter, _BatchSize = 42) of - Ok = {ok, _Iter, [_ | _]} -> - Ok; - Error = {error, recoverable, {badrpc, _}} -> - Error; - Other -> - ct:fail({unexpected_result, Other}) - end - end, - Iterators0 - ), - ?assert( - length([error || {error, _, _} <- Results2]) > 0, - Results2 - ), - meck:unload(). - -%% This testcase verifies the behavior of `store_batch' operation -%% when the underlying code experiences recoverable or unrecoverable -%% problems. -t_store_batch_fail(_Config) -> - ?check_trace( - #{timetrap => 15_000}, - try - meck:new(emqx_ds_storage_layer, [passthrough, no_history]), - DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), - %% Success: - Batch1 = [ - message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1), - message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1) - ], - ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), - %% Inject unrecoverable error: - meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) -> - {error, unrecoverable, mock} - end), - Batch2 = [ - message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1), - message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1) - ], - ?assertMatch( - {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) - ), - meck:unload(emqx_ds_storage_layer), - %% Inject a recoveralbe error: - meck:new(ra, [passthrough, no_history]), - meck:expect(ra, process_command, fun(Servers, Shard, Command) -> - ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}), - {timeout, mock} - end), - Batch3 = [ - message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), - message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), - message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), - message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) - ], - %% Note: due to idempotency issues the number of retries - %% is currently set to 0: - ?assertMatch( - {error, recoverable, {timeout, mock}}, - emqx_ds:store_batch(DB, Batch3, #{sync => true}) - ), - meck:unload(ra), - ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), - lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) - after - meck:unload() - end, - [ - {"message ordering", fun(StoredMessages, _Trace) -> - [{_, Stream1}, {_, Stream2}] = StoredMessages, - ?assertMatch( - [ - #message{payload = <<"1">>}, - #message{payload = <<"2">>}, - #message{payload = <<"5">>}, - #message{payload = <<"7">>} - ], - Stream1 - ), - ?assertMatch( - [ - #message{payload = <<"6">>}, - #message{payload = <<"8">>} - ], - Stream2 - ) - end} - ] - ). - update_data_set() -> [ [ @@ -802,12 +550,46 @@ delete(DB, It0, Selector, BatchSize, Acc) -> %% CT callbacks -all() -> emqx_common_test_helpers:all(?MODULE). +-if(?EMQX_RELEASE_EDITION == ee). +all() -> + [{group, builtin_local}, {group, builtin_raft}]. +-else. +all() -> + [{group, builtin_local}]. +-endif. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {builtin_local, TCs}, + {builtin_raft, TCs} + ]. + +init_per_group(builtin_local, Config) -> + Conf = #{ + backend => builtin_local, + storage => {emqx_ds_storage_reference, #{}}, + n_shards => ?N_SHARDS + }, + [{ds_conf, Conf} | Config]; +init_per_group(builtin_raft, Config) -> + Conf = #{ + backend => builtin_raft, + storage => {emqx_ds_storage_reference, #{}}, + n_shards => ?N_SHARDS, + n_sites => 1, + replication_factor => 3, + replication_options => #{} + }, + [{ds_conf, Conf} | Config]. + +end_per_group(_Group, Config) -> + Config. init_per_suite(Config) -> emqx_common_test_helpers:clear_screen(), Apps = emqx_cth_suite:start( - [mria, emqx_durable_storage], + [mria, emqx_ds_backends], #{work_dir => ?config(priv_dir, Config)} ), [{apps, Apps} | Config]. @@ -820,7 +602,8 @@ init_per_testcase(_TC, Config) -> application:ensure_all_started(emqx_durable_storage), Config. -end_per_testcase(_TC, _Config) -> +end_per_testcase(TC, _Config) -> + ok = emqx_ds:drop_db(TC), snabbkaffe:stop(), ok = application:stop(emqx_durable_storage), mria:stop(), diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index e84abb78b..978da91a4 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -698,6 +698,81 @@ t_error_mapping_replication_layer(_Config) -> ), meck:unload(). +%% This testcase verifies the behavior of `store_batch' operation +%% when the underlying code experiences recoverable or unrecoverable +%% problems. +t_store_batch_fail(_Config) -> + ?check_trace( + #{timetrap => 15_000}, + try + meck:new(emqx_ds_storage_layer, [passthrough, no_history]), + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), + %% Success: + Batch1 = [ + message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), + %% Inject unrecoverable error: + meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) -> + {error, unrecoverable, mock} + end), + Batch2 = [ + message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1) + ], + ?assertMatch( + {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) + ), + meck:unload(emqx_ds_storage_layer), + %% Inject a recoveralbe error: + meck:new(ra, [passthrough, no_history]), + meck:expect(ra, process_command, fun(Servers, Shard, Command) -> + ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}), + {timeout, mock} + end), + Batch3 = [ + message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), + message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), + message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), + message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) + ], + %% Note: due to idempotency issues the number of retries + %% is currently set to 0: + ?assertMatch( + {error, recoverable, {timeout, mock}}, + emqx_ds:store_batch(DB, Batch3, #{sync => true}) + ), + meck:unload(ra), + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), + lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) + after + meck:unload() + end, + [ + {"message ordering", fun(StoredMessages, _Trace) -> + [{_, Stream1}, {_, Stream2}] = StoredMessages, + ?assertMatch( + [ + #message{payload = <<"1">>}, + #message{payload = <<"2">>}, + #message{payload = <<"5">>}, + #message{payload = <<"7">>} + ], + Stream1 + ), + ?assertMatch( + [ + #message{payload = <<"6">>}, + #message{payload = <<"8">>} + ], + Stream2 + ) + end} + ] + ). + %% shard_server_info(Node, DB, Shard, Site, Info) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 3ca2dcefd..331c9806b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -87,6 +87,8 @@ %% Type declarations %%================================================================================ +-define(APP, emqx_durable_storage). + %% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending %% records over the wire. %% tags: diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index 7a20577d4..7bfa6efd3 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,10 +2,10 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.2.1"}, + {vsn, "0.3.0"}, {modules, []}, {registered, []}, - {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]}, + {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]}, {mod, {emqx_ds_app, []}}, {env, []} ]}. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index c3311c09b..277d9fd66 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -42,6 +42,7 @@ esasl, emqx_utils, emqx_durable_storage, + emqx_ds_backends, emqx_http_lib, emqx_resource, emqx_connector,