From ed16ff07dfce00d3cc10a490b9e50f28514014fb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 5 Jun 2024 18:21:22 +0200 Subject: [PATCH] refactor(broker): simplify external broker behaviour --- apps/emqx/src/emqx_broker.erl | 23 +++---------- apps/emqx/src/emqx_external_broker.erl | 25 +++++--------- .../src/emqx_cluster_link.erl | 34 +++++++++++-------- 3 files changed, 33 insertions(+), 49 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index df6898470..8fa011429 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -246,7 +246,8 @@ publish(Msg) when is_record(Msg, message) -> []; Msg1 = #message{} -> do_publish(Msg1); - Msgs when is_list(Msgs) -> do_publish_many(Msgs) + Msgs when is_list(Msgs) -> + do_publish_many(Msgs) end. do_publish_many([]) -> @@ -259,7 +260,7 @@ do_publish(#message{topic = Topic} = Msg) -> Routes = aggre(emqx_router:match_routes(Topic)), Delivery = delivery(Msg), RouteRes = route(Routes, Delivery, PersistRes), - ext_route(ext_routes(Topic, Msg), Delivery, RouteRes). + do_forward_external(Delivery, RouteRes). persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of @@ -344,22 +345,8 @@ aggre([], false, Acc) -> aggre([], true, Acc) -> lists:usort(Acc). -ext_routes(Topic, Msg) -> - case emqx_external_broker:should_route_to_external_dests(Msg) of - true -> emqx_external_broker:match_routes(Topic); - false -> [] - end. - -ext_route([], _Delivery, RouteRes) -> - RouteRes; -ext_route(ExtRoutes, Delivery, RouteRes) -> - lists:foldl( - fun(#route{topic = To, dest = ExtDest}, Acc) -> - [{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)} | Acc] - end, - RouteRes, - ExtRoutes - ). +do_forward_external(Delivery, RouteRes) -> + emqx_external_broker:forward(Delivery) ++ RouteRes. %% @doc Forward message to another node. -spec forward( diff --git a/apps/emqx/src/emqx_external_broker.erl b/apps/emqx/src/emqx_external_broker.erl index ebcd48994..253d73edd 100644 --- a/apps/emqx/src/emqx_external_broker.erl +++ b/apps/emqx/src/emqx_external_broker.erl @@ -16,11 +16,9 @@ -module(emqx_external_broker). --callback forward(dest(), emqx_types:delivery()) -> +-callback forward(emqx_types:delivery()) -> emqx_types:deliver_result(). --callback should_route_to_external_dests(emqx_types:message()) -> boolean(). - -callback add_route(emqx_types:topic()) -> ok. -callback delete_route(emqx_types:topic()) -> ok. @@ -30,23 +28,22 @@ -callback add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok. -callback delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok. --callback match_routes(emqx_types:topic()) -> [emqx_types:route()]. - -type dest() :: term(). -export([ + %% Registration provider/0, register_provider/1, unregister_provider/1, - forward/2, - should_route_to_external_dests/1, + %% Forwarding + forward/1, + %% Routing updates add_route/1, delete_route/1, add_shared_route/2, delete_shared_route/2, add_persistent_route/2, - delete_persistent_route/2, - match_routes/1 + delete_persistent_route/2 ]). -export_type([dest/0]). @@ -111,11 +108,8 @@ provider() -> %% Broker API %%-------------------------------------------------------------------- -forward(ExternalDest, Delivery) -> - ?safe_with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}). - -should_route_to_external_dests(Message) -> - ?safe_with_provider(?FUNCTION_NAME(Message), false). +forward(Delivery) -> + ?safe_with_provider(?FUNCTION_NAME(Delivery), []). add_route(Topic) -> ?safe_with_provider(?FUNCTION_NAME(Topic), ok). @@ -135,9 +129,6 @@ add_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) -> ?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok). -match_routes(Topic) -> - ?safe_with_provider(?FUNCTION_NAME(Topic), ok). - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index cdfe22f3d..19211cb56 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -16,9 +16,7 @@ delete_shared_route/2, add_persistent_route/2, delete_persistent_route/2, - match_routes/1, - forward/2, - should_route_to_external_dests/1 + forward/1 ]). %% emqx hooks @@ -73,18 +71,26 @@ add_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) -> maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route). -forward(DestCluster, Delivery) -> - emqx_cluster_link_mqtt:forward(DestCluster, Delivery). +forward(#delivery{message = #message{extra = #{link_origin := _}}}) -> + %% Do not forward any external messages to other links. + %% Only forward locally originated messages to all the relevant links, i.e. no gossip + %% message forwarding. + []; +forward(Delivery = #delivery{message = #message{topic = Topic}}) -> + Routes = emqx_cluster_link_extrouter:match_routes(Topic), + forward(Routes, Delivery). -match_routes(Topic) -> - emqx_cluster_link_extrouter:match_routes(Topic). - -%% Do not forward any external messages to other links. -%% Only forward locally originated messages to all the relevant links, i.e. no gossip message forwarding. -should_route_to_external_dests(#message{extra = #{link_origin := _}}) -> - false; -should_route_to_external_dests(_Msg) -> - true. +forward([], _Delivery) -> + []; +forward(Routes, Delivery) -> + lists:foldl( + fun(#route{topic = To, dest = Cluster}, Acc) -> + Result = emqx_cluster_link_mqtt:forward(Cluster, Delivery), + [{Cluster, To, Result} | Acc] + end, + [], + Routes + ). %%-------------------------------------------------------------------- %% EMQX Hooks