diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 9a89b0519..04c57aa80 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -29,10 +29,6 @@ erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) ). --define(diff_opts, #{ - context => 20, window => 1000, compare_fun => fun message_eq/2 -}). - opts() -> opts(#{}). @@ -78,7 +74,9 @@ t_replication_transfers_snapshots('end', Config) -> t_replication_transfers_snapshots(Config) -> NMsgs = 400, NClients = 5, - {Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + {Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), Nodes = [Node, NodeOffline | _] = ?config(nodes, Config), _Specs = [_, SpecOffline | _] = ?config(specs, Config), @@ -100,7 +98,7 @@ t_replication_transfers_snapshots(Config) -> ok = emqx_cth_cluster:stop_node(NodeOffline), %% Fill the storage with messages and few additional generations. - apply_stream(?DB, Nodes -- [NodeOffline], Stream), + emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream), %% Restart the node. [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), @@ -126,7 +124,7 @@ t_replication_transfers_snapshots(Config) -> ok = timer:sleep(3_000), %% Check that the DB has been restored: - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, [] ). @@ -154,7 +152,9 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, - {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), Nodes = [N1, N2 | _] = ?config(nodes, Config), ?check_trace( #{timetrap => 30_000}, @@ -205,9 +205,9 @@ t_rebalance(Config) -> ], %% 2. Start filling the storage: - apply_stream(?DB, Nodes, Stream), + emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream), timer:sleep(5000), - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams), [ ?defer_assert( ?assertEqual( @@ -233,8 +233,10 @@ t_rebalance(Config) -> %% Scale down the cluster by removing the first node. ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), - ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + ct:pal("Transitions (~p -> ~p): ~p~n", [ + Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB) + ]), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))), %% Verify that at the end each node is now responsible for each shard. ?defer_assert( @@ -245,7 +247,7 @@ t_rebalance(Config) -> ), %% Verify that the messages are once again preserved after the rebalance: - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, [] ). @@ -294,7 +296,7 @@ t_join_leave_errors(Config) -> %% Should be no-op. ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), - ?assertEqual([], transitions(N1, ?DB)), + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)), %% Impossible to leave the last site. ?assertEqual( @@ -305,12 +307,12 @@ t_join_leave_errors(Config) -> %% "Move" the DB to the other node. ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertMatch([_ | _], transitions(N1, ?DB)), - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), %% Should be no-op. ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertEqual([], transitions(N1, ?DB)). + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)). t_rebalance_chaotic_converges(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -335,7 +337,9 @@ t_rebalance_chaotic_converges(Config) -> Nodes = [N1, N2, N3] = ?config(nodes, Config), NClients = 5, - {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), ?check_trace( #{}, @@ -385,10 +389,10 @@ t_rebalance_chaotic_converges(Config) -> "Initially, the DB is assigned to [S1, S2]" ), - apply_stream(?DB, Nodes, Stream), + emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream), %% Wait for the last transition to complete. - ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + ?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?defer_assert( ?assertEqual( @@ -401,7 +405,7 @@ t_rebalance_chaotic_converges(Config) -> timer:sleep(5000), %% Check that all messages are still there. - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, [] ). @@ -447,7 +451,7 @@ t_rebalance_offline_restarts(Config) -> %% Shut down N3 and then remove it from the DB. ok = emqx_cth_cluster:stop_node(N3), ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), - Transitions = transitions(N1, ?DB), + Transitions = emqx_ds_test_helpers:transitions(N1, ?DB), ct:pal("Transitions: ~p~n", [Transitions]), %% Wait until at least one transition completes. @@ -462,7 +466,7 @@ t_rebalance_offline_restarts(Config) -> ), %% Target state should still be reached eventually. - ?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))), + ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). %% @@ -491,10 +495,6 @@ ds_repl_meta(Node, Fun, Args) -> ds_repl_shard(Node, Fun, Args) -> erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). -transitions(Node, DB) -> - Shards = shards(Node, DB), - [{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])]. - shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). @@ -557,196 +557,3 @@ init_per_testcase(TCName, Config0) -> end_per_testcase(TCName, Config) -> ok = snabbkaffe:stop(), emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config). - -without_extra(L) -> - [I#message{extra = #{}} || I <- L]. - -%% Consume data from the DS storage on a given node as a stream: --type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). - -%% Create a stream from the topic (wildcards are NOT supported for a -%% good reason: order of messages is implementation-dependent!). -%% -%% Note: stream produces messages with keys --spec ds_topic_stream(binary(), binary(), node()) -> ds_stream(). -ds_topic_stream(ClientId, TopicBin, Node) -> - Topic = emqx_topic:words(TopicBin), - Shard = shard_of_clientid(Node, ClientId), - {ShardId, DSStreams} = - ?ON( - Node, - begin - DBShard = {?DB, Shard}, - {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} - end - ), - %% Sort streams by their rank Y, and chain them together: - emqx_utils_stream:chain([ - ds_topic_generation_stream(Node, ShardId, Topic, S) - || {_RankY, S} <- lists:sort(DSStreams) - ]). - -ds_topic_generation_stream(Node, Shard, Topic, Stream) -> - {ok, Iterator} = ?ON( - Node, - emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0) - ), - do_ds_topic_generation_stream(Node, Shard, Iterator). - -do_ds_topic_generation_stream(Node, Shard, It0) -> - fun() -> - case - ?ON( - Node, - begin - Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard), - emqx_ds_storage_layer:next(Shard, It0, 1, Now) - end - ) - of - {ok, _It, []} -> - []; - {ok, end_of_stream} -> - []; - {ok, It, [KeyMsg]} -> - [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)] - end - end. - -%% Payload generation: - -apply_stream(DB, Nodes, Stream) -> - apply_stream( - DB, - emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), - Stream, - 0 - ). - -apply_stream(DB, NodeStream0, Stream0, N) -> - case emqx_utils_stream:next(Stream0) of - [] -> - ?tp(all_done, #{}); - [Msg = #message{} | Stream] -> - [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), - ?tp( - test_push_message, - maps:merge( - emqx_message:to_map(Msg), - #{n => N} - ) - ), - ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), - apply_stream(DB, NodeStream, Stream, N + 1); - [add_generation | Stream] -> - %% FIXME: - [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0), - %% add_generation(Node, DB), - apply_stream(DB, NodeStream, Stream, N); - [{Node, Operation, Arg} | Stream] when - Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites - -> - ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), - %% Apply the transition. - ?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])), - %% Give some time for at least one transition to complete. - Transitions = transitions(Node, ?DB), - ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), - apply_stream(DB, NodeStream0, Stream, N); - [Fun | Stream] when is_function(Fun) -> - Fun(), - apply_stream(DB, NodeStream0, Stream, N) - end. - -%% @doc Create an infinite list of messages from a given client: -interleaved_topic_messages(TestCase, NClients, NMsgs) -> - %% List of fake client IDs: - Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], - TopicStreams = [ - {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} - || ClientId <- Clients - ], - %% Interleaved stream of messages: - Stream = emqx_utils_stream:interleave( - [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true - ), - {Stream, TopicStreams}. - -topic_messages(TestCase, ClientId) -> - topic_messages(TestCase, ClientId, 0). - -topic_messages(TestCase, ClientId, N) -> - fun() -> - Msg = #message{ - from = ClientId, - topic = client_topic(TestCase, ClientId), - timestamp = N * 100, - payload = integer_to_binary(N) - }, - [Msg | topic_messages(TestCase, ClientId, N + 1)] - end. - -client_topic(TestCase, ClientId) when is_atom(TestCase) -> - client_topic(atom_to_binary(TestCase, utf8), ClientId); -client_topic(TestCase, ClientId) when is_binary(TestCase) -> - <>. - -message_eq(Msg1, {_Key, Msg2}) -> - %% Timestamps can be modified by the replication layer, ignore them: - Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. - -%% Stream comparison: - --spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok. -verify_stream_effects(TestCase, Nodes0, L) -> - Checked = lists:flatmap( - fun({ClientId, Stream}) -> - Nodes = nodes_of_clientid(ClientId, Nodes0), - ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), - ?defer_assert( - ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) - ), - [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes] - end, - L - ), - ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). - --spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. -verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> - ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), - ?defer_assert( - begin - snabbkaffe_diff:assert_lists_eq( - ExpectedStream, - ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node), - ?diff_opts - ), - ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) - end - ). - -%% Find which nodes from the list contain the shards for the given -%% client ID: -nodes_of_clientid(ClientId, Nodes = [N0 | _]) -> - Shard = shard_of_clientid(N0, ClientId), - SiteNodes = ?ON( - N0, - begin - Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard), - lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) - end - ), - lists:filter( - fun(N) -> - lists:member(N, SiteNodes) - end, - Nodes - ). - -shard_of_clientid(Node, ClientId) -> - ?ON( - Node, - emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid) - ). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl index eaddab0c6..39158c7ef 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("stdlib/include/assert.hrl"). opts() -> - #{storage => {emqx_ds_storage_bitfield_lts, #{}}}. + #{storage => {emqx_ds_storage_reference, #{}}}. %% diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 26469c685..3a0145199 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -19,6 +19,12 @@ -compile(nowarn_export_all). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(ON(NODE, BODY), + erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +). %% RPC mocking @@ -59,8 +65,220 @@ mock_rpc_result(gen_rpc, ExpectFun) -> end end). +%% Consume data from the DS storage on a given node as a stream: +-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). + +%% @doc Create an infinite list of messages from a given client: +interleaved_topic_messages(TestCase, NClients, NMsgs) -> + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} + || ClientId <- Clients + ], + %% Interleaved stream of messages: + Stream = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + {Stream, TopicStreams}. + +topic_messages(TestCase, ClientId) -> + topic_messages(TestCase, ClientId, 0). + +topic_messages(TestCase, ClientId, N) -> + fun() -> + Msg = #message{ + from = ClientId, + topic = client_topic(TestCase, ClientId), + timestamp = N * 100, + payload = integer_to_binary(N) + }, + [Msg | topic_messages(TestCase, ClientId, N + 1)] + end. + +client_topic(TestCase, ClientId) when is_atom(TestCase) -> + client_topic(atom_to_binary(TestCase, utf8), ClientId); +client_topic(TestCase, ClientId) when is_binary(TestCase) -> + <>. + +ds_topic_generation_stream(DB, Node, Shard, Topic, Stream) -> + {ok, Iterator} = ?ON( + Node, + emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0) + ), + do_ds_topic_generation_stream(DB, Node, Shard, Iterator). + +do_ds_topic_generation_stream(DB, Node, Shard, It0) -> + fun() -> + case + ?ON( + Node, + begin + Now = emqx_ds_replication_layer:current_timestamp(DB, Shard), + emqx_ds_storage_layer:next(Shard, It0, 1, Now) + end + ) + of + {ok, _It, []} -> + []; + {ok, end_of_stream} -> + []; + {ok, It, [KeyMsg]} -> + [KeyMsg | do_ds_topic_generation_stream(DB, Node, Shard, It)] + end + end. + +%% Payload generation: + +apply_stream(DB, Nodes, Stream) -> + apply_stream( + DB, + emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), + Stream, + 0 + ). + +apply_stream(DB, NodeStream0, Stream0, N) -> + case emqx_utils_stream:next(Stream0) of + [] -> + ?tp(all_done, #{}); + [Msg = #message{} | Stream] -> + [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + ?tp( + test_push_message, + maps:merge( + emqx_message:to_map(Msg), + #{n => N} + ) + ), + ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), + apply_stream(DB, NodeStream, Stream, N + 1); + [add_generation | Stream] -> + %% FIXME: + [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + %% add_generation(Node, DB), + apply_stream(DB, NodeStream, Stream, N); + [{Node, Operation, Arg} | Stream] when + Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + -> + ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), + %% Apply the transition. + ?assertEqual( + ok, + ?ON( + Node, + emqx_ds_replication_layer_meta:Operation(DB, Arg) + ) + ), + %% Give some time for at least one transition to complete. + Transitions = transitions(Node, DB), + ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), + apply_stream(DB, NodeStream0, Stream, N); + [Fun | Stream] when is_function(Fun) -> + Fun(), + apply_stream(DB, NodeStream0, Stream, N) + end. + +transitions(Node, DB) -> + ?ON( + Node, + begin + Shards = emqx_ds_replication_layer_meta:shards(DB), + [ + {S, T} + || S <- Shards, T <- emqx_ds_replication_layer_meta:replica_set_transitions(DB, S) + ] + end + ). + +%% Stream comparison + +message_eq(Msg1, {_Key, Msg2}) -> + %% Timestamps can be modified by the replication layer, ignore them: + Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. + %% Consuming streams and iterators +-spec verify_stream_effects(atom(), binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> + ok. +verify_stream_effects(DB, TestCase, Nodes0, L) -> + Checked = lists:flatmap( + fun({ClientId, Stream}) -> + Nodes = nodes_of_clientid(DB, ClientId, Nodes0), + ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), + ?defer_assert( + ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) + ), + [verify_stream_effects(DB, TestCase, Node, ClientId, Stream) || Node <- Nodes] + end, + L + ), + ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). + +-spec verify_stream_effects(atom(), binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. +verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) -> + ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), + DiffOpts = #{context => 20, window => 1000, compare_fun => fun message_eq/2}, + ?defer_assert( + begin + snabbkaffe_diff:assert_lists_eq( + ExpectedStream, + ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node), + DiffOpts + ), + ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) + end + ). + +%% Create a stream from the topic (wildcards are NOT supported for a +%% good reason: order of messages is implementation-dependent!). +%% +%% Note: stream produces messages with keys +-spec ds_topic_stream(atom(), binary(), binary(), node()) -> ds_stream(). +ds_topic_stream(DB, ClientId, TopicBin, Node) -> + Topic = emqx_topic:words(TopicBin), + Shard = shard_of_clientid(DB, Node, ClientId), + {ShardId, DSStreams} = + ?ON( + Node, + begin + DBShard = {DB, Shard}, + {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} + end + ), + %% Sort streams by their rank Y, and chain them together: + emqx_utils_stream:chain([ + ds_topic_generation_stream(DB, Node, ShardId, Topic, S) + || {_RankY, S} <- lists:sort(DSStreams) + ]). + +%% Find which nodes from the list contain the shards for the given +%% client ID: +nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) -> + Shard = shard_of_clientid(DB, N0, ClientId), + SiteNodes = ?ON( + N0, + begin + Sites = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) + end + ), + lists:filter( + fun(N) -> + lists:member(N, SiteNodes) + end, + Nodes + ). + +shard_of_clientid(DB, Node, ClientId) -> + ?ON( + Node, + emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid) + ). + +%% Consume eagerly: + consume(DB, TopicFilter) -> consume(DB, TopicFilter, 0).