feat(ds): add filter for message persistence
Fixes https://emqx.atlassian.net/browse/EMQX-10520
This commit is contained in:
parent
fe4640922d
commit
f1294736b7
|
@ -23,6 +23,7 @@
|
|||
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
||||
-define(CM_SHARD, emqx_cm_shard).
|
||||
-define(ROUTE_SHARD, route_shard).
|
||||
-define(PS_ROUTER_SHARD, ps_router_shard).
|
||||
|
||||
%% Banner
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
-module(emqx_persistent_session_ds).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-export([init/0]).
|
||||
|
@ -56,11 +57,13 @@
|
|||
%%
|
||||
|
||||
init() ->
|
||||
?WHEN_ENABLED(
|
||||
?WHEN_ENABLED(begin
|
||||
ok = emqx_ds:ensure_shard(?DS_SHARD, #{
|
||||
dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
|
||||
})
|
||||
).
|
||||
}),
|
||||
ok = emqx_persistent_session_ds_router:init_tables(),
|
||||
ok
|
||||
end).
|
||||
|
||||
%%
|
||||
|
||||
|
@ -71,8 +74,8 @@ persist_message(Msg) ->
|
|||
case needs_persistence(Msg) andalso find_subscribers(Msg) of
|
||||
[_ | _] ->
|
||||
store_message(Msg);
|
||||
% [] ->
|
||||
% {skipped, no_subscribers};
|
||||
[] ->
|
||||
{skipped, no_subscribers};
|
||||
false ->
|
||||
{skipped, needs_no_persistence}
|
||||
end
|
||||
|
@ -87,8 +90,8 @@ store_message(Msg) ->
|
|||
Topic = emqx_topic:words(emqx_message:topic(Msg)),
|
||||
emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
|
||||
|
||||
find_subscribers(_Msg) ->
|
||||
[node()].
|
||||
find_subscribers(#message{topic = Topic}) ->
|
||||
emqx_persistent_session_ds_router:match_routes(Topic).
|
||||
|
||||
open_session(ClientID) ->
|
||||
?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
|
||||
|
@ -98,6 +101,7 @@ open_session(ClientID) ->
|
|||
add_subscription(TopicFilterBin, DSSessionID) ->
|
||||
?WHEN_ENABLED(
|
||||
begin
|
||||
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
|
||||
TopicFilter = emqx_topic:words(TopicFilterBin),
|
||||
{ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
|
||||
DSSessionID, TopicFilter
|
||||
|
@ -160,7 +164,9 @@ del_subscription(TopicFilterBin, DSSessionID) ->
|
|||
persistent_session_ds_iterator_delete,
|
||||
#{},
|
||||
emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
|
||||
)
|
||||
),
|
||||
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID),
|
||||
ok
|
||||
end
|
||||
).
|
||||
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-ifndef(EMQX_PS_DS_HRL).
|
||||
-define(EMQX_PS_DS_HRL, true).
|
||||
|
||||
-define(PS_ROUTER_TAB, emqx_ds_ps_router).
|
||||
-define(PS_FILTERS_TAB, emqx_ds_ps_filters).
|
||||
|
||||
-type dest() :: emqx_ds:session_id().
|
||||
|
||||
-record(ps_route, {
|
||||
topic :: binary(),
|
||||
dest :: emqx_ds:session_id()
|
||||
}).
|
||||
-record(ps_routeidx, {
|
||||
entry :: emqx_topic_index:key(dest()),
|
||||
unused = [] :: nil()
|
||||
}).
|
||||
|
||||
-endif.
|
|
@ -0,0 +1,226 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_persistent_session_ds_router).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
|
||||
|
||||
-export([init_tables/0]).
|
||||
|
||||
%% Route APIs
|
||||
-export([
|
||||
do_add_route/2,
|
||||
do_delete_route/2,
|
||||
match_routes/1,
|
||||
lookup_routes/1,
|
||||
foldr_routes/2,
|
||||
foldl_routes/2
|
||||
]).
|
||||
|
||||
-export([cleanup_routes/1]).
|
||||
-export([print_routes/1]).
|
||||
-export([topics/0]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([has_route/2]).
|
||||
-endif.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Table Initialization
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init_tables() ->
|
||||
mria_config:set_dirty_shard(?PS_ROUTER_SHARD, true),
|
||||
ok = mria:create_table(?PS_ROUTER_TAB, [
|
||||
{type, bag},
|
||||
{rlog_shard, ?PS_ROUTER_SHARD},
|
||||
{storage, ram_copies},
|
||||
{record_name, ps_route},
|
||||
{attributes, record_info(fields, ps_route)},
|
||||
{storage_properties, [
|
||||
{ets, [
|
||||
{read_concurrency, true},
|
||||
{write_concurrency, true}
|
||||
]}
|
||||
]}
|
||||
]),
|
||||
ok = mria:create_table(?PS_FILTERS_TAB, [
|
||||
{type, ordered_set},
|
||||
{rlog_shard, ?PS_ROUTER_SHARD},
|
||||
{storage, ram_copies},
|
||||
{record_name, ps_routeidx},
|
||||
{attributes, record_info(fields, ps_routeidx)},
|
||||
{storage_properties, [
|
||||
{ets, [
|
||||
{read_concurrency, true},
|
||||
{write_concurrency, auto}
|
||||
]}
|
||||
]}
|
||||
]),
|
||||
ok = mria:wait_for_tables([?PS_ROUTER_TAB, ?PS_FILTERS_TAB]),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Route APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
|
||||
do_add_route(Topic, Dest) when is_binary(Topic) ->
|
||||
case has_route(Topic, Dest) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
mria_insert_route(Topic, Dest)
|
||||
end.
|
||||
|
||||
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
|
||||
do_delete_route(Topic, Dest) ->
|
||||
case emqx_trie_search:filter(Topic) of
|
||||
Words when is_list(Words) ->
|
||||
K = emqx_topic_index:make_key(Words, Dest),
|
||||
mria:dirty_delete(?PS_FILTERS_TAB, K);
|
||||
false ->
|
||||
mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest})
|
||||
end.
|
||||
|
||||
%% @doc Take a real topic (not filter) as input, return the matching topics and topic
|
||||
%% filters associated with route destination.
|
||||
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
|
||||
match_routes(Topic) when is_binary(Topic) ->
|
||||
lookup_route_tab(Topic) ++
|
||||
[match_to_route(M) || M <- match_filters(Topic)].
|
||||
|
||||
%% @doc Take a topic or filter as input, and return the existing routes with exactly
|
||||
%% this topic or filter.
|
||||
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
|
||||
lookup_routes(Topic) ->
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true ->
|
||||
Pat = #ps_routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
|
||||
[Dest || [Dest] <- ets:match(?PS_FILTERS_TAB, Pat)];
|
||||
false ->
|
||||
lookup_route_tab(Topic)
|
||||
end.
|
||||
|
||||
-spec has_route(emqx_types:topic(), dest()) -> boolean().
|
||||
has_route(Topic, Dest) ->
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true ->
|
||||
ets:member(?PS_FILTERS_TAB, emqx_topic_index:make_key(Topic, Dest));
|
||||
false ->
|
||||
has_route_tab_entry(Topic, Dest)
|
||||
end.
|
||||
|
||||
-spec topics() -> list(emqx_types:topic()).
|
||||
topics() ->
|
||||
Pat = #ps_routeidx{entry = '$1'},
|
||||
Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?PS_FILTERS_TAB, Pat)],
|
||||
list_route_tab_topics() ++ Filters.
|
||||
|
||||
%% @doc Print routes to a topic
|
||||
-spec print_routes(emqx_types:topic()) -> ok.
|
||||
print_routes(Topic) ->
|
||||
lists:foreach(
|
||||
fun(#ps_route{topic = To, dest = Dest}) ->
|
||||
io:format("~ts -> ~ts~n", [To, Dest])
|
||||
end,
|
||||
match_routes(Topic)
|
||||
).
|
||||
|
||||
-spec cleanup_routes(emqx_ds:session_id()) -> ok.
|
||||
cleanup_routes(DSSessionId) ->
|
||||
%% NOTE
|
||||
%% No point in transaction here because all the operations on filters table are dirty.
|
||||
ok = ets:foldl(
|
||||
fun(#ps_routeidx{entry = K}, ok) ->
|
||||
case get_dest_session_id(emqx_topic_index:get_id(K)) of
|
||||
DSSessionId ->
|
||||
mria:dirty_delete(?PS_FILTERS_TAB, K);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
ok,
|
||||
?PS_FILTERS_TAB
|
||||
),
|
||||
ok = ets:foldl(
|
||||
fun(#ps_route{dest = Dest} = Route, ok) ->
|
||||
case get_dest_session_id(Dest) of
|
||||
DSSessionId ->
|
||||
mria:dirty_delete_object(?PS_ROUTER_TAB, Route);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
ok,
|
||||
?PS_ROUTER_TAB
|
||||
).
|
||||
|
||||
-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
|
||||
foldl_routes(FoldFun, AccIn) ->
|
||||
fold_routes(foldl, FoldFun, AccIn).
|
||||
|
||||
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
|
||||
foldr_routes(FoldFun, AccIn) ->
|
||||
fold_routes(foldr, FoldFun, AccIn).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
mria_insert_route(Topic, Dest) ->
|
||||
case emqx_trie_search:filter(Topic) of
|
||||
Words when is_list(Words) ->
|
||||
K = emqx_topic_index:make_key(Words, Dest),
|
||||
mria:dirty_write(?PS_FILTERS_TAB, #ps_routeidx{entry = K});
|
||||
false ->
|
||||
mria_route_tab_insert(#ps_route{topic = Topic, dest = Dest})
|
||||
end.
|
||||
|
||||
fold_routes(FunName, FoldFun, AccIn) ->
|
||||
FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
|
||||
Acc = ets:FunName(FoldFun, AccIn, ?PS_ROUTER_TAB),
|
||||
ets:FunName(FilterFoldFun, Acc, ?PS_FILTERS_TAB).
|
||||
|
||||
mk_filtertab_fold_fun(FoldFun) ->
|
||||
fun(#ps_routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
|
||||
|
||||
match_filters(Topic) ->
|
||||
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
|
||||
|
||||
get_dest_session_id({_, DSSessionId}) ->
|
||||
DSSessionId;
|
||||
get_dest_session_id(DSSessionId) ->
|
||||
DSSessionId.
|
||||
|
||||
match_to_route(M) ->
|
||||
#ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
|
||||
|
||||
mria_route_tab_insert(Route) ->
|
||||
mria:dirty_write(?PS_ROUTER_TAB, Route).
|
||||
|
||||
lookup_route_tab(Topic) ->
|
||||
ets:lookup(?PS_ROUTER_TAB, Topic).
|
||||
|
||||
has_route_tab_entry(Topic, Dest) ->
|
||||
[] =/= ets:match(?PS_ROUTER_TAB, #ps_route{topic = Topic, dest = Dest}).
|
||||
|
||||
list_route_tab_topics() ->
|
||||
mnesia:dirty_all_keys(?PS_ROUTER_TAB).
|
||||
|
||||
mria_route_tab_delete(Route) ->
|
||||
mria:dirty_delete_object(?PS_ROUTER_TAB, Route).
|
|
@ -19,6 +19,7 @@
|
|||
-include_lib("stdlib/include/assert.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
@ -54,8 +55,12 @@ init_per_testcase(_TestCase, Config) ->
|
|||
end_per_testcase(t_session_subscription_iterators, Config) ->
|
||||
Nodes = ?config(nodes, Config),
|
||||
ok = emqx_cth_cluster:stop(Nodes),
|
||||
ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>),
|
||||
delete_all_messages(),
|
||||
ok;
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>),
|
||||
delete_all_messages(),
|
||||
ok.
|
||||
|
||||
t_messages_persisted(_Config) ->
|
||||
|
@ -75,12 +80,12 @@ t_messages_persisted(_Config) ->
|
|||
Messages = [
|
||||
M1 = {<<"client/1/topic">>, <<"1">>},
|
||||
M2 = {<<"client/2/topic">>, <<"2">>},
|
||||
M3 = {<<"client/3/topic/sub">>, <<"3">>},
|
||||
M4 = {<<"client/4">>, <<"4">>},
|
||||
_M3 = {<<"client/3/topic/sub">>, <<"3">>},
|
||||
_M4 = {<<"client/4">>, <<"4">>},
|
||||
M5 = {<<"random/5">>, <<"5">>},
|
||||
M6 = {<<"random/6/topic">>, <<"6">>},
|
||||
_M6 = {<<"random/6/topic">>, <<"6">>},
|
||||
M7 = {<<"client/7/topic">>, <<"7">>},
|
||||
M8 = {<<"client/8/topic/sub">>, <<"8">>},
|
||||
_M8 = {<<"client/8/topic/sub">>, <<"8">>},
|
||||
M9 = {<<"random/9">>, <<"9">>},
|
||||
M10 = {<<"random/10">>, <<"10">>}
|
||||
],
|
||||
|
@ -94,8 +99,53 @@ t_messages_persisted(_Config) ->
|
|||
ct:pal("Persisted = ~p", [Persisted]),
|
||||
|
||||
?assertEqual(
|
||||
% [M1, M2, M5, M7, M9, M10],
|
||||
[M1, M2, M3, M4, M5, M6, M7, M8, M9, M10],
|
||||
[M1, M2, M5, M7, M9, M10],
|
||||
[{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
|
||||
),
|
||||
|
||||
ok.
|
||||
|
||||
t_messages_persisted_2(_Config) ->
|
||||
Prefix = atom_to_binary(?FUNCTION_NAME),
|
||||
C1 = connect(<<Prefix/binary, "1">>, _CleanStart0 = true, _EI0 = 30),
|
||||
CP = connect(<<Prefix/binary, "-pub">>, _CleanStart1 = true, _EI1 = undefined),
|
||||
T = fun(T0) -> <<Prefix/binary, T0/binary>> end,
|
||||
|
||||
%% won't be persisted
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"random/topic">>), <<"0">>, 1),
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"client/1/topic">>), <<"1">>, 1),
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"client/2/topic">>), <<"2">>, 1),
|
||||
|
||||
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, T(<<"client/+/topic">>), qos1),
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"random/topic">>), <<"3">>, 1),
|
||||
%% will be persisted
|
||||
{ok, #{reason_code := ?RC_SUCCESS}} =
|
||||
emqtt:publish(CP, T(<<"client/1/topic">>), <<"4">>, 1),
|
||||
{ok, #{reason_code := ?RC_SUCCESS}} =
|
||||
emqtt:publish(CP, T(<<"client/2/topic">>), <<"5">>, 1),
|
||||
|
||||
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C1, T(<<"client/+/topic">>)),
|
||||
%% won't be persisted
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"random/topic">>), <<"6">>, 1),
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"client/1/topic">>), <<"7">>, 1),
|
||||
{ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
|
||||
emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1),
|
||||
|
||||
Persisted = consume(?DS_SHARD, {['#'], 0}),
|
||||
|
||||
ct:pal("Persisted = ~p", [Persisted]),
|
||||
|
||||
?assertEqual(
|
||||
[
|
||||
{T(<<"client/1/topic">>), <<"4">>},
|
||||
{T(<<"client/2/topic">>), <<"5">>}
|
||||
],
|
||||
[{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
|
||||
),
|
||||
|
||||
|
@ -224,6 +274,18 @@ consume(It) ->
|
|||
[]
|
||||
end.
|
||||
|
||||
delete_all_messages() ->
|
||||
Persisted = consume(?DS_SHARD, {['#'], 0}),
|
||||
lists:foreach(
|
||||
fun(Msg) ->
|
||||
GUID = emqx_message:id(Msg),
|
||||
Topic = emqx_topic:words(emqx_message:topic(Msg)),
|
||||
Timestamp = emqx_guid:timestamp(GUID),
|
||||
ok = emqx_ds_storage_layer:delete(?DS_SHARD, GUID, Timestamp, Topic)
|
||||
end,
|
||||
Persisted
|
||||
).
|
||||
|
||||
receive_messages(Count) ->
|
||||
receive_messages(Count, []).
|
||||
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_persistent_session_ds_router_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(R, emqx_persistent_session_ds_router).
|
||||
-define(DEF_DS_SESSION_ID, <<"some-client-id">>).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE]),
|
||||
AppSpecs = [
|
||||
emqx_durable_storage,
|
||||
{emqx, #{
|
||||
config => #{persistent_session_store => #{ds => true}},
|
||||
override_env => [{boot_modules, [broker]}]
|
||||
}}
|
||||
],
|
||||
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
|
||||
[{apps, Apps} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
ok = emqx_cth_suite:stop(?config(apps, Config)),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
clear_tables(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
clear_tables().
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
clear_tables() ->
|
||||
lists:foreach(
|
||||
fun mnesia:clear_table/1,
|
||||
[?PS_ROUTER_TAB, ?PS_FILTERS_TAB]
|
||||
).
|
||||
|
||||
add_route(TopicFilter) ->
|
||||
?R:do_add_route(TopicFilter, ?DEF_DS_SESSION_ID).
|
||||
|
||||
delete_route(TopicFilter) ->
|
||||
?R:do_delete_route(TopicFilter, ?DEF_DS_SESSION_ID).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
% t_lookup_routes(_) ->
|
||||
% error('TODO').
|
||||
|
||||
t_add_delete(_) ->
|
||||
add_route(<<"a/b/c">>),
|
||||
add_route(<<"a/b/c">>),
|
||||
add_route(<<"a/+/b">>),
|
||||
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
|
||||
delete_route(<<"a/b/c">>),
|
||||
delete_route(<<"a/+/b">>),
|
||||
?assertEqual([], ?R:topics()).
|
||||
|
||||
t_add_delete_incremental(_) ->
|
||||
add_route(<<"a/b/c">>),
|
||||
add_route(<<"a/+/c">>),
|
||||
add_route(<<"a/+/+">>),
|
||||
add_route(<<"a/b/#">>),
|
||||
add_route(<<"#">>),
|
||||
?assertEqual(
|
||||
[
|
||||
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
|
||||
],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))
|
||||
),
|
||||
delete_route(<<"a/+/c">>),
|
||||
?assertEqual(
|
||||
[
|
||||
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
|
||||
],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))
|
||||
),
|
||||
delete_route(<<"a/+/+">>),
|
||||
?assertEqual(
|
||||
[
|
||||
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
|
||||
],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))
|
||||
),
|
||||
delete_route(<<"a/b/#">>),
|
||||
?assertEqual(
|
||||
[
|
||||
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
|
||||
],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))
|
||||
),
|
||||
delete_route(<<"a/b/c">>),
|
||||
?assertEqual(
|
||||
[#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))
|
||||
).
|
||||
|
||||
t_do_add_delete(_) ->
|
||||
add_route(<<"a/b/c">>),
|
||||
add_route(<<"a/b/c">>),
|
||||
add_route(<<"a/+/b">>),
|
||||
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
|
||||
|
||||
delete_route(<<"a/b/c">>),
|
||||
delete_route(<<"a/+/b">>),
|
||||
?assertEqual([], ?R:topics()).
|
||||
|
||||
t_match_routes(_) ->
|
||||
add_route(<<"a/b/c">>),
|
||||
add_route(<<"a/+/c">>),
|
||||
add_route(<<"a/b/#">>),
|
||||
add_route(<<"#">>),
|
||||
?assertEqual(
|
||||
[
|
||||
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
|
||||
#ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
|
||||
],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))
|
||||
),
|
||||
delete_route(<<"a/b/c">>),
|
||||
delete_route(<<"a/+/c">>),
|
||||
delete_route(<<"a/b/#">>),
|
||||
delete_route(<<"#">>),
|
||||
?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))).
|
||||
|
||||
t_print_routes(_) ->
|
||||
add_route(<<"+/#">>),
|
||||
add_route(<<"+/+">>),
|
||||
?R:print_routes(<<"a/b">>).
|
||||
|
||||
t_has_route(_) ->
|
||||
add_route(<<"devices/+/messages">>),
|
||||
?assert(?R:has_route(<<"devices/+/messages">>, ?DEF_DS_SESSION_ID)),
|
||||
delete_route(<<"devices/+/messages">>).
|
|
@ -15,7 +15,6 @@ start(_Type, _Args) ->
|
|||
emqx_ds_sup:start_link().
|
||||
|
||||
init_mnesia() ->
|
||||
%% FIXME: This is a temporary workaround to avoid crashes when starting on Windows
|
||||
ok = mria:create_table(
|
||||
?SESSION_TAB,
|
||||
[
|
||||
|
@ -39,6 +38,7 @@ init_mnesia() ->
|
|||
ok.
|
||||
|
||||
storage() ->
|
||||
%% FIXME: This is a temporary workaround to avoid crashes when starting on Windows
|
||||
case mria:rocksdb_backend_available() of
|
||||
true ->
|
||||
rocksdb_copies;
|
||||
|
|
Loading…
Reference in New Issue