178 lines
6.1 KiB
Erlang
178 lines
6.1 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_ds_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("stdlib/include/assert.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-define(N_SHARDS, 1).
|
|
|
|
opts() ->
|
|
#{
|
|
backend => builtin,
|
|
storage => {emqx_ds_storage_reference, #{}},
|
|
n_shards => ?N_SHARDS,
|
|
replication_factor => 3
|
|
}.
|
|
|
|
%% A simple smoke test that verifies that opening/closing the DB
|
|
%% doesn't crash, and not much else
|
|
t_00_smoke_open_drop(_Config) ->
|
|
DB = 'DB',
|
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
|
%% Check metadata:
|
|
%% We have only one site:
|
|
[Site] = emqx_ds_replication_layer_meta:sites(),
|
|
%% Check all shards:
|
|
Shards = emqx_ds_replication_layer_meta:shards(DB),
|
|
%% Since there is only one site all shards should be allocated
|
|
%% to this site:
|
|
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
|
?assertEqual(?N_SHARDS, length(Shards)),
|
|
lists:foreach(
|
|
fun(Shard) ->
|
|
?assertEqual(
|
|
{ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
|
|
),
|
|
?assertEqual(
|
|
[Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
|
|
),
|
|
%% Check that the leader is eleected;
|
|
?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard))
|
|
end,
|
|
Shards
|
|
),
|
|
?assertEqual(lists:sort(Shards), lists:sort(MyShards)),
|
|
%% Reopen the DB and make sure the operation is idempotent:
|
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
|
%% Close the DB:
|
|
?assertMatch(ok, emqx_ds:drop_db(DB)).
|
|
|
|
%% A simple smoke test that verifies that storing the messages doesn't
|
|
%% crash
|
|
t_01_smoke_store(_Config) ->
|
|
DB = default,
|
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
|
Msg = message(<<"foo/bar">>, <<"foo">>, 0),
|
|
?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])).
|
|
|
|
%% A simple smoke test that verifies that getting the list of streams
|
|
%% doesn't crash and that iterators can be opened.
|
|
t_02_smoke_get_streams_start_iter(_Config) ->
|
|
DB = ?FUNCTION_NAME,
|
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
|
StartTime = 0,
|
|
TopicFilter = ['#'],
|
|
[{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
|
?assertMatch({_, _}, Rank),
|
|
?assertMatch({ok, _Iter}, emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime)).
|
|
|
|
%% A simple smoke test that verifies that it's possible to iterate
|
|
%% over messages.
|
|
t_03_smoke_iterate(_Config) ->
|
|
DB = ?FUNCTION_NAME,
|
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
|
StartTime = 0,
|
|
TopicFilter = ['#'],
|
|
Msgs = [
|
|
message(<<"foo/bar">>, <<"1">>, 0),
|
|
message(<<"foo">>, <<"2">>, 1),
|
|
message(<<"bar/bar">>, <<"3">>, 2)
|
|
],
|
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
|
|
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
|
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
|
|
{ok, Iter, Batch} = iterate(DB, Iter0, 1),
|
|
?assertEqual(Msgs, Batch, {Iter0, Iter}).
|
|
|
|
%% Verify that iterators survive restart of the application. This is
|
|
%% an important property, since the lifetime of the iterators is tied
|
|
%% to the external resources, such as clients' sessions, and they
|
|
%% should always be able to continue replaying the topics from where
|
|
%% they are left off.
|
|
t_04_restart(_Config) ->
|
|
DB = ?FUNCTION_NAME,
|
|
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
|
|
TopicFilter = ['#'],
|
|
StartTime = 0,
|
|
Msgs = [
|
|
message(<<"foo/bar">>, <<"1">>, 0),
|
|
message(<<"foo">>, <<"2">>, 1),
|
|
message(<<"bar/bar">>, <<"3">>, 2)
|
|
],
|
|
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
|
|
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
|
|
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
|
|
%% Restart the application:
|
|
?tp(warning, emqx_ds_SUITE_restart_app, #{}),
|
|
ok = application:stop(emqx_durable_storage),
|
|
{ok, _} = application:ensure_all_started(emqx_durable_storage),
|
|
ok = emqx_ds:open_db(DB, opts()),
|
|
%% The old iterator should be still operational:
|
|
{ok, Iter, Batch} = iterate(DB, Iter0, 1),
|
|
?assertEqual(Msgs, Batch, {Iter0, Iter}).
|
|
|
|
message(Topic, Payload, PublishedAt) ->
|
|
#message{
|
|
topic = Topic,
|
|
payload = Payload,
|
|
timestamp = PublishedAt,
|
|
id = emqx_guid:gen()
|
|
}.
|
|
|
|
iterate(DB, It, BatchSize) ->
|
|
iterate(DB, It, BatchSize, []).
|
|
|
|
iterate(DB, It0, BatchSize, Acc) ->
|
|
case emqx_ds:next(DB, It0, BatchSize) of
|
|
{ok, It, []} ->
|
|
{ok, It, Acc};
|
|
{ok, It, Msgs} ->
|
|
iterate(DB, It, BatchSize, Acc ++ Msgs);
|
|
Ret ->
|
|
Ret
|
|
end.
|
|
|
|
%% CT callbacks
|
|
|
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
Apps = emqx_cth_suite:start(
|
|
[mria, emqx_durable_storage],
|
|
#{work_dir => ?config(priv_dir, Config)}
|
|
),
|
|
[{apps, Apps} | Config].
|
|
|
|
end_per_suite(Config) ->
|
|
ok = emqx_cth_suite:stop(?config(apps, Config)),
|
|
ok.
|
|
|
|
init_per_testcase(_TC, Config) ->
|
|
application:ensure_all_started(emqx_durable_storage),
|
|
Config.
|
|
|
|
end_per_testcase(_TC, _Config) ->
|
|
ok = application:stop(emqx_durable_storage),
|
|
mria:stop(),
|
|
_ = mnesia:delete_schema([node()]),
|
|
ok.
|