From cae9ae1fab208747039210355074ccce4468d64e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 12 Sep 2023 16:07:55 -0300 Subject: [PATCH 1/4] fix(ps_router): use `disc_copies` for storing persistent session routes --- apps/emqx/src/emqx_persistent_session_ds_router.erl | 4 ++-- apps/emqx_durable_storage/src/emqx_ds_app.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl index 4400b23ff..cbb5175bc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -52,7 +52,7 @@ init_tables() -> ok = mria:create_table(?PS_ROUTER_TAB, [ {type, bag}, {rlog_shard, ?PS_ROUTER_SHARD}, - {storage, ram_copies}, + {storage, disc_copies}, {record_name, ps_route}, {attributes, record_info(fields, ps_route)}, {storage_properties, [ @@ -65,7 +65,7 @@ init_tables() -> ok = mria:create_table(?PS_FILTERS_TAB, [ {type, ordered_set}, {rlog_shard, ?PS_ROUTER_SHARD}, - {storage, ram_copies}, + {storage, disc_copies}, {record_name, ps_routeidx}, {attributes, record_info(fields, ps_routeidx)}, {storage_properties, [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 7b36bd7bd..3c9d2652e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -6,7 +6,7 @@ -dialyzer({nowarn_function, storage/0}). --export([start/2]). +-export([start/2, storage/0]). -include("emqx_ds_int.hrl"). 
From e081abb9e691014f6e7423802a7f3f7df9d58c85 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 12 Sep 2023 16:29:43 -0300 Subject: [PATCH 2/4] perf(ps_router): only check if there are any routes when deciding whether to persist message --- apps/emqx/src/emqx_persistent_session_ds.erl | 10 ++++------ .../emqx/src/emqx_persistent_session_ds_router.erl | 14 ++++++++++++++ apps/emqx/src/emqx_topic_index.erl | 1 + .../emqx_persistent_session_ds_router_SUITE.erl | 10 ++++++++++ 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index e2984e30b..68ae15c6c 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -71,11 +71,9 @@ init() -> ok | {skipped, _Reason} | {error, _TODO}. persist_message(Msg) -> ?WHEN_ENABLED( - case needs_persistence(Msg) andalso find_subscribers(Msg) of - [_ | _] -> + case needs_persistence(Msg) andalso has_subscribers(Msg) of + true -> store_message(Msg); - [] -> - {skipped, no_subscribers}; false -> {skipped, needs_no_persistence} end @@ -90,8 +88,8 @@ store_message(Msg) -> Topic = emqx_topic:words(emqx_message:topic(Msg)), emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)). -find_subscribers(#message{topic = Topic}) -> - emqx_persistent_session_ds_router:match_routes(Topic). +has_subscribers(#message{topic = Topic}) -> + emqx_persistent_session_ds_router:has_any_route(Topic). open_session(ClientID) -> ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). 
diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl index cbb5175bc..324db991a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -25,6 +25,7 @@ -export([ do_add_route/2, do_delete_route/2, + has_any_route/1, match_routes/1, lookup_routes/1, foldr_routes/2, @@ -101,6 +102,19 @@ do_delete_route(Topic, Dest) -> mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest}) end. +%% @doc Takes a real topic (not filter) as input, and returns whether there are any +%% matching filters. +-spec has_any_route(emqx_types:topic()) -> boolean(). +has_any_route(Topic) -> + DirectTopicMatch = lookup_route_tab(Topic), + WildcardMatch = emqx_topic_index:match(Topic, ?PS_FILTERS_TAB), + case {DirectTopicMatch, WildcardMatch} of + {[], false} -> + false; + {_, _} -> + true + end. + %% @doc Take a real topic (not filter) as input, return the matching topics and topic %% filters associated with route destination. -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 59dfdfeab..9b1982cc6 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -66,6 +66,7 @@ match(Topic, Tab) -> %% @doc Match given topic against the index and return _all_ matches. %% If `unique` option is given, return only unique matches by record ID. +-spec matches(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [key(_ID)]. matches(Topic, Tab, Opts) -> emqx_trie_search:matches(Topic, make_nextf(Tab), Opts). 
diff --git a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl index 742dc5b41..3e48173c3 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl @@ -80,12 +80,21 @@ delete_route(TopicFilter) -> % error('TODO'). t_add_delete(_) -> + ?assertNot(?R:has_any_route(<<"a/b/c">>)), add_route(<<"a/b/c">>), + ?assert(?R:has_any_route(<<"a/b/c">>)), add_route(<<"a/b/c">>), + ?assert(?R:has_any_route(<<"a/b/c">>)), add_route(<<"a/+/b">>), + ?assert(?R:has_any_route(<<"a/b/c">>)), + ?assert(?R:has_any_route(<<"a/c/b">>)), ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), delete_route(<<"a/b/c">>), + ?assertNot(?R:has_any_route(<<"a/b/c">>)), + ?assert(?R:has_any_route(<<"a/c/b">>)), delete_route(<<"a/+/b">>), + ?assertNot(?R:has_any_route(<<"a/b/c">>)), + ?assertNot(?R:has_any_route(<<"a/c/b">>)), ?assertEqual([], ?R:topics()). t_add_delete_incremental(_) -> @@ -94,6 +103,7 @@ t_add_delete_incremental(_) -> add_route(<<"a/+/+">>), add_route(<<"a/b/#">>), add_route(<<"#">>), + ?assert(?R:has_any_route(<<"any/topic">>)), ?assertEqual( [ #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, From f55f6dc7797736b86bb35a4a063ebe8fb2186225 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 13 Sep 2023 10:11:00 -0300 Subject: [PATCH 3/4] docs(ps_session): add notes explaining rationale about operation order and consistency --- apps/emqx/src/emqx_persistent_session_ds.erl | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 68ae15c6c..aed3ece82 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -99,6 +99,24 @@ open_session(ClientID) -> add_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin + %% N.B.: we chose to 
update the router before adding the subscription to the + %% session/iterator table. The reasoning for this is as follows: + %% + %% Messages matching this topic filter should start to be persisted as soon as + %% possible to avoid missing messages. If this is the first such persistent + %% session subscription, it's important to do so early on. + %% + %% This could, in turn, lead to some inconsistency: if such a route gets + %% created but the session/iterator data fails to be updated accordingly, we + %% have a dangling route. To remove such dangling routes, we may have a + %% periodic GC process that removes routes that do not have a matching + %% persistent subscription. Also, route operations use dirty mnesia + %% operations, which inherently have room for inconsistencies. + %% + %% In practice, we use the iterator reference table as a source of truth, + %% since it is guarded by a transaction context: we consider a subscription + %% operation to be successful if it ended up changing this table. Both router + %% and iterator information can be reconstructed from this table, if needed. ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), TopicFilter = emqx_topic:words(TopicFilterBin), {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( @@ -146,6 +164,8 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) -> del_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin + %% N.B.: see comments in `?MODULE:add_subscription' for a discussion about the + %% order of operations here. 
TopicFilter = emqx_topic:words(TopicFilterBin), case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of {error, not_found} -> From 803b69d8783af02aaedc9534deddfdd3164c5ed3 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 13 Sep 2023 10:49:45 -0300 Subject: [PATCH 4/4] refactor: use more consistent return types Co-authored-by: Andrew Mayorov --- apps/emqx/src/emqx_topic_index.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 9b1982cc6..eaedb2e53 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -66,7 +66,7 @@ match(Topic, Tab) -> %% @doc Match given topic against the index and return _all_ matches. %% If `unique` option is given, return only unique matches by record ID. --spec matches(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [key(_ID)]. +-spec matches(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [match(_ID)]. matches(Topic, Tab, Opts) -> emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).