Merge pull request #11598 from thalesmg/ds-fix-ps-router-m-20230912
fixes and improvements to persistent session router
This commit is contained in:
commit
e41f7dd68c
|
@ -71,11 +71,9 @@ init() ->
|
||||||
ok | {skipped, _Reason} | {error, _TODO}.
|
ok | {skipped, _Reason} | {error, _TODO}.
|
||||||
persist_message(Msg) ->
|
persist_message(Msg) ->
|
||||||
?WHEN_ENABLED(
|
?WHEN_ENABLED(
|
||||||
case needs_persistence(Msg) andalso find_subscribers(Msg) of
|
case needs_persistence(Msg) andalso has_subscribers(Msg) of
|
||||||
[_ | _] ->
|
true ->
|
||||||
store_message(Msg);
|
store_message(Msg);
|
||||||
[] ->
|
|
||||||
{skipped, no_subscribers};
|
|
||||||
false ->
|
false ->
|
||||||
{skipped, needs_no_persistence}
|
{skipped, needs_no_persistence}
|
||||||
end
|
end
|
||||||
|
@ -90,8 +88,8 @@ store_message(Msg) ->
|
||||||
Topic = emqx_topic:words(emqx_message:topic(Msg)),
|
Topic = emqx_topic:words(emqx_message:topic(Msg)),
|
||||||
emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
|
emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
|
||||||
|
|
||||||
find_subscribers(#message{topic = Topic}) ->
|
has_subscribers(#message{topic = Topic}) ->
|
||||||
emqx_persistent_session_ds_router:match_routes(Topic).
|
emqx_persistent_session_ds_router:has_any_route(Topic).
|
||||||
|
|
||||||
open_session(ClientID) ->
|
open_session(ClientID) ->
|
||||||
?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
|
?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
|
||||||
|
@ -101,6 +99,24 @@ open_session(ClientID) ->
|
||||||
add_subscription(TopicFilterBin, DSSessionID) ->
|
add_subscription(TopicFilterBin, DSSessionID) ->
|
||||||
?WHEN_ENABLED(
|
?WHEN_ENABLED(
|
||||||
begin
|
begin
|
||||||
|
%% N.B.: we chose to update the router before adding the subscription to the
|
||||||
|
%% session/iterator table. The reasoning for this is as follows:
|
||||||
|
%%
|
||||||
|
%% Messages matching this topic filter should start to be persisted as soon as
|
||||||
|
%% possible to avoid missing messages. If this is the first such persistent
|
||||||
|
%% session subscription, it's important to do so early on.
|
||||||
|
%%
|
||||||
|
%% This could, in turn, lead to some inconsistency: if such a route gets
|
||||||
|
%% created but the session/iterator data fails to be updated accordingly, we
|
||||||
|
%% have a dangling route. To remove such dangling routes, we may have a
|
||||||
|
%% periodic GC process that removes routes that do not have a matching
|
||||||
|
%% persistent subscription. Also, route operations use dirty mnesia
|
||||||
|
%% operations, which inherently have room for inconsistencies.
|
||||||
|
%%
|
||||||
|
%% In practice, we use the iterator reference table as a source of truth,
|
||||||
|
%% since it is guarded by a transaction context: we consider a subscription
|
||||||
|
%% operation to be successful if it ended up changing this table. Both router
|
||||||
|
%% and iterator information can be reconstructed from this table, if needed.
|
||||||
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
|
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
|
||||||
TopicFilter = emqx_topic:words(TopicFilterBin),
|
TopicFilter = emqx_topic:words(TopicFilterBin),
|
||||||
{ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
|
{ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
|
||||||
|
@ -148,6 +164,8 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) ->
|
||||||
del_subscription(TopicFilterBin, DSSessionID) ->
|
del_subscription(TopicFilterBin, DSSessionID) ->
|
||||||
?WHEN_ENABLED(
|
?WHEN_ENABLED(
|
||||||
begin
|
begin
|
||||||
|
%% N.B.: see comments in `?MODULE:add_subscription' for a discussion about the
|
||||||
|
%% order of operations here.
|
||||||
TopicFilter = emqx_topic:words(TopicFilterBin),
|
TopicFilter = emqx_topic:words(TopicFilterBin),
|
||||||
case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
|
case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
-export([
|
-export([
|
||||||
do_add_route/2,
|
do_add_route/2,
|
||||||
do_delete_route/2,
|
do_delete_route/2,
|
||||||
|
has_any_route/1,
|
||||||
match_routes/1,
|
match_routes/1,
|
||||||
lookup_routes/1,
|
lookup_routes/1,
|
||||||
foldr_routes/2,
|
foldr_routes/2,
|
||||||
|
@ -52,7 +53,7 @@ init_tables() ->
|
||||||
ok = mria:create_table(?PS_ROUTER_TAB, [
|
ok = mria:create_table(?PS_ROUTER_TAB, [
|
||||||
{type, bag},
|
{type, bag},
|
||||||
{rlog_shard, ?PS_ROUTER_SHARD},
|
{rlog_shard, ?PS_ROUTER_SHARD},
|
||||||
{storage, ram_copies},
|
{storage, disc_copies},
|
||||||
{record_name, ps_route},
|
{record_name, ps_route},
|
||||||
{attributes, record_info(fields, ps_route)},
|
{attributes, record_info(fields, ps_route)},
|
||||||
{storage_properties, [
|
{storage_properties, [
|
||||||
|
@ -65,7 +66,7 @@ init_tables() ->
|
||||||
ok = mria:create_table(?PS_FILTERS_TAB, [
|
ok = mria:create_table(?PS_FILTERS_TAB, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{rlog_shard, ?PS_ROUTER_SHARD},
|
{rlog_shard, ?PS_ROUTER_SHARD},
|
||||||
{storage, ram_copies},
|
{storage, disc_copies},
|
||||||
{record_name, ps_routeidx},
|
{record_name, ps_routeidx},
|
||||||
{attributes, record_info(fields, ps_routeidx)},
|
{attributes, record_info(fields, ps_routeidx)},
|
||||||
{storage_properties, [
|
{storage_properties, [
|
||||||
|
@ -101,6 +102,19 @@ do_delete_route(Topic, Dest) ->
|
||||||
mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest})
|
mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Takes a real topic (not filter) as input, and returns whether there is any
|
||||||
|
%% matching filters.
|
||||||
|
-spec has_any_route(emqx_types:topic()) -> boolean().
|
||||||
|
has_any_route(Topic) ->
|
||||||
|
DirectTopicMatch = lookup_route_tab(Topic),
|
||||||
|
WildcardMatch = emqx_topic_index:match(Topic, ?PS_FILTERS_TAB),
|
||||||
|
case {DirectTopicMatch, WildcardMatch} of
|
||||||
|
{[], false} ->
|
||||||
|
false;
|
||||||
|
{_, _} ->
|
||||||
|
true
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Take a real topic (not filter) as input, return the matching topics and topic
|
%% @doc Take a real topic (not filter) as input, return the matching topics and topic
|
||||||
%% filters associated with route destination.
|
%% filters associated with route destination.
|
||||||
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
|
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
|
||||||
|
|
|
@ -66,6 +66,7 @@ match(Topic, Tab) ->
|
||||||
|
|
||||||
%% @doc Match given topic against the index and return _all_ matches.
|
%% @doc Match given topic against the index and return _all_ matches.
|
||||||
%% If `unique` option is given, return only unique matches by record ID.
|
%% If `unique` option is given, return only unique matches by record ID.
|
||||||
|
-spec matches(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [match(_ID)].
|
||||||
matches(Topic, Tab, Opts) ->
|
matches(Topic, Tab, Opts) ->
|
||||||
emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).
|
emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).
|
||||||
|
|
||||||
|
|
|
@ -80,12 +80,21 @@ delete_route(TopicFilter) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
t_add_delete(_) ->
|
t_add_delete(_) ->
|
||||||
|
?assertNot(?R:has_any_route(<<"a/b/c">>)),
|
||||||
add_route(<<"a/b/c">>),
|
add_route(<<"a/b/c">>),
|
||||||
|
?assert(?R:has_any_route(<<"a/b/c">>)),
|
||||||
add_route(<<"a/b/c">>),
|
add_route(<<"a/b/c">>),
|
||||||
|
?assert(?R:has_any_route(<<"a/b/c">>)),
|
||||||
add_route(<<"a/+/b">>),
|
add_route(<<"a/+/b">>),
|
||||||
|
?assert(?R:has_any_route(<<"a/b/c">>)),
|
||||||
|
?assert(?R:has_any_route(<<"a/c/b">>)),
|
||||||
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
|
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
|
||||||
delete_route(<<"a/b/c">>),
|
delete_route(<<"a/b/c">>),
|
||||||
|
?assertNot(?R:has_any_route(<<"a/b/c">>)),
|
||||||
|
?assert(?R:has_any_route(<<"a/c/b">>)),
|
||||||
delete_route(<<"a/+/b">>),
|
delete_route(<<"a/+/b">>),
|
||||||
|
?assertNot(?R:has_any_route(<<"a/b/c">>)),
|
||||||
|
?assertNot(?R:has_any_route(<<"a/c/b">>)),
|
||||||
?assertEqual([], ?R:topics()).
|
?assertEqual([], ?R:topics()).
|
||||||
|
|
||||||
t_add_delete_incremental(_) ->
|
t_add_delete_incremental(_) ->
|
||||||
|
@ -94,6 +103,7 @@ t_add_delete_incremental(_) ->
|
||||||
add_route(<<"a/+/+">>),
|
add_route(<<"a/+/+">>),
|
||||||
add_route(<<"a/b/#">>),
|
add_route(<<"a/b/#">>),
|
||||||
add_route(<<"#">>),
|
add_route(<<"#">>),
|
||||||
|
?assert(?R:has_any_route(<<"any/topic">>)),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[
|
[
|
||||||
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
|
|
||||||
-dialyzer({nowarn_function, storage/0}).
|
-dialyzer({nowarn_function, storage/0}).
|
||||||
|
|
||||||
-export([start/2]).
|
-export([start/2, storage/0]).
|
||||||
|
|
||||||
-include("emqx_ds_int.hrl").
|
-include("emqx_ds_int.hrl").
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue