diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 73e1cd144..0ad01c0ef 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -67,7 +67,7 @@ -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} | emqx_session:session_id() + dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index d42258611..5744f2e74 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -256,9 +256,10 @@ do_publish_many([Msg | T]) -> do_publish(#message{topic = Topic} = Msg) -> PersistRes = persist_publish(Msg), - {Routes, ExtRoutes} = aggre(emqx_router:match_routes(Topic)), - Routes1 = maybe_add_ext_routes(ExtRoutes, Routes, Msg), - route(Routes1, delivery(Msg), PersistRes). + Routes = aggre(emqx_router:match_routes(Topic)), + Delivery = delivery(Msg), + RouteRes = route(Routes, Delivery, PersistRes), + ext_route(ext_routes(Topic, Msg), Delivery, RouteRes). persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of @@ -322,41 +323,44 @@ do_route({To, Node}, Delivery) when Node =:= node() -> {Node, To, dispatch(To, Delivery)}; do_route({To, Node}, Delivery) when is_atom(Node) -> {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))}; -do_route({To, {external, _} = ExtDest}, Delivery) -> - {ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)}; do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. aggre([]) -> - {[], []}; + []; aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> - {[{To, Node}], []}; -aggre([#route{topic = To, dest = {external, _} = ExtDest}]) -> - {[], [{To, ExtDest}]}; + [{To, Node}]; aggre([#route{topic = To, dest = {Group, _Node}}]) -> - {[{To, Group}], []}; + [{To, Group}]; aggre(Routes) -> - aggre(Routes, false, {[], []}). + aggre(Routes, false, []). -aggre([#route{topic = To, dest = Node} | Rest], Dedup, {Acc, ExtAcc}) when is_atom(Node) -> - aggre(Rest, Dedup, {[{To, Node} | Acc], ExtAcc}); -aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Dedup, {Acc, ExtAcc}) -> - aggre(Rest, Dedup, {Acc, [{To, ExtDest} | ExtAcc]}); -aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, {Acc, ExtAcc}) -> - aggre(Rest, true, {[{To, Group} | Acc], ExtAcc}); +aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) -> + aggre(Rest, Dedup, [{To, Node} | Acc]); +aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) -> + aggre(Rest, true, [{To, Group} | Acc]); aggre([], false, Acc) -> Acc; -aggre([], true, {Acc, ExtAcc}) -> - {lists:usort(Acc), lists:usort(ExtAcc)}. +aggre([], true, Acc) -> + lists:usort(Acc). -maybe_add_ext_routes([] = _ExtRoutes, Routes, _Msg) -> - Routes; -maybe_add_ext_routes(ExtRoutes, Routes, Msg) -> +ext_routes(Topic, Msg) -> case emqx_external_broker:should_route_to_external_dests(Msg) of - true -> Routes ++ ExtRoutes; - false -> Routes + true -> emqx_external_broker:match_routes(Topic); + false -> [] end. +ext_route([], _Delivery, RouteRes) -> + RouteRes; +ext_route(ExtRoutes, Delivery, RouteRes) -> + lists:foldl( + fun(#route{topic = To, dest = ExtDest}, Acc) -> + [{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)} | Acc] + end, + RouteRes, + ExtRoutes + ). + %% @doc Forward message to another node. -spec forward( node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async diff --git a/apps/emqx/src/emqx_external_broker.erl b/apps/emqx/src/emqx_external_broker.erl index a9af9ddc9..3b3ff83c6 100644 --- a/apps/emqx/src/emqx_external_broker.erl +++ b/apps/emqx/src/emqx_external_broker.erl @@ -24,6 +24,10 @@ -callback maybe_add_route(emqx_types:topic()) -> ok. -callback maybe_delete_route(emqx_types:topic()) -> ok. +-callback match_routes(emqx_types:topic()) -> [emqx_types:route()]. + +-type dest() :: term(). + -export([ provider/0, register_provider/1, @@ -31,9 +35,12 @@ forward/2, should_route_to_external_dests/1, maybe_add_route/1, - maybe_delete_route/1 + maybe_delete_route/1, + match_routes/1 ]). +-export_type([dest/0]). + -include("logger.hrl"). -define(PROVIDER, {?MODULE, external_broker}). @@ -106,6 +113,9 @@ maybe_add_route(Topic) -> maybe_delete_route(Topic) -> ?safe_with_provider(?FUNCTION_NAME(Topic), ok). +match_routes(Topic) -> + ?safe_with_provider(?FUNCTION_NAME(Topic), ok). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 55b9ab079..c60bd2383 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -95,8 +95,7 @@ -export_type([schemavsn/0]). -type group() :: binary(). --type external_dest() :: {external, term()}. --type dest() :: node() | {group(), node()} | external_dest(). +-type dest() :: node() | {group(), node()}. -type schemavsn() :: v1 | v2. %% Operation :: {add, ...} | {delete, ...}. diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 03a3c8a0f..d8dd1cff4 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -267,7 +267,7 @@ [ {node(), topic(), deliver_result()} | {share, topic(), deliver_result()} - | {emqx_router:external_dest(), topic(), deliver_result()} + | {emqx_external_broker:dest(), topic(), deliver_result()} | persisted ] | disconnect. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index ae3647d4a..579f58dce 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -11,6 +11,7 @@ unregister_external_broker/0, maybe_add_route/1, maybe_delete_route/1, + match_routes/1, forward/2, should_route_to_external_dests/1 ]). @@ -38,15 +39,16 @@ unregister_external_broker() -> emqx_external_broker:unregister_provider(?MODULE). maybe_add_route(Topic) -> - emqx_cluster_link_coordinator:route_op(<<"add">>, Topic). + maybe_push_route_op(add, Topic). -maybe_delete_route(_Topic) -> - %% Not implemented yet - %% emqx_cluster_link_coordinator:route_op(<<"delete">>, Topic). - ok. +maybe_delete_route(Topic) -> + maybe_push_route_op(delete, Topic). -forward(ExternalDest, Delivery) -> - emqx_cluster_link_mqtt:forward(ExternalDest, Delivery). +forward(DestCluster, Delivery) -> + emqx_cluster_link_mqtt:forward(DestCluster, Delivery). + +match_routes(Topic) -> + emqx_cluster_link_extrouter:match_routes(Topic). %% Do not forward any external messages to other links. %% Only forward locally originated messages to all the relevant links, i.e. no gossip message forwarding. @@ -105,6 +107,28 @@ delete_hook() -> %% Internal functions %%-------------------------------------------------------------------- +maybe_push_route_op(Op, Topic) -> + lists:foreach( + fun(#{upstream := Cluster, topics := LinkFilters}) -> + case topic_intersect_any(Topic, LinkFilters) of + false -> + ok; + TopicIntersection -> + ID = Topic, + emqx_cluster_link_router_syncer:push(Cluster, Op, TopicIntersection, ID) + end + end, + emqx_cluster_link_config:enabled_links() + ). + +topic_intersect_any(Topic, [LinkFilter | T]) -> + case emqx_topic:intersection(Topic, LinkFilter) of + false -> topic_intersect_any(Topic, T); + TopicOrFilter -> TopicOrFilter + end; +topic_intersect_any(_Topic, []) -> + false. + actor_init(ClusterName, Actor, Incarnation) -> Env = #{timestamp => erlang:system_time(millisecond)}, {ok, _} = emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incarnation, Env). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index bdbb702ca..be2769fd5 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -16,6 +16,7 @@ -export([ %% General cluster/0, + enabled_links/0, links/0, link/1, topic_filters/1, @@ -41,6 +42,9 @@ cluster() -> links() -> emqx:get_config(?LINKS_PATH, []). +enabled_links() -> + [L || L = #{enable := true} <- links()]. + link(Name) -> case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of [LinkConf | _] -> LinkConf; diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index a09d4d8de..504e59c74 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -64,6 +64,8 @@ %% Op4 | n2@ds delete client/42/# → MCounter -= 1 bsl 1 = 0 → route deleted -type lane() :: non_neg_integer(). +-include_lib("emqx/include/emqx.hrl"). + -define(DEFAULT_ACTOR_TTL_MS, 30_000). -define(EXTROUTE_SHARD, ?MODULE). @@ -117,7 +119,8 @@ create_tables() -> match_routes(Topic) -> Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]), - [match_to_route(M) || M <- Matches]. + %% `unique` opt is not enough, since we keep the original Topic as a part of RouteID + lists:usort([match_to_route(M) || M <- Matches]). lookup_routes(Topic) -> Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'}, @@ -128,7 +131,8 @@ topics() -> [emqx_topic_index:get_topic(K) || [K] <- ets:match(?EXTROUTE_TAB, Pat)]. match_to_route(M) -> - emqx_topic_index:get_topic(M). + ?ROUTE_ID(Cluster, _) = emqx_topic_index:get_id(M), + #route{topic = emqx_topic_index:get_topic(M), dest = Cluster}. %% diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 3b16642ac..a3e3ce2fb 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -556,8 +556,8 @@ encode_field(route, {add, Route = {_Topic, _ID}}) -> encode_field(route, {delete, {Topic, ID}}) -> {?ROUTE_DELETE, Topic, ID}. -decode_field(route, {?ROUTE_DELETE, Route = {_Topic, _ID}}) -> - {delete, Route}; +decode_field(route, {?ROUTE_DELETE, Topic, ID}) -> + {delete, {Topic, ID}}; decode_field(route, Route = {_Topic, _ID}) -> {add, Route}. @@ -565,7 +565,7 @@ decode_field(route, Route = {_Topic, _ID}) -> %% emqx_external_broker %%-------------------------------------------------------------------- -forward({external, {link, ClusterName}}, #delivery{message = #message{topic = Topic} = Msg}) -> +forward(ClusterName, #delivery{message = #message{topic = Topic} = Msg}) -> QueryOpts = #{pick_key => Topic}, emqx_resource:query(?MSG_RES_ID(ClusterName), Msg, QueryOpts). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index d432cd7d8..d41a41c5f 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -123,7 +123,11 @@ refine_client_options(Options = #{clientid := ClientID}) -> client_session_present(ClientPid) -> Info = emqtt:info(ClientPid), - proplists:get_value(session_present, Info, false). + %% FIXME: waitnig for emqtt release that fixes session_present type (must be a boolean) + case proplists:get_value(session_present, Info, 0) of + 0 -> false; + 1 -> true + end. announce_client(TargetCluster, Pid) -> true = gproc:reg_other(?CLIENT_NAME(TargetCluster), Pid), @@ -272,11 +276,10 @@ terminate(_Reason, _State) -> process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) -> case start_link_client(TargetCluster) of {ok, ClientPid} -> + %% TODO: error handling, handshake + {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr), ok = start_syncer(TargetCluster), ok = announce_client(TargetCluster, ClientPid), - %% TODO: error handling, handshake - - {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr), process_bootstrap(St#st{client = ClientPid}); {error, Reason} -> handle_connect_error(Reason, St)