feat(queue): add integration with external broker

This commit is contained in:
Ilya Averyanov 2024-07-17 16:23:05 +03:00
parent f0dd1bc4f4
commit 23f0e88b45
7 changed files with 41 additions and 8 deletions

View File

@ -43,7 +43,9 @@
add_shared_route/2, add_shared_route/2,
delete_shared_route/2, delete_shared_route/2,
add_persistent_route/2, add_persistent_route/2,
delete_persistent_route/2 delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3
]). ]).
-export_type([dest/0]). -export_type([dest/0]).
@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok). ?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
add_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
delete_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -17,7 +17,7 @@
-module(emqx_persistent_session_ds_router). -module(emqx_persistent_session_ds_router).
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). -include("emqx_ps_ds_int.hrl").
-export([init_tables/0]). -export([init_tables/0]).
@ -47,7 +47,7 @@
-endif. -endif.
-type route() :: #ps_route{}. -type route() :: #ps_route{}.
-type dest() :: emqx_persistent_session_ds:id(). -type dest() :: emqx_persistent_session_ds:id() | #share_dest{}.
-export_type([dest/0, route/0]). -export_type([dest/0, route/0]).
@ -161,7 +161,7 @@ topics() ->
print_routes(Topic) -> print_routes(Topic) ->
lists:foreach( lists:foreach(
fun(#ps_route{topic = To, dest = Dest}) -> fun(#ps_route{topic = To, dest = Dest}) ->
io:format("~ts -> ~ts~n", [To, Dest]) io:format("~ts -> ~tp~n", [To, Dest])
end, end,
match_routes(Topic) match_routes(Topic)
). ).
@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) ->
match_filters(Topic) -> match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []). emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
get_dest_session_id(#share_dest{session_id = DSSessionId}) ->
DSSessionId;
get_dest_session_id({_, DSSessionId}) -> get_dest_session_id({_, DSSessionId}) ->
DSSessionId; DSSessionId;
get_dest_session_id(DSSessionId) -> get_dest_session_id(DSSessionId) ->

View File

@ -178,6 +178,7 @@ create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicF
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{ ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group session_id = SessionId, group = Group
}), }),
_ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId),
#{upgrade_qos := UpgradeQoS} = Props, #{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -272,6 +273,7 @@ on_unsubscribe(
?tp(persistent_session_ds_subscription_delete, #{ ?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, share_topic_filter => ShareTopicFilter session_id => SessionId, share_topic_filter => ShareTopicFilter
}), }),
_ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{ ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group session_id = SessionId, group = Group
}), }),

View File

@ -21,7 +21,7 @@
-record(ps_route, { -record(ps_route, {
topic :: binary(), topic :: binary(),
dest :: emqx_persistent_session_ds:id() | '_' dest :: emqx_persistent_session_ds_router:dest() | '_'
}). }).
-record(ps_routeidx, { -record(ps_routeidx, {

View File

@ -21,3 +21,6 @@
-define(METRIC_NAME, cluster_link). -define(METRIC_NAME, cluster_link).
-define(route_metric, 'routes'). -define(route_metric, 'routes').
-define(PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID),
<<"$sp/", Group/binary, "/", ID/binary, "/", Topic/binary>>
).

View File

@ -16,6 +16,8 @@
delete_shared_route/2, delete_shared_route/2,
add_persistent_route/2, add_persistent_route/2,
delete_persistent_route/2, delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3,
forward/1 forward/1
]). ]).
@ -71,6 +73,16 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) ->
maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route). maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
add_persistent_shared_route(Topic, Group, ID) ->
maybe_push_route_op(
add, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
).
delete_persistent_shared_route(Topic, Group, ID) ->
maybe_push_route_op(
delete, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
).
forward(#delivery{message = #message{extra = #{link_origin := _}}}) -> forward(#delivery{message = #message{extra = #{link_origin := _}}}) ->
%% Do not forward any external messages to other links. %% Do not forward any external messages to other links.
%% Only forward locally originated messages to all the relevant links, i.e. no gossip %% Only forward locally originated messages to all the relevant links, i.e. no gossip

View File

@ -3,6 +3,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_cluster_link_router_bootstrap). -module(emqx_cluster_link_router_bootstrap).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/emqx_shared_sub.hrl"). -include_lib("emqx/include/emqx_shared_sub.hrl").
-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). -include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
@ -67,7 +68,7 @@ routes_by_topic(Topics, _IsPersistentRoute = true) ->
lists:foldl( lists:foldl(
fun(T, Acc) -> fun(T, Acc) ->
Routes = emqx_persistent_session_ds_router:lookup_routes(T), Routes = emqx_persistent_session_ds_router:lookup_routes(T),
[encode_route(T, ?PERSISTENT_ROUTE_ID(T, D)) || #ps_route{dest = D} <- Routes] ++ Acc [encode_route(T, ps_route_id(PSRoute)) || #ps_route{} = PSRoute <- Routes] ++ Acc
end, end,
[], [],
Topics Topics
@ -79,17 +80,22 @@ routes_by_wildcards(Wildcards, _IsPersistentRoute = false) ->
Routes ++ SharedRoutes; Routes ++ SharedRoutes;
routes_by_wildcards(Wildcards, _IsPersistentRoute = true) -> routes_by_wildcards(Wildcards, _IsPersistentRoute = true) ->
emqx_persistent_session_ds_router:foldl_routes( emqx_persistent_session_ds_router:foldl_routes(
fun(#ps_route{dest = D, topic = T}, Acc) -> fun(#ps_route{topic = T} = PSRoute, Acc) ->
case topic_intersect_any(T, Wildcards) of case topic_intersect_any(T, Wildcards) of
false -> false ->
Acc; Acc;
Intersec -> Intersec ->
[encode_route(Intersec, ?PERSISTENT_ROUTE_ID(T, D)) | Acc] [encode_route(Intersec, ps_route_id(PSRoute)) | Acc]
end end
end, end,
[] []
). ).
ps_route_id(#ps_route{topic = T, dest = #share_dest{group = Group, session_id = SessionId}}) ->
?PERSISTENT_SHARED_ROUTE_ID(T, Group, SessionId);
ps_route_id(#ps_route{topic = T, dest = SessionId}) ->
?PERSISTENT_ROUTE_ID(T, SessionId).
select_routes_by_topics(Topics) -> select_routes_by_topics(Topics) ->
[encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []]. [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].