From f1294736b7be7ac023b834d12bfaa20441500123 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 7 Sep 2023 14:39:53 -0300 Subject: [PATCH] feat(ds): add filter for message persistence Fixes https://emqx.atlassian.net/browse/EMQX-10520 --- apps/emqx/include/emqx.hrl | 1 + apps/emqx/src/emqx_persistent_session_ds.erl | 22 +- .../emqx_ps_ds_int.hrl | 33 +++ .../src/emqx_persistent_session_ds_router.erl | 226 ++++++++++++++++++ .../test/emqx_persistent_messages_SUITE.erl | 74 +++++- ...mqx_persistent_session_ds_router_SUITE.erl | 178 ++++++++++++++ apps/emqx_durable_storage/src/emqx_ds_app.erl | 2 +- 7 files changed, 521 insertions(+), 15 deletions(-) create mode 100644 apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl create mode 100644 apps/emqx/src/emqx_persistent_session_ds_router.erl create mode 100644 apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index bc1d66ca2..7c3eed70b 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,6 +23,7 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). +-define(PS_ROUTER_SHARD, ps_router_shard). %% Banner %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 4aa9175cd..e2984e30b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -16,6 +16,7 @@ -module(emqx_persistent_session_ds). +-include("emqx.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([init/0]). @@ -56,11 +57,13 @@ %% init() -> - ?WHEN_ENABLED( + ?WHEN_ENABLED(begin ok = emqx_ds:ensure_shard(?DS_SHARD, #{ dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) - }) - ). + }), + ok = emqx_persistent_session_ds_router:init_tables(), + ok + end). %% @@ -71,8 +74,8 @@ persist_message(Msg) -> case needs_persistence(Msg) andalso find_subscribers(Msg) of [_ | _] -> store_message(Msg); - % [] -> - % {skipped, no_subscribers}; + [] -> + {skipped, no_subscribers}; false -> {skipped, needs_no_persistence} end @@ -87,8 +90,8 @@ store_message(Msg) -> Topic = emqx_topic:words(emqx_message:topic(Msg)), emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)). -find_subscribers(_Msg) -> - [node()]. +find_subscribers(#message{topic = Topic}) -> + emqx_persistent_session_ds_router:match_routes(Topic). open_session(ClientID) -> ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). @@ -98,6 +101,7 @@ open_session(ClientID) -> add_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), TopicFilter = emqx_topic:words(TopicFilterBin), {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( DSSessionID, TopicFilter @@ -160,7 +164,9 @@ del_subscription(TopicFilterBin, DSSessionID) -> persistent_session_ds_iterator_delete, #{}, emqx_ds:session_del_iterator(DSSessionID, TopicFilter) - ) + ), + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID), + ok end ). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl new file mode 100644 index 000000000..157be320e --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_PS_DS_HRL). +-define(EMQX_PS_DS_HRL, true). + +-define(PS_ROUTER_TAB, emqx_ds_ps_router). +-define(PS_FILTERS_TAB, emqx_ds_ps_filters). + +-type dest() :: emqx_ds:session_id(). + +-record(ps_route, { + topic :: binary(), + dest :: emqx_ds:session_id() +}). +-record(ps_routeidx, { + entry :: emqx_topic_index:key(dest()), + unused = [] :: nil() +}). + +-endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl new file mode 100644 index 000000000..2bcaad0ea --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -0,0 +1,226 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_router). + +-include("emqx.hrl"). +-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). + +-export([init_tables/0]). + +%% Route APIs +-export([ + do_add_route/2, + do_delete_route/2, + match_routes/1, + lookup_routes/1, + foldr_routes/2, + foldl_routes/2 +]). + +-export([cleanup_routes/1]). +-export([print_routes/1]). +-export([topics/0]). + +-ifdef(TEST). +-export([has_route/2]). +-endif. + +%%-------------------------------------------------------------------- +%% Table Initialization +%%-------------------------------------------------------------------- + +init_tables() -> + mria_config:set_dirty_shard(?PS_ROUTER_SHARD, true), + ok = mria:create_table(?PS_ROUTER_TAB, [ + {type, bag}, + {rlog_shard, ?PS_ROUTER_SHARD}, + {storage, ram_copies}, + {record_name, ps_route}, + {attributes, record_info(fields, ps_route)}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ]} + ]), + ok = mria:create_table(?PS_FILTERS_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PS_ROUTER_SHARD}, + {storage, ram_copies}, + {record_name, ps_routeidx}, + {attributes, record_info(fields, ps_routeidx)}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, auto} + ]} + ]} + ]), + ok = mria:wait_for_tables([?PS_ROUTER_TAB, ?PS_FILTERS_TAB]), + ok. + +%%-------------------------------------------------------------------- +%% Route APIs +%%-------------------------------------------------------------------- + +-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. +do_add_route(Topic, Dest) when is_binary(Topic) -> + case has_route(Topic, Dest) of + true -> + ok; + false -> + mria_insert_route(Topic, Dest) + end. + +-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. +do_delete_route(Topic, Dest) -> + case emqx_trie_search:filter(Topic) of + Words when is_list(Words) -> + K = emqx_topic_index:make_key(Words, Dest), + mria:dirty_delete(?PS_FILTERS_TAB, K); + false -> + mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest}) + end. + +%% @doc Take a real topic (not filter) as input, return the matching topics and topic +%% filters associated with route destination. +-spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. +match_routes(Topic) when is_binary(Topic) -> + lookup_route_tab(Topic) ++ + [match_to_route(M) || M <- match_filters(Topic)]. + +%% @doc Take a topic or filter as input, and return the existing routes with exactly +%% this topic or filter. +-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. +lookup_routes(Topic) -> + case emqx_topic:wildcard(Topic) of + true -> + Pat = #ps_routeidx{entry = emqx_topic_index:make_key(Topic, '$1')}, + [Dest || [Dest] <- ets:match(?PS_FILTERS_TAB, Pat)]; + false -> + lookup_route_tab(Topic) + end. + +-spec has_route(emqx_types:topic(), dest()) -> boolean(). +has_route(Topic, Dest) -> + case emqx_topic:wildcard(Topic) of + true -> + ets:member(?PS_FILTERS_TAB, emqx_topic_index:make_key(Topic, Dest)); + false -> + has_route_tab_entry(Topic, Dest) + end. + +-spec topics() -> list(emqx_types:topic()). +topics() -> + Pat = #ps_routeidx{entry = '$1'}, + Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?PS_FILTERS_TAB, Pat)], + list_route_tab_topics() ++ Filters. + +%% @doc Print routes to a topic +-spec print_routes(emqx_types:topic()) -> ok. +print_routes(Topic) -> + lists:foreach( + fun(#ps_route{topic = To, dest = Dest}) -> + io:format("~ts -> ~ts~n", [To, Dest]) + end, + match_routes(Topic) + ). + +-spec cleanup_routes(emqx_ds:session_id()) -> ok. +cleanup_routes(DSSessionId) -> + %% NOTE + %% No point in transaction here because all the operations on filters table are dirty. + ok = ets:foldl( + fun(#ps_routeidx{entry = K}, ok) -> + case get_dest_session_id(emqx_topic_index:get_id(K)) of + DSSessionId -> + mria:dirty_delete(?PS_FILTERS_TAB, K); + _ -> + ok + end + end, + ok, + ?PS_FILTERS_TAB + ), + ok = ets:foldl( + fun(#ps_route{dest = Dest} = Route, ok) -> + case get_dest_session_id(Dest) of + DSSessionId -> + mria:dirty_delete_object(?PS_ROUTER_TAB, Route); + _ -> + ok + end + end, + ok, + ?PS_ROUTER_TAB + ). + +-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldl_routes(FoldFun, AccIn) -> + fold_routes(foldl, FoldFun, AccIn). + +-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldr_routes(FoldFun, AccIn) -> + fold_routes(foldr, FoldFun, AccIn). + +%%-------------------------------------------------------------------- +%% Internal fns +%%-------------------------------------------------------------------- + +mria_insert_route(Topic, Dest) -> + case emqx_trie_search:filter(Topic) of + Words when is_list(Words) -> + K = emqx_topic_index:make_key(Words, Dest), + mria:dirty_write(?PS_FILTERS_TAB, #ps_routeidx{entry = K}); + false -> + mria_route_tab_insert(#ps_route{topic = Topic, dest = Dest}) + end. + +fold_routes(FunName, FoldFun, AccIn) -> + FilterFoldFun = mk_filtertab_fold_fun(FoldFun), + Acc = ets:FunName(FoldFun, AccIn, ?PS_ROUTER_TAB), + ets:FunName(FilterFoldFun, Acc, ?PS_FILTERS_TAB). + +mk_filtertab_fold_fun(FoldFun) -> + fun(#ps_routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. + +match_filters(Topic) -> + emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []). + +get_dest_session_id({_, DSSessionId}) -> + DSSessionId; +get_dest_session_id(DSSessionId) -> + DSSessionId. + +match_to_route(M) -> + #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. + +mria_route_tab_insert(Route) -> + mria:dirty_write(?PS_ROUTER_TAB, Route). + +lookup_route_tab(Topic) -> + ets:lookup(?PS_ROUTER_TAB, Topic). + +has_route_tab_entry(Topic, Dest) -> + [] =/= ets:match(?PS_ROUTER_TAB, #ps_route{topic = Topic, dest = Dest}). + +list_route_tab_topics() -> + mnesia:dirty_all_keys(?PS_ROUTER_TAB). + +mria_route_tab_delete(Route) -> + mria:dirty_delete_object(?PS_ROUTER_TAB, Route). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 8dbd34cd9..c4c689704 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -54,8 +55,12 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), ok = emqx_cth_cluster:stop(Nodes), + ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>), + delete_all_messages(), ok; end_per_testcase(_TestCase, _Config) -> + ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>), + delete_all_messages(), ok. t_messages_persisted(_Config) -> @@ -75,12 +80,12 @@ t_messages_persisted(_Config) -> Messages = [ M1 = {<<"client/1/topic">>, <<"1">>}, M2 = {<<"client/2/topic">>, <<"2">>}, - M3 = {<<"client/3/topic/sub">>, <<"3">>}, - M4 = {<<"client/4">>, <<"4">>}, + _M3 = {<<"client/3/topic/sub">>, <<"3">>}, + _M4 = {<<"client/4">>, <<"4">>}, M5 = {<<"random/5">>, <<"5">>}, - M6 = {<<"random/6/topic">>, <<"6">>}, + _M6 = {<<"random/6/topic">>, <<"6">>}, M7 = {<<"client/7/topic">>, <<"7">>}, - M8 = {<<"client/8/topic/sub">>, <<"8">>}, + _M8 = {<<"client/8/topic/sub">>, <<"8">>}, M9 = {<<"random/9">>, <<"9">>}, M10 = {<<"random/10">>, <<"10">>} ], @@ -94,8 +99,53 @@ t_messages_persisted(_Config) -> ct:pal("Persisted = ~p", [Persisted]), ?assertEqual( - % [M1, M2, M5, M7, M9, M10], - [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10], + [M1, M2, M5, M7, M9, M10], + [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] + ), + + ok. + +t_messages_persisted_2(_Config) -> + Prefix = atom_to_binary(?FUNCTION_NAME), + C1 = connect(<>, _CleanStart0 = true, _EI0 = 30), + CP = connect(<>, _CleanStart1 = true, _EI1 = undefined), + T = fun(T0) -> <> end, + + %% won't be persisted + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"random/topic">>), <<"0">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/1/topic">>), <<"1">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/2/topic">>), <<"2">>, 1), + + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, T(<<"client/+/topic">>), qos1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"random/topic">>), <<"3">>, 1), + %% will be persisted + {ok, #{reason_code := ?RC_SUCCESS}} = + emqtt:publish(CP, T(<<"client/1/topic">>), <<"4">>, 1), + {ok, #{reason_code := ?RC_SUCCESS}} = + emqtt:publish(CP, T(<<"client/2/topic">>), <<"5">>, 1), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C1, T(<<"client/+/topic">>)), + %% won't be persisted + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"random/topic">>), <<"6">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/1/topic">>), <<"7">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1), + + Persisted = consume(?DS_SHARD, {['#'], 0}), + + ct:pal("Persisted = ~p", [Persisted]), + + ?assertEqual( + [ + {T(<<"client/1/topic">>), <<"4">>}, + {T(<<"client/2/topic">>), <<"5">>} + ], [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] ), @@ -224,6 +274,18 @@ consume(It) -> [] end. +delete_all_messages() -> + Persisted = consume(?DS_SHARD, {['#'], 0}), + lists:foreach( + fun(Msg) -> + GUID = emqx_message:id(Msg), + Topic = emqx_topic:words(emqx_message:topic(Msg)), + Timestamp = emqx_guid:timestamp(GUID), + ok = emqx_ds_storage_layer:delete(?DS_SHARD, GUID, Timestamp, Topic) + end, + Persisted + ). + receive_messages(Count) -> receive_messages(Count, []). diff --git a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl new file mode 100644 index 000000000..445ccd9a4 --- /dev/null +++ b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl @@ -0,0 +1,178 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_router_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(R, emqx_persistent_session_ds_router). +-define(DEF_DS_SESSION_ID, <<"some-client-id">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE]), + AppSpecs = [ + emqx_durable_storage, + {emqx, #{ + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker]}] + }} + ], + Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +init_per_testcase(_TestCase, Config) -> + clear_tables(), + Config. + +end_per_testcase(_TestCase, _Config) -> + clear_tables(). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +clear_tables() -> + lists:foreach( + fun mnesia:clear_table/1, + [?PS_ROUTER_TAB, ?PS_FILTERS_TAB] + ). + +add_route(TopicFilter) -> + ?R:do_add_route(TopicFilter, ?DEF_DS_SESSION_ID). + +delete_route(TopicFilter) -> + ?R:do_delete_route(TopicFilter, ?DEF_DS_SESSION_ID). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +% t_lookup_routes(_) -> +% error('TODO'). + +t_add_delete(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/b/c">>), + add_route(<<"a/+/b">>), + ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), + delete_route(<<"a/b/c">>), + delete_route(<<"a/+/b">>), + ?assertEqual([], ?R:topics()). + +t_add_delete_incremental(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/+/c">>), + add_route(<<"a/+/+">>), + add_route(<<"a/b/#">>), + add_route(<<"#">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/+/c">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/+/+">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/b/#">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/b/c">>), + ?assertEqual( + [#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ). + +t_do_add_delete(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/b/c">>), + add_route(<<"a/+/b">>), + ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), + + delete_route(<<"a/b/c">>), + delete_route(<<"a/+/b">>), + ?assertEqual([], ?R:topics()). + +t_match_routes(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/+/c">>), + add_route(<<"a/b/#">>), + add_route(<<"#">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/b/c">>), + delete_route(<<"a/+/c">>), + delete_route(<<"a/b/#">>), + delete_route(<<"#">>), + ?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))). + +t_print_routes(_) -> + add_route(<<"+/#">>), + add_route(<<"+/+">>), + ?R:print_routes(<<"a/b">>). + +t_has_route(_) -> + add_route(<<"devices/+/messages">>), + ?assert(?R:has_route(<<"devices/+/messages">>, ?DEF_DS_SESSION_ID)), + delete_route(<<"devices/+/messages">>). diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index cbcdb0b8c..7b36bd7bd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -15,7 +15,6 @@ start(_Type, _Args) -> emqx_ds_sup:start_link(). init_mnesia() -> - %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows ok = mria:create_table( ?SESSION_TAB, [ @@ -39,6 +38,7 @@ init_mnesia() -> ok. storage() -> + %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows case mria:rocksdb_backend_available() of true -> rocksdb_copies;