diff --git a/apps/emqx/src/emqx_external_broker.erl b/apps/emqx/src/emqx_external_broker.erl index fe360a5b8..5fcee71f0 100644 --- a/apps/emqx/src/emqx_external_broker.erl +++ b/apps/emqx/src/emqx_external_broker.erl @@ -43,7 +43,9 @@ add_shared_route/2, delete_shared_route/2, add_persistent_route/2, - delete_persistent_route/2 + delete_persistent_route/2, + add_persistent_shared_route/3, + delete_persistent_shared_route/3 ]). -export_type([dest/0]). @@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) -> ?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok). +add_persistent_shared_route(Topic, Group, ID) -> + ?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok). + +delete_persistent_shared_route(Topic, Group, ID) -> + ?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_router.erl index b0ee14963..1b80a28d2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_router.erl @@ -17,7 +17,7 @@ -module(emqx_persistent_session_ds_router). -include("emqx.hrl"). --include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). +-include("emqx_ps_ds_int.hrl"). -export([init_tables/0]). @@ -47,7 +47,7 @@ -endif. -type route() :: #ps_route{}. --type dest() :: emqx_persistent_session_ds:id(). +-type dest() :: emqx_persistent_session_ds:id() | #share_dest{}. -export_type([dest/0, route/0]). @@ -161,7 +161,7 @@ topics() -> print_routes(Topic) -> lists:foreach( fun(#ps_route{topic = To, dest = Dest}) -> - io:format("~ts -> ~ts~n", [To, Dest]) + io:format("~ts -> ~tp~n", [To, Dest]) end, match_routes(Topic) ). @@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) -> match_filters(Topic) -> emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []). +get_dest_session_id(#share_dest{session_id = DSSessionId}) -> + DSSessionId; get_dest_session_id({_, DSSessionId}) -> DSSessionId; get_dest_session_id(DSSessionId) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 634207d12..11b89441d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -178,6 +178,7 @@ create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicF ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{ session_id = SessionId, group = Group }), + _ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId), #{upgrade_qos := UpgradeQoS} = Props, {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), @@ -272,6 +273,7 @@ on_unsubscribe( ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, share_topic_filter => ShareTopicFilter }), + _ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId), ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{ session_id = SessionId, group = Group }), diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl index dc487376b..e533cfcb9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl @@ -21,7 +21,7 @@ -record(ps_route, { topic :: binary(), - dest :: emqx_persistent_session_ds:id() | '_' + dest :: emqx_persistent_session_ds_router:dest() | '_' }). -record(ps_routeidx, { diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index 32c675d8d..8a0c374ed 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -21,3 +21,6 @@ -define(METRIC_NAME, cluster_link). -define(route_metric, 'routes'). +-define(PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), + <<"$sp/", Group/binary, "/", ID/binary, "/", Topic/binary>> +). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 76228c052..d68ffb4be 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -16,6 +16,8 @@ delete_shared_route/2, add_persistent_route/2, delete_persistent_route/2, + add_persistent_shared_route/3, + delete_persistent_shared_route/3, forward/1 ]). @@ -71,6 +73,16 @@ add_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) -> maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route). +add_persistent_shared_route(Topic, Group, ID) -> + maybe_push_route_op( + add, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route + ). + +delete_persistent_shared_route(Topic, Group, ID) -> + maybe_push_route_op( + delete, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route + ). + forward(#delivery{message = #message{extra = #{link_origin := _}}}) -> %% Do not forward any external messages to other links. %% Only forward locally originated messages to all the relevant links, i.e. no gossip diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl index 1670c2ab4..6656c8c89 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl @@ -3,6 +3,7 @@ %%-------------------------------------------------------------------- -module(emqx_cluster_link_router_bootstrap). +-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_shared_sub.hrl"). -include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). @@ -67,7 +68,7 @@ routes_by_topic(Topics, _IsPersistentRoute = true) -> lists:foldl( fun(T, Acc) -> Routes = emqx_persistent_session_ds_router:lookup_routes(T), - [encode_route(T, ?PERSISTENT_ROUTE_ID(T, D)) || #ps_route{dest = D} <- Routes] ++ Acc + [encode_route(T, ps_route_id(PSRoute)) || #ps_route{} = PSRoute <- Routes] ++ Acc end, [], Topics @@ -79,17 +80,22 @@ routes_by_wildcards(Wildcards, _IsPersistentRoute = false) -> Routes ++ SharedRoutes; routes_by_wildcards(Wildcards, _IsPersistentRoute = true) -> emqx_persistent_session_ds_router:foldl_routes( - fun(#ps_route{dest = D, topic = T}, Acc) -> + fun(#ps_route{topic = T} = PSRoute, Acc) -> case topic_intersect_any(T, Wildcards) of false -> Acc; Intersec -> - [encode_route(Intersec, ?PERSISTENT_ROUTE_ID(T, D)) | Acc] + [encode_route(Intersec, ps_route_id(PSRoute)) | Acc] end end, [] ). +ps_route_id(#ps_route{topic = T, dest = #share_dest{group = Group, session_id = SessionId}}) -> + ?PERSISTENT_SHARED_ROUTE_ID(T, Group, SessionId); +ps_route_id(#ps_route{topic = T, dest = SessionId}) -> + ?PERSISTENT_ROUTE_ID(T, SessionId). + select_routes_by_topics(Topics) -> [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].