diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 564c99c00..0120f09d4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -93,13 +93,27 @@ add_subscription(TopicFilterBin, DSSessionID) -> {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( DSSessionID, TopicFilter ), - ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID), + Ctx = #{ + iterator_id => IteratorID, + start_time => StartMS, + is_new => IsNew + }, + ?tp(persistent_session_ds_iterator_added, Ctx), + ?tp_span( + persistent_session_ds_open_iterators, + Ctx, + ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) + ), {ok, IteratorID, IsNew} end ). -spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> + ?tp(persistent_session_ds_will_open_iterators, #{ + iterator_id => IteratorID, + start_time => StartMS + }), Nodes = emqx:running_nodes(), Results = emqx_persistent_session_ds_proto_v1:open_iterator( Nodes, TopicFilter, StartMS, IteratorID diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index ddcb3234c..cbbad0aa2 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -17,7 +17,7 @@ -module(emqx_cth_cluster). -export([start/2]). --export([stop/1]). +-export([stop/1, stop_node/1]). -export([share_load_module/2]). -export([node_name/1]). @@ -80,7 +80,12 @@ when %% Working directory %% Everything a test produces should go here. Each node's stuff should go in its %% own directory. - work_dir := file:name() + work_dir := file:name(), + %% Usually, we want to ensure the node / test suite starts from a clean slate. + %% However, sometimes, we may want to test restarting a node. For such + %% situations, we need to disable this check to allow resuming from an existing + %% state. + skip_clean_suite_state_check => boolean() }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), @@ -124,12 +129,14 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) -> Node = node_name(Name), BasePort = base_port(N), WorkDir = maps:get(work_dir, ClusterOpts), + SkipCleanSuiteStateCheck = maps:get(skip_clean_suite_state_check, ClusterOpts, false), Defaults = #{ name => Node, role => core, apps => [], base_port => BasePort, work_dir => filename:join([WorkDir, Node]), + skip_clean_suite_state_check => SkipCleanSuiteStateCheck, driver => ct_slave }, maps:merge(Defaults, NodeOpts). @@ -288,17 +295,20 @@ load_apps(Node, #{apps := Apps}) -> erpc:call(Node, emqx_cth_suite, load_apps, [Apps]). start_apps_clustering(Node, #{apps := Apps} = Spec) -> - SuiteOpts = maps:with([work_dir], Spec), + SuiteOpts = suite_opts(Spec), AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING], _Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]), ok. start_apps(Node, #{apps := Apps} = Spec) -> - SuiteOpts = maps:with([work_dir], Spec), + SuiteOpts = suite_opts(Spec), AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)], _Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]), ok. +suite_opts(Spec) -> + maps:with([work_dir, skip_clean_suite_state_check], Spec). + maybe_join_cluster(_Node, #{role := replicant}) -> ok; maybe_join_cluster(Node, Spec) -> diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 9b3e58da4..dbe9423da 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -358,6 +358,8 @@ stop_apps(Apps) -> %% +verify_clean_suite_state(#{skip_clean_suite_state_check := true}) -> + ok; verify_clean_suite_state(#{work_dir := WorkDir}) -> {ok, []} = file:list_dir(WorkDir), none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 62702b3bc..b669be889 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -183,6 +183,8 @@ t_session_subscription_iterators(Config) -> ok end, ?assertMatch([_], IteratorIds), + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node2)), [IteratorId] = IteratorIds, ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), ExpectedMessages = [Message2, Message3], @@ -280,3 +282,9 @@ cluster() -> get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. + +get_all_iterator_ids(Node) -> + Fn = fun(K, _V, Acc) -> [K | Acc] end, + erpc:call(Node, fun() -> + emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) + end). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index ac4649d94..9bc7924e8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -18,7 +18,8 @@ restore_iterator/2, discard_iterator/2, is_iterator_present/2, - discard_iterator_prefix/2 + discard_iterator_prefix/2, + foldl_iterator_prefix/4 ]). %% behaviour callbacks: @@ -204,6 +205,16 @@ discard_iterator(Shard, ReplayID) -> discard_iterator_prefix(Shard, KeyPrefix) -> do_discard_iterator_prefix(Shard, KeyPrefix). +-spec foldl_iterator_prefix( + emqx_ds:shard(), + binary(), + fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc), + Acc +) -> {ok, Acc} | {error, _TODO} when + Acc :: term(). +foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -414,26 +425,31 @@ restore_iterator_state( open_restore_iterator(meta_get_gen(Shard, Gen), It, State). do_discard_iterator_prefix(Shard, KeyPrefix) -> + #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), + Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok). + +do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of {ok, It} -> NextAction = {seek, KeyPrefix}, - do_discard_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction); + do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc); Error -> Error end. -do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction) -> +do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> case rocksdb:iterator_move(It, NextAction) of - {ok, K = <>, _V} -> - ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS), - do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, next); + {ok, K = <>, V} -> + NewAcc = Fn(K, V, Acc), + do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc); {ok, _K, _V} -> ok = rocksdb:iterator_close(It), - ok; + {ok, Acc}; {error, invalid_iterator} -> ok = rocksdb:iterator_close(It), - ok; + {ok, Acc}; Error -> ok = rocksdb:iterator_close(It), Error diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl new file mode 100644 index 000000000..8a2d18c0d --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -0,0 +1,178 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(DS_SHARD, <<"local">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% avoid inter-suite flakiness... + application:stop(emqx), + application:stop(emqx_durable_storage), + TCApps = emqx_cth_suite:start( + app_specs(), + #{work_dir => ?config(priv_dir, Config)} + ), + [{tc_apps, TCApps} | Config]. + +end_per_suite(Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop(TCApps), + ok. + +init_per_testcase(t_session_subscription_idempotency, Config) -> + Cluster = cluster(#{n => 1}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + [{cluster, Cluster}, {nodes, Nodes} | Config]; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_session_subscription_idempotency, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +cluster(#{n := N}) -> + Node1 = ds_SUITE1, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => app_specs() + }, + [ + {Node1, Spec} + | lists:map( + fun(M) -> + Name = binary_to_atom(<<"ds_SUITE", (integer_to_binary(M))/binary>>), + {Name, Spec} + end, + lists:seq(2, N) + ) + ]. + +app_specs() -> + [ + emqx_durable_storage, + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker, listeners]}] + }} + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +get_all_iterator_ids(Node) -> + Fn = fun(K, _V, Acc) -> [K | Acc] end, + erpc:call(Node, fun() -> + emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) + end). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_session_subscription_idempotency(Config) -> + Cluster = ?config(cluster, Config), + [Node1] = ?config(nodes, Config), + Port = get_mqtt_port(Node1, tcp), + SubTopicFilter = <<"t/+">>, + ClientId = <<"myclientid">>, + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := persistent_session_ds_iterator_added}, + _NEvents0 = 1, + #{?snk_kind := will_restart_node}, + _Guard0 = true + ), + ?force_ordering( + #{?snk_kind := restarted_node}, + _NEvents1 = 1, + #{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start}, + _Guard1 = true + ), + + spawn_link(fun() -> + ?tp(will_restart_node, #{}), + ct:pal("stopping node ~p", [Node1]), + ok = emqx_cth_cluster:stop_node(Node1), + ct:pal("stopped node ~p; restarting...", [Node1]), + [Node1] = emqx_cth_cluster:start(Cluster, #{ + work_dir => ?config(priv_dir, Config), + skip_clean_suite_state_check => true + }), + ct:pal("node ~p restarted", [Node1]), + ?tp(restarted_node, #{}), + ok + end), + + ct:pal("starting 1"), + {ok, Client0} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client0), + ct:pal("subscribing 1"), + process_flag(trap_exit, true), + catch emqtt:subscribe(Client0, SubTopicFilter, qos2), + receive + {'EXIT', {shutdown, _}} -> + ok + after 0 -> ok + end, + process_flag(trap_exit, false), + + {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), + ct:pal("starting 2"), + {ok, Client1} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client1), + ct:pal("subscribing 2"), + {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), + + ok = emqtt:stop(Client1), + + ok + end, + fun(Trace) -> + ct:pal("trace:\n ~p", [Trace]), + %% Exactly one iterator should have been opened. + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), + ?assertMatch( + {_IsNew = false, ClientId, _}, + erpc:call(Node1, emqx_ds, session_open, [ClientId]) + ), + ok + end + ), + ok.