refactor(broker): simplify external broker behaviour

This commit is contained in:
Andrew Mayorov 2024-06-05 18:21:22 +02:00 committed by Serge Tupchii
parent d282c61120
commit ed16ff07df
3 changed files with 33 additions and 49 deletions

View File

@ -246,7 +246,8 @@ publish(Msg) when is_record(Msg, message) ->
[]; [];
Msg1 = #message{} -> Msg1 = #message{} ->
do_publish(Msg1); do_publish(Msg1);
Msgs when is_list(Msgs) -> do_publish_many(Msgs) Msgs when is_list(Msgs) ->
do_publish_many(Msgs)
end. end.
do_publish_many([]) -> do_publish_many([]) ->
@ -259,7 +260,7 @@ do_publish(#message{topic = Topic} = Msg) ->
Routes = aggre(emqx_router:match_routes(Topic)), Routes = aggre(emqx_router:match_routes(Topic)),
Delivery = delivery(Msg), Delivery = delivery(Msg),
RouteRes = route(Routes, Delivery, PersistRes), RouteRes = route(Routes, Delivery, PersistRes),
ext_route(ext_routes(Topic, Msg), Delivery, RouteRes). do_forward_external(Delivery, RouteRes).
persist_publish(Msg) -> persist_publish(Msg) ->
case emqx_persistent_message:persist(Msg) of case emqx_persistent_message:persist(Msg) of
@ -344,22 +345,8 @@ aggre([], false, Acc) ->
aggre([], true, Acc) -> aggre([], true, Acc) ->
lists:usort(Acc). lists:usort(Acc).
ext_routes(Topic, Msg) -> do_forward_external(Delivery, RouteRes) ->
case emqx_external_broker:should_route_to_external_dests(Msg) of emqx_external_broker:forward(Delivery) ++ RouteRes.
true -> emqx_external_broker:match_routes(Topic);
false -> []
end.
ext_route([], _Delivery, RouteRes) ->
RouteRes;
ext_route(ExtRoutes, Delivery, RouteRes) ->
lists:foldl(
fun(#route{topic = To, dest = ExtDest}, Acc) ->
[{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)} | Acc]
end,
RouteRes,
ExtRoutes
).
%% @doc Forward message to another node. %% @doc Forward message to another node.
-spec forward( -spec forward(

View File

@ -16,11 +16,9 @@
-module(emqx_external_broker). -module(emqx_external_broker).
-callback forward(dest(), emqx_types:delivery()) -> -callback forward(emqx_types:delivery()) ->
emqx_types:deliver_result(). emqx_types:deliver_result().
-callback should_route_to_external_dests(emqx_types:message()) -> boolean().
-callback add_route(emqx_types:topic()) -> ok. -callback add_route(emqx_types:topic()) -> ok.
-callback delete_route(emqx_types:topic()) -> ok. -callback delete_route(emqx_types:topic()) -> ok.
@ -30,23 +28,22 @@
-callback add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok. -callback add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
-callback delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok. -callback delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
-callback match_routes(emqx_types:topic()) -> [emqx_types:route()].
-type dest() :: term(). -type dest() :: term().
-export([ -export([
%% Registration
provider/0, provider/0,
register_provider/1, register_provider/1,
unregister_provider/1, unregister_provider/1,
forward/2, %% Forwarding
should_route_to_external_dests/1, forward/1,
%% Routing updates
add_route/1, add_route/1,
delete_route/1, delete_route/1,
add_shared_route/2, add_shared_route/2,
delete_shared_route/2, delete_shared_route/2,
add_persistent_route/2, add_persistent_route/2,
delete_persistent_route/2, delete_persistent_route/2
match_routes/1
]). ]).
-export_type([dest/0]). -export_type([dest/0]).
@ -111,11 +108,8 @@ provider() ->
%% Broker API %% Broker API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
forward(ExternalDest, Delivery) -> forward(Delivery) ->
?safe_with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}). ?safe_with_provider(?FUNCTION_NAME(Delivery), []).
should_route_to_external_dests(Message) ->
?safe_with_provider(?FUNCTION_NAME(Message), false).
add_route(Topic) -> add_route(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok). ?safe_with_provider(?FUNCTION_NAME(Topic), ok).
@ -135,9 +129,6 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok). ?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
match_routes(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -16,9 +16,7 @@
delete_shared_route/2, delete_shared_route/2,
add_persistent_route/2, add_persistent_route/2,
delete_persistent_route/2, delete_persistent_route/2,
match_routes/1, forward/1
forward/2,
should_route_to_external_dests/1
]). ]).
%% emqx hooks %% emqx hooks
@ -73,18 +71,26 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) -> delete_persistent_route(Topic, ID) ->
maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route). maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
forward(DestCluster, Delivery) -> forward(#delivery{message = #message{extra = #{link_origin := _}}}) ->
emqx_cluster_link_mqtt:forward(DestCluster, Delivery). %% Do not forward any external messages to other links.
%% Only forward locally originated messages to all the relevant links, i.e. no gossip
%% message forwarding.
[];
forward(Delivery = #delivery{message = #message{topic = Topic}}) ->
Routes = emqx_cluster_link_extrouter:match_routes(Topic),
forward(Routes, Delivery).
match_routes(Topic) -> forward([], _Delivery) ->
emqx_cluster_link_extrouter:match_routes(Topic). [];
forward(Routes, Delivery) ->
%% Do not forward any external messages to other links. lists:foldl(
%% Only forward locally originated messages to all the relevant links, i.e. no gossip message forwarding. fun(#route{topic = To, dest = Cluster}, Acc) ->
should_route_to_external_dests(#message{extra = #{link_origin := _}}) -> Result = emqx_cluster_link_mqtt:forward(Cluster, Delivery),
false; [{Cluster, To, Result} | Acc]
should_route_to_external_dests(_Msg) -> end,
true. [],
Routes
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% EMQX Hooks %% EMQX Hooks