diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index ed29ea614..d42258611 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -244,11 +244,22 @@ publish(Msg) when is_record(Msg, message) -> topic => Topic }), []; - Msg1 = #message{topic = Topic} -> - PersistRes = persist_publish(Msg1), - route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes) + Msg1 = #message{} -> + do_publish(Msg1); + Msgs when is_list(Msgs) -> do_publish_many(Msgs) end. +do_publish_many([]) -> + []; +do_publish_many([Msg | T]) -> + do_publish(Msg) ++ do_publish_many(T). + +do_publish(#message{topic = Topic} = Msg) -> + PersistRes = persist_publish(Msg), + {Routes, ExtRoutes} = aggre(emqx_router:match_routes(Topic)), + Routes1 = maybe_add_ext_routes(ExtRoutes, Routes, Msg), + route(Routes1, delivery(Msg), PersistRes). + persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of ok -> @@ -311,26 +322,40 @@ do_route({To, Node}, Delivery) when Node =:= node() -> {Node, To, dispatch(To, Delivery)}; do_route({To, Node}, Delivery) when is_atom(Node) -> {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))}; +do_route({To, {external, _} = ExtDest}, Delivery) -> + {ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)}; do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. aggre([]) -> - []; + {[], []}; aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> - [{To, Node}]; + {[{To, Node}], []}; +aggre([#route{topic = To, dest = {external, _} = ExtDest}]) -> + {[], [{To, ExtDest}]}; aggre([#route{topic = To, dest = {Group, _Node}}]) -> - [{To, Group}]; + {[{To, Group}], []}; aggre(Routes) -> - aggre(Routes, false, []). + aggre(Routes, false, {[], []}). -aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) -> - aggre(Rest, Dedup, [{To, Node} | Acc]); -aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) -> - aggre(Rest, true, [{To, Group} | Acc]); +aggre([#route{topic = To, dest = Node} | Rest], Dedup, {Acc, ExtAcc}) when is_atom(Node) -> + aggre(Rest, Dedup, {[{To, Node} | Acc], ExtAcc}); +aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Dedup, {Acc, ExtAcc}) -> + aggre(Rest, Dedup, {Acc, [{To, ExtDest} | ExtAcc]}); +aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, {Acc, ExtAcc}) -> + aggre(Rest, true, {[{To, Group} | Acc], ExtAcc}); aggre([], false, Acc) -> Acc; -aggre([], true, Acc) -> - lists:usort(Acc). +aggre([], true, {Acc, ExtAcc}) -> + {lists:usort(Acc), lists:usort(ExtAcc)}. + +maybe_add_ext_routes([] = _ExtRoutes, Routes, _Msg) -> + Routes; +maybe_add_ext_routes(ExtRoutes, Routes, Msg) -> + case emqx_external_broker:should_route_to_external_dests(Msg) of + true -> Routes ++ ExtRoutes; + false -> Routes + end. %% @doc Forward message to another node. -spec forward( @@ -643,19 +668,27 @@ maybe_delete_route(Topic) -> sync_route(Action, Topic, ReplyTo) -> EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]), - case EnabledOn of - all -> - push_sync_route(Action, Topic, ReplyTo); - none -> - regular_sync_route(Action, Topic); - Role -> - case Role =:= mria_config:whoami() of - true -> - push_sync_route(Action, Topic, ReplyTo); - false -> - regular_sync_route(Action, Topic) - end - end. 
+ Res = + case EnabledOn of + all -> + push_sync_route(Action, Topic, ReplyTo); + none -> + regular_sync_route(Action, Topic); + Role -> + case Role =:= mria_config:whoami() of + true -> + push_sync_route(Action, Topic, ReplyTo); + false -> + regular_sync_route(Action, Topic) + end + end, + _ = external_sync_route(Action, Topic), + Res. + +external_sync_route(add, Topic) -> + emqx_external_broker:maybe_add_route(Topic); +external_sync_route(delete, Topic) -> + emqx_external_broker:maybe_delete_route(Topic). push_sync_route(Action, Topic, Opts) -> emqx_router_syncer:push(Action, Topic, node(), Opts). diff --git a/apps/emqx/src/emqx_external_broker.erl b/apps/emqx/src/emqx_external_broker.erl new file mode 100644 index 000000000..a9af9ddc9 --- /dev/null +++ b/apps/emqx/src/emqx_external_broker.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_external_broker). + +-callback forward(emqx_router:external_dest(), emqx_types:delivery()) -> + emqx_types:deliver_result(). + +-callback should_route_to_external_dests(emqx_types:message()) -> boolean(). + +-callback maybe_add_route(emqx_types:topic()) -> ok. +-callback maybe_delete_route(emqx_types:topic()) -> ok. + +-export([ + provider/0, + register_provider/1, + unregister_provider/1, + forward/2, + should_route_to_external_dests/1, + maybe_add_route/1, + maybe_delete_route/1 +]). + +-include("logger.hrl"). + +-define(PROVIDER, {?MODULE, external_broker}). + +-define(safe_with_provider(IfRegistered, IfNotRegistered), + case persistent_term:get(?PROVIDER, undefined) of + undefined -> + IfNotRegistered; + Provider -> + try + Provider:IfRegistered + catch + Err:Reason:St -> + ?SLOG(error, #{ + msg => "external_broker_crashed", + provider => Provider, + callback => ?FUNCTION_NAME, + stacktrace => St, + error => Err, + reason => Reason + }), + {error, Reason} + end + end +). + +%% TODO: provider API copied from emqx_external_traces, +%% but it can be moved to a common module. + +%%-------------------------------------------------------------------- +%% Provider API +%%-------------------------------------------------------------------- + +-spec register_provider(module()) -> ok | {error, term()}. +register_provider(Module) when is_atom(Module) -> + case is_valid_provider(Module) of + true -> + persistent_term:put(?PROVIDER, Module); + false -> + {error, invalid_provider} + end. + +-spec unregister_provider(module()) -> ok | {error, term()}. +unregister_provider(Module) -> + case persistent_term:get(?PROVIDER, undefined) of + Module -> + persistent_term:erase(?PROVIDER), + ok; + _ -> + {error, not_registered} + end. + +-spec provider() -> module() | undefined. +provider() -> + persistent_term:get(?PROVIDER, undefined). 
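An illustrative sketch (not part of the patch) of how the provider registry above is meant to be used. `my_cluster_provider` is a hypothetical module implementing the four callbacks declared at the top of this file; the Broker API wrappers that follow dispatch to whichever provider is registered.

%% Register the provider once (e.g. from the provider application's start/2):
ok = emqx_external_broker:register_provider(my_cluster_provider),
%% From then on the Broker API wrappers dispatch to it, e.g.
%% emqx_external_broker:maybe_add_route(Topic) calls my_cluster_provider:maybe_add_route(Topic);
%% without a registered provider they return the defaults (ok / false / {error, unknown_dest}).
ok = emqx_external_broker:unregister_provider(my_cluster_provider).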
+ +%%-------------------------------------------------------------------- +%% Broker API +%%-------------------------------------------------------------------- + +forward(ExternalDest, Delivery) -> + ?safe_with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}). + +should_route_to_external_dests(Message) -> + ?safe_with_provider(?FUNCTION_NAME(Message), false). + +maybe_add_route(Topic) -> + ?safe_with_provider(?FUNCTION_NAME(Topic), ok). + +maybe_delete_route(Topic) -> + ?safe_with_provider(?FUNCTION_NAME(Topic), ok). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +is_valid_provider(Module) -> + lists:all( + fun({F, A}) -> erlang:function_exported(Module, F, A) end, + ?MODULE:behaviour_info(callbacks) + ). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index c2616f98a..55b9ab079 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -91,11 +91,12 @@ deinit_schema/0 ]). --export_type([dest/0]). +-export_type([dest/0, external_dest/0]). -export_type([schemavsn/0]). -type group() :: binary(). --type dest() :: node() | {group(), node()}. +-type external_dest() :: {external, term()}. +-type dest() :: node() | {group(), node()} | external_dest(). -type schemavsn() :: v1 | v2. %% Operation :: {add, ...} | {delete, ...}. @@ -107,7 +108,14 @@ unused = [] :: nil() }). --define(node_patterns(Node), [Node, {'_', Node}]). +-define(dest_patterns(NodeOrExtDest), + case is_atom(NodeOrExtDest) of + %% node + true -> [NodeOrExtDest, {'_', NodeOrExtDest}]; + %% external destination + false -> [NodeOrExtDest] + end +). -define(UNSUPPORTED, unsupported). @@ -306,14 +314,14 @@ print_routes(Topic) -> match_routes(Topic) ). --spec cleanup_routes(node()) -> ok. -cleanup_routes(Node) -> - cleanup_routes(get_schema_vsn(), Node). +-spec cleanup_routes(node() | external_dest()) -> ok. +cleanup_routes(NodeOrExtDest) -> + cleanup_routes(get_schema_vsn(), NodeOrExtDest). -cleanup_routes(v2, Node) -> - cleanup_routes_v2(Node); -cleanup_routes(v1, Node) -> - cleanup_routes_v1(Node). +cleanup_routes(v2, NodeOrExtDest) -> + cleanup_routes_v2(NodeOrExtDest); +cleanup_routes(v1, NodeOrExtDest) -> + cleanup_routes_v1(NodeOrExtDest). -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldl_routes(FoldFun, AccIn) -> @@ -430,19 +438,19 @@ has_route_v1(Topic, Dest) -> has_route_tab_entry(Topic, Dest) -> [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}). -cleanup_routes_v1(Node) -> +cleanup_routes_v1(NodeOrExtDest) -> ?with_fallback( lists:foreach( fun(Pattern) -> throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern))) end, - ?node_patterns(Node) + ?dest_patterns(NodeOrExtDest) ), - cleanup_routes_v1_fallback(Node) + cleanup_routes_v1_fallback(NodeOrExtDest) ). -cleanup_routes_v1_fallback(Node) -> - Patterns = [make_route_rec_pat(P) || P <- ?node_patterns(Node)], +cleanup_routes_v1_fallback(NodeOrExtDest) -> + Patterns = [make_route_rec_pat(P) || P <- ?dest_patterns(NodeOrExtDest)], mria:transaction(?ROUTE_SHARD, fun() -> [ mnesia:delete_object(?ROUTE_TAB, Route, write) @@ -525,7 +533,7 @@ has_route_v2(Topic, Dest) -> has_route_tab_entry(Topic, Dest) end. 
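An illustrative note (not part of the patch) on the extended dest() type introduced above; the node and group names are hypothetical, the external shape follows the ?DEST macro defined later in emqx_cluster_link.hrl.

%% node-local route:                 'emqx@10.0.0.1'
%% shared-subscription group route:  {<<"grp1">>, 'emqx@10.0.0.1'}
%% external (cluster link) route:    {external, {link, <<"emqxcl_b">>}}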
-cleanup_routes_v2(Node) -> +cleanup_routes_v2(NodeOrExtDest) -> ?with_fallback( lists:foreach( fun(Pattern) -> @@ -537,18 +545,18 @@ cleanup_routes_v2(Node) -> ), throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern))) end, - ?node_patterns(Node) + ?dest_patterns(NodeOrExtDest) ), - cleanup_routes_v2_fallback(Node) + cleanup_routes_v2_fallback(NodeOrExtDest) ). -cleanup_routes_v2_fallback(Node) -> +cleanup_routes_v2_fallback(NodeOrExtDest) -> %% NOTE %% No point in transaction here because all the operations on filters table are dirty. ok = ets:foldl( fun(#routeidx{entry = K}, ok) -> case get_dest_node(emqx_topic_index:get_id(K)) of - Node -> + NodeOrExtDest -> mria:dirty_delete(?ROUTE_TAB_FILTERS, K); _ -> ok @@ -560,7 +568,7 @@ cleanup_routes_v2_fallback(Node) -> ok = ets:foldl( fun(#route{dest = Dest} = Route, ok) -> case get_dest_node(Dest) of - Node -> + NodeOrExtDest -> mria:dirty_delete_object(?ROUTE_TAB, Route); _ -> ok @@ -570,6 +578,8 @@ cleanup_routes_v2_fallback(Node) -> ?ROUTE_TAB ). +get_dest_node({external, _} = ExtDest) -> + ExtDest; get_dest_node({_, Node}) -> Node; get_dest_node(Node) -> diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index cdb27b052..9cd631508 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -33,7 +33,8 @@ feed_var/3, systop/1, parse/1, - parse/2 + parse/2, + intersection/2 ]). -export([ @@ -52,6 +53,8 @@ ((C =:= '#' orelse C =:= <<"#">>) andalso REST =/= []) ). +-define(IS_WILDCARD(W), W =:= '+' orelse W =:= '#'). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -98,6 +101,45 @@ match(_, ['#']) -> match(_, _) -> false. +%% @doc Finds an intersection between two topics, two filters or a topic and a filter. +%% The function is commutative: reversing parameters doesn't affect the returned value. +%% Two topics intersect only when they are equal. +%% The intersection of a topic and a filter is always either the topic itself or false (no intersection). +%% The intersection of two filters is either false or a new topic filter that would match only those topics, +%% that can be matched by both input filters. +%% For example, the intersection of "t/global/#" and "t/+/1/+" is "t/global/1/+". +-spec intersection(TopicOrFilter, TopicOrFilter) -> TopicOrFilter | false when + TopicOrFilter :: emqx_types:topic(). +intersection(Topic1, Topic2) when is_binary(Topic1), is_binary(Topic2) -> + case intersection(words(Topic1), words(Topic2), []) of + [] -> false; + Intersection -> join(lists:reverse(Intersection)) + end. + +intersection(Words1, ['#'], Acc) -> + lists:reverse(Words1, Acc); +intersection(['#'], Words2, Acc) -> + lists:reverse(Words2, Acc); +intersection([W1], ['+'], Acc) -> + [W1 | Acc]; +intersection(['+'], [W2], Acc) -> + [W2 | Acc]; +intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W1), ?IS_WILDCARD(W2) -> + intersection(T1, T2, [wildcard_intersection(W1, W2) | Acc]); +intersection([W | T1], [W | T2], Acc) -> + intersection(T1, T2, [W | Acc]); +intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W1) -> + intersection(T1, T2, [W2 | Acc]); +intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W2) -> + intersection(T1, T2, [W1 | Acc]); +intersection([], [], Acc) -> + Acc; +intersection(_, _, _) -> + []. + +wildcard_intersection(W, W) -> W; +wildcard_intersection(_, _) -> '+'. 
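A few illustrative results of intersection/2 (not part of the patch), following from the doc-comment and clauses above; the topics are arbitrary examples.

intersection_examples() ->
    %% two filters: a narrower filter that both would match
    <<"t/global/1/+">> = intersection(<<"t/global/#">>, <<"t/+/1/+">>),
    %% topic and filter: the topic itself
    <<"t/a/b">> = intersection(<<"t/+/b">>, <<"t/a/b">>),
    %% two topics intersect only when they are equal
    <<"t/a/b">> = intersection(<<"t/a/b">>, <<"t/a/b">>),
    false = intersection(<<"t/a/b">>, <<"t/+/c">>),
    ok.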
+ -spec match_share(Name, Filter) -> boolean() when Name :: share(), Filter :: topic() | share(). diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index f416aabc4..8c7011f7a 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -23,6 +23,7 @@ -export([delete/3]). -export([match/2]). -export([matches/3]). +-export([matches_filter/3]). -export([make_key/2]). @@ -72,6 +73,12 @@ match(Topic, Tab) -> matches(Topic, Tab, Opts) -> emqx_trie_search:matches(Topic, make_nextf(Tab), Opts). +%% @doc Match given topic filter against the index and return _all_ matches. +%% If `unique` option is given, return only unique matches by record ID. +-spec matches_filter(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [match(_ID)]. +matches_filter(TopicFilter, Tab, Opts) -> + emqx_trie_search:matches_filter(TopicFilter, make_nextf(Tab), Opts). + %% @doc Extract record ID from the match. -spec get_id(match(ID)) -> ID. get_id(Key) -> diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index 94a04f5ae..080fad74b 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -99,7 +99,7 @@ -module(emqx_trie_search). -export([make_key/2, make_pat/2, filter/1]). --export([match/2, matches/3, get_id/1, get_topic/1]). +-export([match/2, matches/3, get_id/1, get_topic/1, matches_filter/3]). -export_type([key/1, word/0, words/0, nextf/0, opts/0]). -define(END, '$end_of_table'). @@ -183,9 +183,20 @@ match(Topic, NextF) -> matches(Topic, NextF, Opts) -> search(Topic, NextF, Opts). +%% @doc Match given topic filter against the index and return _all_ matches. +-spec matches_filter(emqx_types:topic(), nextf(), opts()) -> [key(_)]. +matches_filter(TopicFilter, NextF, Opts) -> + search(TopicFilter, NextF, [topic_filter | Opts]). + %% @doc Entrypoint of the search for a given topic. search(Topic, NextF, Opts) -> - Words = topic_words(Topic), + %% A private opt + IsFilter = proplists:get_bool(topic_filter, Opts), + Words = + case IsFilter of + true -> filter_words(Topic); + false -> topic_words(Topic) + end, Base = base_init(Words), ORetFirst = proplists:get_bool(return_first, Opts), OUnique = proplists:get_bool(unique, Opts), @@ -200,8 +211,10 @@ search(Topic, NextF, Opts) -> end, Matches = case search_new(Words, Base, NextF, Acc0) of - {Cursor, Acc} -> + {Cursor, Acc} when not IsFilter -> match_topics(Topic, Cursor, NextF, Acc); + {_Cursor, Acc} -> + Acc; Acc -> Acc end, @@ -275,6 +288,17 @@ compare(['#'], _Words, _) -> % Closest possible next entries that we must not miss: % * a/+/+/d/# (same topic but a different ID) match_full; +%% Filter search %% +compare(_Filter, ['#'], _) -> + match_full; +compare([_ | TF], ['+' | TW], Pos) -> + case compare(TF, TW, Pos + 1) of + lower -> + lower; + Other -> + Other + end; +%% Filter search end %% compare(['+' | TF], [HW | TW], Pos) -> case compare(TF, TW, Pos + 1) of lower -> diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 322cc1c05..03a3c8a0f 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -267,6 +267,7 @@ [ {node(), topic(), deliver_result()} | {share, topic(), deliver_result()} + | {emqx_router:external_dest(), topic(), deliver_result()} | persisted ] | disconnect. 
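An illustrative sketch (not part of the patch) of what matches_filter/3 adds: searching the index with a topic filter rather than a concrete topic. It assumes the pre-existing emqx_topic_index:new/0 and insert/4 API; `id1` is a hypothetical record ID.

matches_filter_example() ->
    Tab = emqx_topic_index:new(),
    true = emqx_topic_index:insert(<<"t/+/1/+">>, id1, [], Tab),
    %% <<"t/+/1/+">> overlaps <<"t/global/#">> (both match e.g. <<"t/global/1/x">>),
    %% so the filter search is expected to report it.
    [id1] = [emqx_topic_index:get_id(M) || M <- emqx_topic_index:matches_filter(<<"t/global/#">>, Tab, [])],
    ok.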
diff --git a/apps/emqx_cluster_link/BSL.txt b/apps/emqx_cluster_link/BSL.txt new file mode 100644 index 000000000..c22445af8 --- /dev/null +++ b/apps/emqx_cluster_link/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2024 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2028-04-17 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. 
+ +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl new file mode 100644 index 000000000..42eb7ca7b --- /dev/null +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -0,0 +1,10 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-define(TOPIC_PREFIX, "$LINK/cluster/"). +-define(CTRL_TOPIC_PREFIX, ?TOPIC_PREFIX "ctrl/"). +-define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/"). +-define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). + +-define(DEST(FromClusterName), {external, {link, FromClusterName}}). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.app.src b/apps/emqx_cluster_link/src/emqx_cluster_link.app.src new file mode 100644 index 000000000..d8da0c1ee --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.app.src @@ -0,0 +1,24 @@ +%% -*- mode: erlang -*- +{application, emqx_cluster_link, [ + {description, "EMQX Cluster Linking"}, + % strict semver, bump manually! + {vsn, "0.1.0"}, + {modules, []}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqtt, + ecpool, + emqx, + emqx_resource + ]}, + {mod, {emqx_cluster_link_app, []}}, + {env, []}, + {licenses, ["Business Source License 1.1"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl new file mode 100644 index 000000000..f0b0c95ba --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -0,0 +1,155 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link). + +-behaviour(emqx_external_broker). + +-export([ + register_external_broker/0, + unregister_external_broker/0, + maybe_add_route/1, + maybe_delete_route/1, + forward/2, + should_route_to_external_dests/1 +]). + +%% emqx hooks +-export([ + put_hook/0, + delete_hook/0, + on_message_publish/1 +]). + +-include("emqx_cluster_link.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("emqx/include/logger.hrl"). 
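An illustrative note (not part of the patch) on how the prefixes from emqx_cluster_link.hrl appear on the wire, assuming a source cluster named <<"a">>; the cluster name embedded in each topic appears to be the publishing cluster's name.

%% Route replication ops from cluster "a" arrive on:  <<"$LINK/cluster/route/a">>
%% Forwarded messages from cluster "a" arrive on:     <<"$LINK/cluster/msg/a">>
%% Link control (init/ack/unlink) is exchanged on:    <<"$LINK/cluster/ctrl/a">>
%% Replicated routes are stored with ?DEST(<<"a">>) =:= {external, {link, <<"a">>}}.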
+
+%%--------------------------------------------------------------------
+%% emqx_external_broker API
+%%--------------------------------------------------------------------
+
+register_external_broker() ->
+    emqx_external_broker:register_provider(?MODULE).
+
+unregister_external_broker() ->
+    emqx_external_broker:unregister_provider(?MODULE).
+
+maybe_add_route(Topic) ->
+    emqx_cluster_link_coordinator:route_op(<<"add">>, Topic).
+
+maybe_delete_route(_Topic) ->
+    %% Not implemented yet
+    %% emqx_cluster_link_coordinator:route_op(<<"delete">>, Topic).
+    ok.
+
+forward(ExternalDest, Delivery) ->
+    emqx_cluster_link_mqtt:forward(ExternalDest, Delivery).
+
+%% Do not forward any external messages to other links.
+%% Only forward locally originated messages to all the relevant links, i.e. no gossip message forwarding.
+should_route_to_external_dests(#message{extra = #{link_origin := _}}) ->
+    false;
+should_route_to_external_dests(_Msg) ->
+    true.
+
+%%--------------------------------------------------------------------
+%% EMQX Hooks
+%%--------------------------------------------------------------------
+
+on_message_publish(#message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
+    _ =
+        case emqx_cluster_link_mqtt:decode_route_op(Payload) of
+            {add, Topics} when is_list(Topics) ->
+                add_routes(Topics, ClusterName);
+            {add, Topic} ->
+                emqx_router_syncer:push(add, Topic, ?DEST(ClusterName), #{});
+            {delete, _} ->
+                %% Not implemented yet
+                ok;
+            cleanup_routes ->
+                cleanup_routes(ClusterName)
+        end,
+    {stop, []};
+on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
+    case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of
+        #message{} = ForwardedMsg ->
+            {stop, with_sender_name(ForwardedMsg, ClusterName)};
+        _Err ->
+            %% Just ignore it. It must be already logged by the decoder
+            {stop, []}
+    end;
+on_message_publish(
+    #message{topic = <<?CTRL_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload} = Msg
+) ->
+    case emqx_cluster_link_mqtt:decode_ctrl_msg(Payload, ClusterName) of
+        {init_link, InitRes} ->
+            on_init(InitRes, ClusterName, Msg);
+        {ack_link, Res} ->
+            on_init_ack(Res, ClusterName, Msg);
+        unlink ->
+            %% Stop pushing messages to the cluster that requested unlink,
+            %% It brings the link to a half-closed (unidirectional) state,
+            %% as this cluster may still replicate routes and receive messages from ClusterName.
+            emqx_cluster_link_mqtt:stop_msg_fwd_resource(ClusterName),
+            cleanup_routes(ClusterName)
+    end,
+    {stop, []};
+on_message_publish(_Msg) ->
+    ok.
+
+put_hook() ->
+    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_SYS_MSGS).
+
+delete_hook() ->
+    emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+cleanup_routes(ClusterName) ->
+    emqx_router:cleanup_routes(?DEST(ClusterName)).
+
+lookup_link_conf(ClusterName) ->
+    lists:search(
+        fun(#{upstream := N}) -> N =:= ClusterName end,
+        emqx:get_config([cluster, links], [])
+    ).
+ +on_init(Res, ClusterName, Msg) -> + #{ + 'Correlation-Data' := ReqId, + 'Response-Topic' := RespTopic + } = emqx_message:get_header(properties, Msg), + case lookup_link_conf(ClusterName) of + {value, LinkConf} -> + _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + emqx_cluster_link_mqtt:ack_link(ClusterName, Res, RespTopic, ReqId); + false -> + ?SLOG(error, #{ + msg => "init_link_request_from_unknown_cluster", + link_name => ClusterName + }), + %% Cannot ack/reply since we don't know how to reach the link cluster, + %% The cluster that tried to initiatw this link is expected to eventually fail with timeout. + ok + end. + +on_init_ack(Res, ClusterName, Msg) -> + #{'Correlation-Data' := ReqId} = emqx_message:get_header(properties, Msg), + emqx_cluster_link_coordinator:on_link_ack(ClusterName, ReqId, Res). + +add_routes(Topics, ClusterName) -> + lists:foreach( + fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end, + Topics + ). + +%% let it crash if extra is not a map, +%% we don't expect the message to be forwarded from an older EMQX release, +%% that doesn't set extra = #{} by default. +with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) -> + Msg#message{extra = Extra#{link_origin => ClusterName}}. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl new file mode 100644 index 000000000..68dc07f48 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -0,0 +1,61 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_app). + +-behaviour(application). + +-export([start/2, prep_stop/1, stop/1]). + +-define(BROKER_MOD, emqx_cluster_link). + +start(_StartType, _StartArgs) -> + emqx_cluster_link_config:add_handler(), + LinksConf = enabled_links(), + _ = + case LinksConf of + [_ | _] -> + ok = emqx_cluster_link:register_external_broker(), + ok = emqx_cluster_link:put_hook(), + ok = start_msg_fwd_resources(LinksConf); + _ -> + ok + end, + emqx_cluster_link_sup:start_link(LinksConf). + +prep_stop(State) -> + emqx_cluster_link_config:remove_handler(), + State. + +stop(_State) -> + _ = emqx_cluster_link:delete_hook(), + _ = emqx_cluster_link:unregister_external_broker(), + _ = stop_msg_fwd_resources(emqx:get_config([cluster, links], [])), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +enabled_links() -> + lists:filter( + fun(#{enable := IsEnabled}) -> IsEnabled =:= true end, + emqx:get_config([cluster, links], []) + ). + +start_msg_fwd_resources(LinksConf) -> + lists:foreach( + fun(LinkConf) -> + {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf) + end, + LinksConf + ). + +stop_msg_fwd_resources(LinksConf) -> + lists:foreach( + fun(#{upstream := Name}) -> + emqx_cluster_link_mqtt:stop_msg_fwd_resource(Name) + end, + LinksConf + ). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl new file mode 100644 index 000000000..ade3a8c97 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -0,0 +1,162 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. 
All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_config). + +-behaviour(emqx_config_handler). + +-include_lib("emqx/include/logger.hrl"). + +-define(LINKS_PATH, [cluster, links]). +-define(CERTS_PATH(LinkName), filename:join(["cluster", "links", LinkName])). + +-export([ + add_handler/0, + remove_handler/0 +]). + +-export([ + pre_config_update/3, + post_config_update/5 +]). + +add_handler() -> + ok = emqx_config_handler:add_handler(?LINKS_PATH, ?MODULE). + +remove_handler() -> + ok = emqx_config_handler:remove_handler(?LINKS_PATH). + +pre_config_update(?LINKS_PATH, RawConf, RawConf) -> + {ok, RawConf}; +pre_config_update(?LINKS_PATH, NewRawConf, _RawConf) -> + {ok, convert_certs(NewRawConf)}. + +post_config_update(?LINKS_PATH, _Req, Old, Old, _AppEnvs) -> + ok; +post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) -> + ok = maybe_toggle_hook_and_provider(New), + #{ + removed := Removed, + added := Added, + changed := Changed + } = emqx_utils:diff_lists(New, Old, fun upstream_name/1), + RemovedRes = remove_links(Removed), + AddedRes = add_links(Added), + UpdatedRes = update_links(Changed), + IsAllOk = all_ok(RemovedRes) andalso all_ok(AddedRes) andalso all_ok(UpdatedRes), + case IsAllOk of + true -> + ok; + false -> + {error, #{added => AddedRes, removed => RemovedRes, updated => UpdatedRes}} + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +maybe_toggle_hook_and_provider(LinksConf) -> + case is_any_enabled(LinksConf) of + true -> + ok = emqx_cluster_link:register_external_broker(), + ok = emqx_cluster_link:put_hook(); + false -> + _ = emqx_cluster_link:delete_hook(), + _ = emqx_cluster_link:unregister_external_broker(), + ok + end. + +is_any_enabled(LinksConf) -> + lists:any( + fun(#{enable := IsEnabled}) -> IsEnabled =:= true end, + LinksConf + ). + +all_ok(Results) -> + lists:all( + fun + (ok) -> true; + ({ok, _}) -> true; + (_) -> false + end, + Results + ). + +add_links(LinksConf) -> + [add_link(Link) || Link <- LinksConf]. + +add_link(#{enabled := true} = LinkConf) -> + %% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand... + MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + CoordRes = ensure_coordinator(LinkConf), + combine_results(CoordRes, MsgFwdRes); +add_link(_DisabledLinkConf) -> + ok. + +remove_links(LinksConf) -> + [remove_link(Link) || Link <- LinksConf]. + +remove_link(LinkConf) -> + emqx_cluster_link_coord_sup:stop_coordinator(LinkConf). + +update_links(LinksConf) -> + [update_link(Link) || Link <- LinksConf]. + +%% TODO: do some updates without restart (at least without coordinator restart and re-election) +update_link(#{enabled := true} = LinkConf) -> + _ = remove_link(LinkConf), + add_link(LinkConf); +update_link(#{enabled := false} = LinkConf) -> + case remove_link(LinkConf) of + {error, not_found} -> ok; + Other -> Other + end. + +ensure_coordinator(LinkConf) -> + case emqx_cluster_link_coord_sup:start_coordinator(LinkConf) of + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, already_present} -> + emqx_cluster_link_coord_sup:restart_coordinator(LinkConf) + end. + +combine_results(ok, ok) -> + ok; +combine_results(CoordRes, MsgFwdRes) -> + {error, #{coordinator => CoordRes, msg_fwd_resource => MsgFwdRes}}. 
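For orientation, an illustrative sketch (not part of the patch) of one `cluster.links` entry as the code in this application reads it via emqx:get_config([cluster, links]); the values are hypothetical and the actual schema is defined elsewhere.

%% #{
%%     %% name of the linked (upstream) cluster
%%     upstream => <<"emqxcl_b">>,
%%     enable => true,
%%     %% topic filters whose routes and messages are replicated over the link
%%     topics => [<<"t/global/#">>],
%%     %% connection pool size (routing and message-forwarding clients)
%%     pool_size => 8,
%%     %% TLS options, converted by convert_certs/1 below
%%     ssl => #{enable => false}
%% }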
+ +upstream_name(#{upstream := N}) -> N; +upstream_name(#{<<"upstream">> := N}) -> N. + +convert_certs(LinksConf) -> + lists:map( + fun + (#{ssl := SSLOpts} = LinkConf) -> + LinkConf#{ssl => do_convert_certs(upstream_name(LinkConf), SSLOpts)}; + (#{<<"ssl">> := SSLOpts} = LinkConf) -> + LinkConf#{<<"ssl">> => do_convert_certs(upstream_name(LinkConf), SSLOpts)}; + (LinkConf) -> + LinkConf + end, + LinksConf + ). + +do_convert_certs(LinkName, SSLOpts) -> + case emqx_tls_lib:ensure_ssl_files(?CERTS_PATH(LinkName), SSLOpts) of + {ok, undefined} -> + SSLOpts; + {ok, SSLOpts1} -> + SSLOpts1; + {error, Reason} -> + ?SLOG( + error, + #{ + msg => "bad_ssl_config", + config_path => ?LINKS_PATH, + name => LinkName, + reason => Reason + } + ), + throw({bad_ssl_config, Reason}) + end. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl new file mode 100644 index 000000000..78fa030f2 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_coord_sup). + +-behaviour(supervisor). + +-export([start_link/1]). +-export([init/1]). + +-export([ + start_coordinator/1, + restart_coordinator/1, + stop_coordinator/1 +]). + +-define(SERVER, ?MODULE). +-define(COORDINATOR_MOD, emqx_cluster_link_coordinator). + +start_link(LinksConf) -> + supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf). + +init(LinksConf) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 5 + }, + {ok, {SupFlags, children(LinksConf)}}. + +start_coordinator(#{upstream := Name} = LinkConf) -> + supervisor:start_child(?SERVER, worker_spec(Name, LinkConf)). + +restart_coordinator(#{upstream := Name} = _LinkConf) -> + supervisor:restart_child(?SERVER, Name). + +stop_coordinator(#{upstream := Name} = _LinkConf) -> + case supervisor:terminate_child(?SERVER, Name) of + ok -> + supervisor:delete_child(?SERVER, Name); + Err -> + Err + end. + +worker_spec(Id, LinkConf) -> + #{ + id => Id, + start => {?COORDINATOR_MOD, start_link, [LinkConf]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [?COORDINATOR_MOD] + }. + +children(LinksConf) -> + [worker_spec(Name, Conf) || #{upstream := Name, enable := true} = Conf <- LinksConf]. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl new file mode 100644 index 000000000..4b8b9be8f --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl @@ -0,0 +1,454 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% @doc experimental prototype implementation. +%% The idea is to add a sync point for all cluster route operations, +%% so that, routes can be batched/shrunk (via using emqx_route_syncer) before pushing them to linked clusters. +%% The expected result is reduced communication between linked clusters: +%% each nodes communicates with other clusters through coordinator. 
+%% The drawbacks are numerous though: +%% - complexity/leader elections, +%% - routes removal seems hard to implement unless remote cluster routes as stored per node, +%% in that case global coordinator per cluster is not needed any more. - TBD +-module(emqx_cluster_link_coordinator). + +-behaviour(gen_statem). + +%% API +-export([ + route_op/2, + on_link_ack/3 +]). + +-export([start_link/1]). + +%% gen_statem +-export([ + callback_mode/0, + init/1, + terminate/3 +]). + +%% gen_statem state functions +-export([ + wait_for_coordinator/3, + connecting/3, + init_linking/3, + bootstrapping/3, + coordinating/3, + following/3 +]). + +-export([select_routes/1]). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-define(COORDINATOR(UpstreamName), {?MODULE, UpstreamName}). +-define(SERVER, ?MODULE). +-define(WAIT_COORD_RETRY_INTERVAL, 100). +-define(CONN_RETRY_INTERVAL, 5000). +-define(INIT_LINK_RESP_TIMEOUT, 15_000). +-define(INIT_LINK_RETRIES, 5). +-define(UPSTREAM_DEST, {external, {link, _}}). +-define(IS_ROUTE_OP(Op), Op =:= <<"add">>; Op =:= <<"delete">>). + +start_link(Conf) -> + gen_statem:start_link(?MODULE, Conf, []). + +route_op(Op, Topic) -> + lists:foreach( + fun(#{upstream := UpstreamName, topics := LinkFilters}) -> + case topic_intersect_any(Topic, LinkFilters) of + false -> ok; + TopicOrFilter -> maybe_cast(UpstreamName, {Op, TopicOrFilter}) + end + end, + emqx:get_config([cluster, links]) + ). + +on_link_ack(ClusterName, ReqId, Res) -> + maybe_cast(ClusterName, {ack_link, ClusterName, ReqId, Res}). + +callback_mode() -> + [state_functions, state_enter]. + +init(LinkConf) -> + process_flag(trap_exit, true), + %% It helps to avoid unnecessary global name conflicts (and, as a result, coordinator re-election), + %% e.g. when a down nodes comes back + %% TODO: need to better understand `global` behaviour + _ = global:sync(), + Data = #{is_coordinator => false, link_conf => LinkConf}, + {ok, wait_for_coordinator, Data}. + +wait_for_coordinator(enter, _OldState, _Data) -> + {keep_state_and_data, [{state_timeout, 0, do_wait_for_coordinator}]}; +wait_for_coordinator(_, do_wait_for_coordinator, Data) -> + #{link_conf := #{upstream := Name}} = Data, + case global:whereis_name(?COORDINATOR(Name)) of + undefined -> + case register_coordinator(Name) of + yes -> + {next_state, connecting, Data#{is_coordinator => true}}; + no -> + %% TODO: this should not happen forever, if it does, we need to detect it + {keep_state_and_data, [ + {state_timeout, ?WAIT_COORD_RETRY_INTERVAL, do_wait_for_coordinator} + ]} + end; + %% Can be a prev stale pid? + %% Let it crash with case_clause if it happens... + Pid when is_pid(Pid) andalso Pid =/= self() -> + Data1 = Data#{coordinator_mon => erlang:monitor(process, Pid), coordinator_pid => Pid}, + {next_state, following, Data1} + end; +wait_for_coordinator(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> + %% Ignore any route op, until bootstrapping is started. + %% All ignored route ops are expected to be caught up during the bootstrap. + keep_state_and_data; +wait_for_coordinator(EventType, Event, Data) -> + handle_event_(?FUNCTION_NAME, EventType, Event, Data). + +connecting(enter, _OldState, _Data) -> + {keep_state_and_data, [{state_timeout, 0, reconnect}]}; +connecting(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> + %% Ignore any route op, until bootstrapping is started. + %% All ignored route ops are expected to be caught up during the bootstrap. 
+ keep_state_and_data; +connecting(_EventType, reconnect, Data) -> + ensure_conn_pool(init_linking, Data); +connecting(EventType, Event, Data) -> + handle_event_(?FUNCTION_NAME, EventType, Event, Data). + +init_linking(enter, _OldState, Data) -> + {keep_state, Data#{link_retries => ?INIT_LINK_RETRIES}, [{state_timeout, 0, init_link}]}; +init_linking(cast, {ack_link, _ClusterName, ReqId, Res}, #{link_req_id := ReqId} = Data) -> + case Res of + %% This state machine is not suitable to bootstrap the upstream cluster conditionally, + %% since it ignores any route ops received before bootstrapping... + {ok, #{proto_ver := _, need_bootstrap := _}} -> + {next_state, bootstrapping, maps:without([link_req_id, link_retries], Data)}; + {error, <<"bad_upstream_name">>} -> + %% unrecoverable error that needs a user intervention, + %% TODO: maybe need to transition to some error state + {keep_state, maps:without([link_req_id, link_retries], Data), [{state_timeout, cancel}]} + end; +init_linking(_, init_link, #{link_conf := #{upstream := Name}, link_retries := Retries} = Data) -> + case Retries > 0 of + true -> + {ReqId, {ok, _}} = emqx_cluster_link_mqtt:init_link(Name), + Data1 = Data#{link_req_id => ReqId, link_retries => Retries - 1}, + {keep_state, Data1, [{state_timeout, ?INIT_LINK_RESP_TIMEOUT, init_link}]}; + false -> + ?SLOG(error, #{ + msg => "no_link_ack_response_received", + link_name => Name + }), + %% unrecoverable error that needs a user intervention, + %% TODO: maybe need to transition to some error state + keep_state_and_data + end; +init_linking(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> + %% Ignore any route op, until bootstrapping is started. + %% All ignored route ops are expected to be caught up during the bootstrap. + keep_state_and_data; +init_linking(EventType, Event, Data) -> + handle_event_(?FUNCTION_NAME, EventType, Event, Data). + +bootstrapping(enter, _OldState, #{link_conf := LinkConf} = Data) -> + #{topics := LinkFilters, upstream := ClusterName} = LinkConf, + %% TODO add timeout? + {Pid, Ref} = erlang:spawn_monitor(fun() -> bootstrap(ClusterName, LinkFilters) end), + {keep_state, Data#{bootstrap_pid => Pid, bootstrap_ref => Ref}}; +bootstrapping(info, {'DOWN', Ref, process, _Pid, Reason}, #{bootstrap_ref := Ref} = Data) -> + %% TODO: think about the best way to proceed if bootstrapping failed, + %% perhaps just transition back to connecting state? + normal = Reason, + Data1 = maps:without([bootstrap_ref, bootstrap_pid], Data), + {next_state, coordinating, Data1}; +%% Accumulate new route ops, since there is no guarantee +%% they will be included in the bootstrapped data +bootstrapping(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> + {keep_state_and_data, [postpone]}; +bootstrapping(EventType, Event, Data) -> + handle_event_(?FUNCTION_NAME, EventType, Event, Data). + +coordinating(enter, _OldState, _Data) -> + keep_state_and_data; +coordinating(cast, {Op, Topic}, Data) when ?IS_ROUTE_OP(Op) -> + #{link_conf := #{upstream := ClusterName}} = Data, + %% TODO: batching + case emqx_cluster_link_mqtt:publish_route_op(async, ClusterName, Op, Topic) of + {error, _} -> + %% Conn pool error, reconnect. + {next_state, connecting, stop_conn_pool(Data)}; + _Ref -> + keep_state_and_data + end; +%% TODO: this can also be received in other states, move to generic handler? 
+coordinating(info, {global_name_conflict, CoordName}, Data) -> + LogData = #{ + msg => "emqx_cluster_link_coordinator_name_conflict", + coordinator_name => CoordName + }, + LogData1 = + %% TODO: this can be a previous (self) coordinator? + case global:whereis_name(CoordName) of + undefined -> LogData; + Pid -> LogData#{new_coordinator => Pid, coordinator_node => node(Pid)} + end, + ?SLOG(warning, LogData1), + Data1 = stop_conn_pool(Data), + {next_state, wait_for_coordinator, Data1#{is_coordinator => false}}; +%% only errors results are expected +%% TODO: a single error causes reconnection and re-bootstrapping, +%% it's worth considering some optimizations. +coordinating(info, {pub_result, _Ref, {error, Reason}}, #{link_conf := #{upstream := Name}} = Data) -> + ?SLOG(error, #{ + msg => "failed_to_replicate_route_op_to_linked_cluster", + link_name => Name, + reason => Reason + }), + %% TODO: check errors, some may be not possible to correct by re-connecting + Data1 = stop_conn_pool(Data), + {next_state, connecting, Data1}; +coordinating(EventType, Event, Data) -> + handle_event_(?FUNCTION_NAME, EventType, Event, Data). + +following(enter, _OldState, _Data) -> + keep_state_and_data; +following(info, {'DOWN', MRef, process, _Pid, _Info}, #{coordinator_mon := MRef} = Data) -> + {next_state, wait_for_coordinator, maps:without([coordinator_mon, coordinator_pid], Data)}; +following(EventType, Event, Data) -> + handle_event_(?FUNCTION_NAME, EventType, Event, Data). + +handle_event_(_State, info, {'DOWN', Ref, process, _Pid, Reason}, Data) -> + case Data of + #{conn_pool_mons := #{Ref := WorkerName}, is_coordinator := true} -> + ?SLOG(warning, #{ + msg => "cluster_link_route_connection_is_down", + reason => Reason, + worker => WorkerName + }), + {next_state, connecting, stop_conn_pool(Data)}; + _ -> + %% Must be a stale 'DOWN' msg (e.g., from the next worker) which is already handled. + keep_state_and_data + end; +handle_event_(State, EventType, Event, Data) -> + ?SLOG(warning, #{ + msg => "unexpected_event", + event => Event, + event_type => EventType, + state => State, + data => Data + }), + keep_state_and_data. + +terminate(Reason, _State, #{link_conf := #{upstream := ClusterName}} = Data) -> + %% TODO unregister coordinator? + IsCoordinator = maps:get(is_coordinator, Data, false), + case Reason of + shutdown when IsCoordinator -> + %% must be sync, since we are going to stop the pool + %% NOTE: there is no guarantee that unlink op will arrive the last one + %% (since there may be other route op sent over another pool worker) + %% and clear everything, but it must be good enough to GC most of the routes. + _ = emqx_cluster_link_mqtt:remove_link(ClusterName); + _ -> + ok + end, + _ = stop_conn_pool(Data), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +topic_intersect_any(Topic, [LinkFilter | T]) -> + case emqx_topic:intersection(Topic, LinkFilter) of + false -> topic_intersect_any(Topic, T); + TopicOrFilter -> TopicOrFilter + end; +topic_intersect_any(_Topic, []) -> + false. + +bootstrap(ClusterName, LinkFilters) -> + %% TODO: do this in chunks + Topics = select_routes(LinkFilters), + {ok, _} = emqx_cluster_link_mqtt:publish_routes(sync, ClusterName, Topics). + +%% TODO: if a local route matches link filter exactly, +%% it's enough to only select this matching filter itself and skip any other routes? 
+%% E.g., local routes: "t/global/#", "t/global/1/+", clsuter link topics = ["t/global/#"], +%% it's enough to replicate "t/global/#" only to the linked cluster. +%% What to do when "t/global/#" subscriber unsubscribers +%% and we start to get forwarded messages (e.g. "t/global/2/3") matching no subscribers? +%% How can we efficiently replace "t/global/#" route with "t/global/1/+" +%% (intersection of "t/global/#" and "t/global/#")? +%% So maybe better not to do it at all and replicate both "t/global/1/+" and "t/global/#" ? +select_routes(LinkFilters) -> + {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters), + Routes = select_routes_by_topics(Topics), + Routes1 = intersecting_routes(Wildcards), + AllRoutes = Routes ++ Routes1, + case emqx_router:get_schema_vsn() of + v1 -> AllRoutes; + %% v2 stores filters (Wildcard subscriptions routes) in a separate index, + %% so WildcardRoutes contains only non-wildcard routes matching wildcard link filters. + %% Thus, we need to select wildcard routes additionally + v2 -> intersecting_routes_v2(Wildcards) ++ AllRoutes + end. + +select_routes_by_topics([]) -> + []; +select_routes_by_topics([Topic | T]) -> + case filter_out_upstream_routes(emqx_router:match_routes(Topic)) of + [_ | _] -> + %% These are non-wildcard link topics, so we don't care about actual + %% routes as long as they are matched, and just need to replicate + %% topic routes to the linked cluster + [Topic | select_routes_by_topics(T)]; + _ -> + select_routes_by_topics(T) + end. + +filter_out_upstream_routes(Routes) -> + lists:filter( + fun + (#route{dest = ?UPSTREAM_DEST}) -> false; + (_) -> true + end, + Routes + ). + +%% selects only non-wildcard routes that match wildcards (filters), +%% can only be done as a linear search over all routes +intersecting_routes([]) -> + []; +intersecting_routes(Wildcards) -> + Res = ets:foldl( + fun + (#route{dest = ?UPSTREAM_DEST}, Acc) -> + Acc; + (#route{topic = T}, Acc) -> + %% TODO: probably nice to validate cluster link topic filters + %% to have no intersections between each other? + case topic_intersect_any(T, Wildcards) of + false -> Acc; + Intersection -> Acc#{Intersection => undefined} + end + end, + #{}, + ?ROUTE_TAB + ), + maps:keys(Res). + +intersecting_routes_v2([]) -> + []; +intersecting_routes_v2(Wildcards) -> + lists:foldl( + fun(Wildcard, Acc) -> + MatchedFilters = matched_filters_v2(Wildcard), + all_intersections(Wildcard, MatchedFilters, Acc) + end, + [], + Wildcards + ). + +matched_filters_v2(Wildcard) -> + MatchesAcc = lists:foldl( + fun(M, Acc) -> + case emqx_topic_index:get_id(M) of + ?UPSTREAM_DEST -> + Acc; + _ -> + Acc#{emqx_topic_index:get_topic(M) => undefined} + end + end, + #{}, + emqx_topic_index:matches_filter(Wildcard, ?ROUTE_TAB_FILTERS, []) + ), + maps:keys(MatchesAcc). + +all_intersections(Wildcard, [W | Wildcards], Acc) -> + case emqx_topic:intersection(Wildcard, W) of + false -> all_intersections(Wildcard, Wildcards, Acc); + Intersection -> all_intersections(Wildcard, Wildcards, [Intersection | Acc]) + end; +all_intersections(_, [], Acc) -> + lists:usort(Acc). + +maybe_cast(UpstreamName, Msg) -> + case global:whereis_name(?COORDINATOR(UpstreamName)) of + Pid when is_pid(Pid) -> + gen_statem:cast(Pid, Msg); + undefined -> + %% Ignore and rely on coordinator bootstrapping once it's elected + ok + end. 
+
+register_coordinator(UpstreamName) ->
+    case mria_config:role() of
+        core ->
+            global:register_name(
+                ?COORDINATOR(UpstreamName), self(), fun global:random_notify_name/3
+            );
+        _ ->
+            no
+    end.
+
+%% connecting state helper
+ensure_conn_pool(NextState, #{link_conf := LinkConf} = Data) ->
+    Res = start_conn_pool(LinkConf),
+    Data1 = Data#{conn_pool => Res},
+    case Res of
+        {ok, _} ->
+            Data2 = Data1#{conn_pool_mons => mon_pool_workers(LinkConf)},
+            {next_state, NextState, Data2};
+        _Err ->
+            {keep_state, Data1, [{state_timeout, ?CONN_RETRY_INTERVAL, reconnect}]}
+    end.
+
+start_conn_pool(LinkConf) ->
+    case emqx_cluster_link_mqtt:start_routing_pool(LinkConf) of
+        {ok, _Pid} = Ok ->
+            Ok;
+        {error, Reason} = Err ->
+            #{upstream := Name} = LinkConf,
+            ?SLOG(error, #{
+                msg => "failed_to_connect_to_linked_cluster",
+                cluster_name => Name,
+                reason => Reason
+            }),
+            Err
+    end.
+
+stop_conn_pool(#{link_conf := #{upstream := Name}} = Data) ->
+    case Data of
+        #{conn_pool := {ok, _}} ->
+            Data1 = maybe_unmointor_workers(Data),
+            Data1#{conn_pool => {stopped, emqx_cluster_link_mqtt:stop_routing_pool(Name)}};
+        _ ->
+            Data
+    end.
+
+maybe_unmointor_workers(#{conn_pool_mons := MonitorsMap} = Data) ->
+    _ = maps:foreach(
+        fun(Mref, _Name) ->
+            erlang:demonitor(Mref)
+        end,
+        MonitorsMap
+    ),
+    maps:remove(conn_pool_mons, Data);
+maybe_unmointor_workers(Data) ->
+    Data.
+
+mon_pool_workers(LinkConf) ->
+    maps:from_list([
+        {erlang:monitor(process, Pid), Name}
+     || {Name, Pid} <- emqx_cluster_link_mqtt:routing_pool_workers(LinkConf)
+    ]).
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl
new file mode 100644
index 000000000..1e9310aca
--- /dev/null
+++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl
@@ -0,0 +1,547 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_mqtt).
+
+-include("emqx_cluster_link.hrl").
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%-include_lib("emqtt/include/emqtt.hrl").
+
+-behaviour(emqx_resource).
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_query_async/4,
+    on_get_status/2
+]).
+
+-export([
+    ensure_msg_fwd_resource/1,
+    stop_msg_fwd_resource/1,
+    start_routing_pool/1,
+    stop_routing_pool/1,
+    routing_pool_workers/1,
+    init_link/1,
+    ack_link/4,
+    remove_link/1,
+    publish_route_op/4,
+    publish_routes/3,
+    cleanup_routes/1,
+    decode_ctrl_msg/2,
+    decode_route_op/1,
+    decode_forwarded_msg/1
+]).
+
+-export([
+    forward/2
+]).
+
+-define(ROUTE_CLIENTID_SUFFIX, ":route:").
+-define(MSG_CLIENTID_SUFFIX, ":msg:").
+-define(CLIENTID(Base, Suffix), emqx_bridge_mqtt_lib:clientid_base([Base, Suffix])).
+
+-define(MQTT_HOST_OPTS, #{default_port => 1883}).
+-define(MY_CLUSTER_NAME, atom_to_binary(emqx_config:get([cluster, name]))).
+
+-define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
+-define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
+-define(CTRL_TOPIC(ClusterName), <<?CTRL_TOPIC_PREFIX, (ClusterName)/binary>>).
+
+%% ecpool and emqx_resource names
+-define(ROUTE_POOL_PREFIX, "emqx_cluster_link_mqtt:route:").
+-define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:").
+-define(RES_NAME(Prefix, ClusterName), <<Prefix, ClusterName/binary>>).
+-define(ROUTE_POOL_NAME(ClusterName), ?RES_NAME(?ROUTE_POOL_PREFIX, ClusterName)).
+-define(MSG_RES_ID(ClusterName), ?RES_NAME(?MSG_POOL_PREFIX, ClusterName)).
+-define(HEALTH_CHECK_TIMEOUT, 1000).
+-define(RES_GROUP, <<"emqx_cluster_link">>).
+-define(DEFAULT_POOL_KEY, <<"default">>).
+
+%% Protocol
+-define(PROTO_VER, <<"1.0">>).
+-define(INIT_LINK_OP, <<"init_link">>).
+-define(ACK_LINK_OP, <<"ack_link">>).
+-define(UNLINK_OP, <<"unlink">>).
+-define(BATCH_ROUTES_OP, <<"add_routes">>).
+-define(CLEANUP_ROUTES_OP, <<"cleanup_routes">>).
+%% It's worth optimizing non-batch op payload size,
+%% thus it's encoded as a plain binary
+-define(TOPIC_WITH_OP(Op, Topic), <<Op/binary, "_", Topic/binary>>).
+-define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])).
+-define(ENCODE(Payload), erlang:term_to_binary(Payload)).
+
+-define(PUB_TIMEOUT, 10_000).
+
+ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) ->
+    ResConf = #{
+        query_mode => async,
+        start_after_created => true,
+        start_timeout => 5000,
+        health_check_interval => 5000,
+        %% TODO: configure res_buf_worker pool separately?
+        worker_pool_size => PoolSize
+    },
+    emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResConf).
+
+stop_msg_fwd_resource(ClusterName) ->
+    emqx_resource:stop(?MSG_RES_ID(ClusterName)).
+
+%%--------------------------------------------------------------------
+%% emqx_resource callbacks (message forwarding)
+%%--------------------------------------------------------------------
+
+callback_mode() -> async_if_possible.
+
+on_start(ResourceId, #{pool_size := PoolSize} = ClusterConf) ->
+    PoolName = ResourceId,
+    Options = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {pool_type, hash},
+        {client_opts, emqtt_client_opts(?MSG_CLIENTID_SUFFIX, ClusterConf)}
+    ],
+    ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
+    case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
+        ok ->
+            {ok, #{pool_name => PoolName, topic => ?MSG_FWD_TOPIC}};
+        {error, {start_pool_failed, _, Reason}} ->
+            {error, Reason}
+    end.
+
+on_stop(ResourceId, _State) ->
+    #{pool_name := PoolName} = emqx_resource:get_allocated_resources(ResourceId),
+    emqx_resource_pool:stop(PoolName).
+
+on_query(_ResourceId, FwdMsg, #{pool_name := PoolName, topic := LinkTopic} = _State) when
+    is_record(FwdMsg, message)
+->
+    #message{topic = Topic, qos = QoS} = FwdMsg,
+    handle_send_result(
+        ecpool:pick_and_do(
+            {PoolName, Topic},
+            fun(ConnPid) ->
+                emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS)
+            end,
+            no_handover
+        )
+    );
+on_query(_ResourceId, {Topic, Props, Payload, QoS}, #{pool_name := PoolName} = _State) ->
+    handle_send_result(
+        ecpool:pick_and_do(
+            {PoolName, Topic},
+            fun(ConnPid) ->
+                emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}])
+            end,
+            no_handover
+        )
+    ).
+
+on_query_async(
+    _ResourceId, FwdMsg, CallbackIn, #{pool_name := PoolName, topic := LinkTopic} = _State
+) ->
+    Callback = {fun on_async_result/2, [CallbackIn]},
+    #message{topic = Topic, qos = QoS} = FwdMsg,
+    %% TODO check message ordering, pick by topic,client pair?
+    ecpool:pick_and_do(
+        {PoolName, Topic},
+        fun(ConnPid) ->
+            %% #delivery{} record has no valuable data for a remote link...
+            Payload = ?ENCODE(FwdMsg),
+            %% TODO: check override QOS requirements (if any)
+            emqtt:publish_async(ConnPid, LinkTopic, Payload, QoS, Callback)
+        end,
+        no_handover
+    ).
+ +%% copied from emqx_bridge_mqtt_connector + +on_async_result(Callback, Result) -> + apply_callback_function(Callback, handle_send_result(Result)). + +apply_callback_function(F, Result) when is_function(F) -> + erlang:apply(F, [Result]); +apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> + erlang:apply(F, A ++ [Result]); +apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> + erlang:apply(M, F, A ++ [Result]). + +handle_send_result(ok) -> + ok; +handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) -> + ok; +handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> + ok; +handle_send_result({ok, Reply}) -> + {error, classify_reply(Reply)}; +handle_send_result({error, Reason}) -> + {error, classify_error(Reason)}. + +classify_reply(Reply = #{reason_code := _}) -> + {unrecoverable_error, Reply}. + +classify_error(disconnected = Reason) -> + {recoverable_error, Reason}; +classify_error(ecpool_empty) -> + {recoverable_error, disconnected}; +classify_error({disconnected, _RC, _} = Reason) -> + {recoverable_error, Reason}; +classify_error({shutdown, _} = Reason) -> + {recoverable_error, Reason}; +classify_error(shutdown = Reason) -> + {recoverable_error, Reason}; +classify_error(Reason) -> + {unrecoverable_error, Reason}. + +%% copied from emqx_bridge_mqtt_connector +on_get_status(_ResourceId, #{pool_name := PoolName} = _State) -> + Workers = [Worker || {_Name, Worker} <- ecpool:workers(PoolName)], + try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of + Statuses -> + combine_status(Statuses) + catch + exit:timeout -> + connecting + end. + +get_status(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Client} -> status(Client); + {error, _} -> disconnected + end. + +status(Pid) -> + try + case proplists:get_value(socket, emqtt:info(Pid)) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. + +combine_status(Statuses) -> + %% NOTE + %% Natural order of statuses: [connected, connecting, disconnected] + %% * `disconnected` wins over any other status + %% * `connecting` wins over `connected` + case lists:reverse(lists:usort(Statuses)) of + [Status | _] -> + Status; + [] -> + disconnected + end. + +%%-------------------------------------------------------------------- +%% ecpool +%%-------------------------------------------------------------------- + +connect(Options) -> + WorkerId = proplists:get_value(ecpool_worker_id, Options), + #{clientid := ClientId} = ClientOpts = proplists:get_value(client_opts, Options), + ClientId1 = emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId), + ClientOpts1 = ClientOpts#{clientid => ClientId1}, + case emqtt:start_link(ClientOpts1) of + {ok, Pid} -> + case emqtt:connect(Pid) of + {ok, _Props} -> + {ok, Pid}; + Error -> + Error + end; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_start_failed", + config => emqx_utils:redact(ClientOpts), + reason => Reason + }), + Error + end. + +%%-------------------------------------------------------------------- +%% Routing +%%-------------------------------------------------------------------- + +routing_pool_workers(#{upstream := ClusterName} = _ClusterConf) -> + ecpool:workers(?ROUTE_POOL_NAME(ClusterName)). + +start_routing_pool(#{upstream := ClusterName} = ClusterConf) -> + start_pool(?ROUTE_POOL_NAME(ClusterName), ?ROUTE_CLIENTID_SUFFIX, ClusterConf). 
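A note on combine_status/1 above: it relies on the three status atoms sorting in order of increasing severity (connected < connecting < disconnected), so the head of the reversed usort is the worst status seen across pool workers. A few illustrative evaluations:

    %% Atoms compare lexicographically, so connected < connecting < disconnected:
    disconnected = hd(lists:reverse(lists:usort([connected, disconnected, connected]))),
    connecting = hd(lists:reverse(lists:usort([connected, connecting]))),
    connected = hd(lists:reverse(lists:usort([connected, connected]))).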
+
+stop_routing_pool(ClusterName) ->
+    ecpool:stop_sup_pool(?ROUTE_POOL_NAME(ClusterName)).
+
+init_link(ClusterName) ->
+    Payload = #{
+        <<"op">> => ?INIT_LINK_OP,
+        <<"proto_ver">> => ?PROTO_VER,
+        <<"upstream">> => ClusterName,
+        %% TODO: maybe no need to reserve this field, as the payload is a map?
+        <<"extra">> => #{}
+    },
+    ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
+    Properties = #{
+        'Response-Topic' => ?CTRL_TOPIC(ClusterName),
+        'Correlation-Data' => ReqId
+    },
+    Topic = ?CTRL_TOPIC(?MY_CLUSTER_NAME),
+    {ReqId, publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, Properties, Topic, ?QOS_1)}.
+
+ack_link(ClusterName, Result, RespTopic, ReqId) ->
+    Payload = #{
+        <<"op">> => ?ACK_LINK_OP,
+        %% The links may compare and downgrade/adjust the protocol version in the future
+        <<"proto_ver">> => ?PROTO_VER,
+        %% May be used in the future to avoid re-bootstrapping all the routes,
+        %% for example, if the connection was interrupted for a while but the cluster
+        %% stayed healthy and didn't lose any routes. In that case, retrying the missed
+        %% route updates would be sufficient. For now, it's always true for simplicity.
+        <<"need_bootstrap">> => true,
+        <<"extra">> => #{}
+    },
+    Payload1 =
+        case Result of
+            {ok, _} ->
+                Payload#{<<"result">> => <<"ok">>};
+            {error, Reason} ->
+                Payload#{<<"result">> => <<"error">>, reason => Reason}
+        end,
+    Props = #{'Correlation-Data' => ReqId},
+    Query = {RespTopic, Props, Payload1, ?QOS_1},
+    %% Use the message forwarding resource to send the response back.
+    %% TODO: maybe async query?
+    emqx_resource:query(?MSG_RES_ID(ClusterName), Query, #{
+        query_mode => simple_sync, pick_key => RespTopic
+    }).
+
+remove_link(ClusterName) ->
+    Payload = #{<<"op">> => ?UNLINK_OP},
+    Topic = ?CTRL_TOPIC(?MY_CLUSTER_NAME),
+    publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, #{}, Topic, ?QOS_0).
+
+publish_routes(QueryType, ClusterName, Topics) ->
+    %% Picks the same pool worker consistently.
+    %% However, as route writes are idempotent, the worker could also be picked randomly - TBD.
+    publish_routes(QueryType, ClusterName, ?DEFAULT_POOL_KEY, Topics).
+
+publish_routes(QueryType, ClusterName, PoolKey, Topics) ->
+    Payload = #{<<"op">> => ?BATCH_ROUTES_OP, <<"topics">> => Topics},
+    publish(QueryType, ClusterName, PoolKey, Payload).
+
+cleanup_routes(ClusterName) ->
+    Payload = #{<<"op">> => ?CLEANUP_ROUTES_OP},
+    publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, #{}, ?ROUTE_TOPIC, ?QOS_0).
+
+publish_route_op(QueryType, ClusterName, Op, Topic) when Op =:= <<"add">>; Op =:= <<"delete">> ->
+    Payload = ?TOPIC_WITH_OP(Op, Topic),
+    publish(QueryType, ClusterName, Topic, Payload).
+
+publish(QueryType, ClusterName, PoolKey, Payload) ->
+    publish(QueryType, ClusterName, PoolKey, Payload, #{}).
+
+publish(QueryType, ClusterName, PoolKey, Payload, Props) ->
+    %% Deletes are not implemented for now; writes are idempotent, so QOS_1 is fine.
+    publish(QueryType, ClusterName, PoolKey, Payload, Props, ?ROUTE_TOPIC, ?QOS_1).
+
+publish(async, ClusterName, PoolKey, Payload, Props, Topic, QoS) ->
+    ecpool:pick_and_do(
+        {?ROUTE_POOL_NAME(ClusterName), PoolKey},
+        fun(ConnPid) ->
+            Ref = erlang:make_ref(),
+            Cb = {fun publish_result/3, [self(), Ref]},
+            emqtt:publish_async(
+                ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}], ?PUB_TIMEOUT, Cb
+            ),
+            Ref
+        end,
+        no_handover
+    );
+publish(sync, ClusterName, PoolKey, Payload, Props, Topic, QoS) ->
+    ecpool:pick_and_do(
+        {?ROUTE_POOL_NAME(ClusterName), PoolKey},
+        fun(ConnPid) ->
+            emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}])
+        end,
+        no_handover
+    ).
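The two route-update encodings used above can be checked against the decoder in the next hunk: single ops travel as a plain binary (the ?TOPIC_WITH_OP macro body is not visible here; judging by decode_route_op1/1 it presumably expands to <<Op/binary, "_", Topic/binary>>), while batches travel as a term-encoded map. A small sketch with illustrative topics:

    %% Single route op, as produced by publish_route_op/4:
    {add, <<"t/#">>} =
        emqx_cluster_link_mqtt:decode_route_op(erlang:term_to_binary(<<"add_t/#">>)),
    %% Batch route op, as produced by publish_routes/4:
    BatchPayload = #{<<"op">> => <<"add_routes">>, <<"topics">> => [<<"t/1">>, <<"t/2">>]},
    {add, [<<"t/1">>, <<"t/2">>]} =
        emqx_cluster_link_mqtt:decode_route_op(erlang:term_to_binary(BatchPayload)).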
+ +publish_result(Caller, Ref, Result) -> + case handle_send_result(Result) of + ok -> + %% avoid extra message passing, we only care about errors for now + ok; + Err -> + Caller ! {pub_result, Ref, Err} + end. + +%%-------------------------------------------------------------------- +%% Protocol +%%-------------------------------------------------------------------- + +decode_ctrl_msg(Payload, ClusterName) -> + decode_ctrl_msg1(?DECODE(Payload), ClusterName). + +decode_ctrl_msg1( + #{ + <<"op">> := ?INIT_LINK_OP, + <<"proto_ver">> := ProtoVer, + <<"upstream">> := UpstreamName + }, + ClusterName +) -> + ProtoVer1 = decode_proto_ver(ProtoVer, ClusterName), + %% UpstreamName is the name the remote linked cluster refers to this cluster, + %% so it must equal to the local cluster name, more clear naming is desired... + MyClusterName = ?MY_CLUSTER_NAME, + case UpstreamName of + MyClusterName -> + {init_link, {ok, #{proto_ver => ProtoVer1}}}; + _ -> + ?SLOG(error, #{ + msg => "misconfigured_cluster_link_name", + %% How this cluster names itself + local_name => MyClusterName, + %% How the remote cluster names itself + link_name => ClusterName, + %% How the remote cluster names this local cluster + upstream_name => UpstreamName + }), + {init_link, {error, <<"bad_upstream_name">>}} + end; +decode_ctrl_msg1( + #{ + <<"op">> := ?ACK_LINK_OP, + <<"result">> := <<"ok">>, + <<"proto_ver">> := ProtoVer, + <<"need_bootstrap">> := IsBootstrapNeeded + }, + ClusterName +) -> + ProtoVer1 = decode_proto_ver(ProtoVer, ClusterName), + {ack_link, {ok, #{proto_ver => ProtoVer1, need_bootstrap => IsBootstrapNeeded}}}; +decode_ctrl_msg1( + #{ + <<"op">> := ?ACK_LINK_OP, + <<"result">> := <<"error">>, + <<"reason">> := Reason + }, + _ClusterName +) -> + {ack_link, {error, Reason}}; +decode_ctrl_msg1(#{<<"op">> := ?UNLINK_OP}, _ClusterName) -> + unlink. + +decode_route_op(Payload) -> + decode_route_op1(?DECODE(Payload)). + +decode_route_op1(<<"add_", Topic/binary>>) -> + {add, Topic}; +decode_route_op1(<<"delete_", Topic/binary>>) -> + {delete, Topic}; +decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) -> + {add, Topics}; +decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) -> + cleanup_routes; +decode_route_op1(Payload) -> + ?SLOG(warning, #{ + msg => "unexpected_cluster_link_route_op_payload", + payload => Payload + }), + {error, Payload}. + +decode_forwarded_msg(Payload) -> + case ?DECODE(Payload) of + #message{} = Msg -> + Msg; + _ -> + ?SLOG(warning, #{ + msg => "unexpected_cluster_link_forwarded_msg_payload", + payload => Payload + }), + {error, Payload} + end. + +decode_proto_ver(ProtoVer, ClusterName) -> + {MyMajor, MyMinor} = decode_proto_ver1(?PROTO_VER), + case decode_proto_ver1(ProtoVer) of + {Major, Minor} = Res when + Major > MyMajor; + Minor > MyMinor + -> + ?SLOG(notice, #{ + msg => "different_cluster_link_protocol_versions", + protocol_version => ?PROTO_VER, + link_protocol_version => ProtoVer, + link_name => ClusterName + }), + Res; + Res -> + Res + end. + +decode_proto_ver1(ProtoVer) -> + [Major, Minor] = binary:split(ProtoVer, <<".">>), + %% Let it fail (for now), we don't expect invalid data to pass through the linking protocol.. + {emqx_utils_conv:int(Major), emqx_utils_conv:int(Minor)}. 
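Putting the handshake pieces together: when the remote side receives the init_link control message produced by init_link/1, it decodes it with decode_ctrl_msg/2 and accepts it only if the upstream name matches its own cluster name. A sketch, assuming the local cluster is named cluster_b and the peer link is named cluster_a (both names are illustrative):

    Payload = erlang:term_to_binary(#{
        <<"op">> => <<"init_link">>,
        <<"proto_ver">> => <<"1.0">>,
        <<"upstream">> => <<"cluster_b">>,
        <<"extra">> => #{}
    }),
    {init_link, {ok, #{proto_ver := {1, 0}}}} =
        emqx_cluster_link_mqtt:decode_ctrl_msg(Payload, <<"cluster_a">>).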
+ +%%-------------------------------------------------------------------- +%% emqx_external_broker +%%-------------------------------------------------------------------- + +forward({external, {link, ClusterName}}, #delivery{message = #message{topic = Topic} = Msg}) -> + QueryOpts = #{pick_key => Topic}, + emqx_resource:query(?MSG_RES_ID(ClusterName), Msg, QueryOpts). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +emqtt_client_opts( + ClientIdSuffix, #{server := Server, ssl := #{enable := EnableSsl} = Ssl} = ClusterConf +) -> + BaseClientId = maps:get(client_id, ClusterConf, ?MY_CLUSTER_NAME), + ClientId = ?CLIENTID(BaseClientId, ClientIdSuffix), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS), + Opts = #{ + host => Host, + port => Port, + clientid => ClientId, + proto_ver => v5, + ssl => EnableSsl, + ssl_opts => maps:to_list(maps:remove(enable, Ssl)) + }, + with_password(with_user(Opts, ClusterConf), ClusterConf). + +with_user(Opts, #{username := U} = _ClusterConf) -> + Opts#{username => U}; +with_user(Opts, _ClusterConf) -> + Opts. + +with_password(Opts, #{password := P} = _ClusterConf) -> + Opts#{password => emqx_secret:unwrap(P)}; +with_password(Opts, _ClusterConf) -> + Opts. + +start_pool(PoolName, ClientIdSuffix, #{pool_size := PoolSize} = ClusterConf) -> + ClientOpts = emqtt_client_opts(ClientIdSuffix, ClusterConf), + Opts = [ + {name, PoolName}, + {pool_size, PoolSize}, + {pool_type, hash}, + {client_opts, ClientOpts} + ], + ecpool:start_sup_pool(PoolName, ?MODULE, Opts). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl new file mode 100644 index 000000000..abdfff39f --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_schema). + +-behaviour(emqx_schema_hooks). + +-include_lib("hocon/include/hoconsc.hrl"). + +-export([injected_fields/0]). + +-export([ + roots/0, + fields/1, + namespace/0, + desc/1 +]). + +-define(MQTT_HOST_OPTS, #{default_port => 1883}). + +namespace() -> "cluster_linking". + +roots() -> []. + +injected_fields() -> + #{cluster => fields("cluster_linking")}. + +fields("cluster_linking") -> + [ + %% TODO: validate and ensure upstream names are unique! + {links, ?HOCON(?ARRAY(?R_REF("link")), #{default => []})} + ]; +fields("link") -> + [ + {enable, ?HOCON(boolean(), #{default => false})}, + {upstream, ?HOCON(binary(), #{required => true})}, + {server, + emqx_schema:servers_sc(#{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, + {clientid, ?HOCON(binary(), #{desc => ?DESC("clientid")})}, + {username, ?HOCON(binary(), #{desc => ?DESC("username")})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}, + {ssl, #{ + type => ?R_REF(emqx_schema, "ssl_client_opts"), + default => #{<<"enable">> => false}, + desc => ?DESC("ssl") + }}, + %% TODO: validate topics: + %% - basic topic validation + %% - non-overlapping (not intersecting) filters ? + {topics, ?HOCON(?ARRAY(binary()), #{required => true})}, + {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})} + ]. + +desc(_) -> + "todo". 
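To connect the schema to the connector code above: one entry of the `cluster.links` array, after the schema check, reaches emqx_cluster_link_mqtt as a map roughly shaped like the sketch below. All values are placeholders; in practice the ssl and password values carry whatever emqx_schema and emqx_schema_secret produce.

    %% Shape consumed by ensure_msg_fwd_resource/1, start_pool/3 and emqtt_client_opts/2:
    LinkConf = #{
        enable => true,
        upstream => <<"cluster_b">>,
        server => <<"emqx-b.example.net:1883">>,
        username => <<"linker">>,
        password => <<"secret">>,
        ssl => #{enable => false},
        topics => [<<"t/#">>],
        pool_size => 4
    }.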
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl new file mode 100644 index 000000000..c98b9f4c5 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_sup). + +-behaviour(supervisor). + +-export([start_link/1]). + +-export([init/1]). + +-define(COORD_SUP, emqx_cluster_link_coord_sup). +-define(SERVER, ?MODULE). + +start_link(LinksConf) -> + supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf). + +init(LinksConf) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 5 + }, + Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)], + {ok, {SupFlags, Children}}. + +sup_spec(Id, Mod, Conf) -> + #{ + id => Id, + start => {Mod, start_link, [Conf]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [Mod] + }. diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 786b0f685..a0b5c820f 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -74,7 +74,9 @@ (?CE_AUTHN_PROVIDER_SCHEMA_MODS ++ ?EE_AUTHN_PROVIDER_SCHEMA_MODS) ). --define(OTHER_INJECTING_CONFIGS, ?AUTH_EXT_SCHEMA_MODS). +-define(CLUSTER_LINKING_SCHEMA_MODS, [emqx_cluster_link_schema]). + +-define(OTHER_INJECTING_CONFIGS, ?AUTH_EXT_SCHEMA_MODS ++ ?CLUSTER_LINKING_SCHEMA_MODS). -else. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 2c0de10aa..b4c59d291 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -255,7 +255,7 @@ fields("cluster") -> importance => ?IMPORTANCE_HIDDEN } )} - ]; + ] ++ emqx_schema_hooks:injection_point(cluster); fields(cluster_static) -> [ {"seeds", diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index e0e62d123..d9bcf9d25 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -133,7 +133,8 @@ emqx_bridge_syskeeper, emqx_bridge_confluent, emqx_ds_shared_sub, - emqx_auth_ext + emqx_auth_ext, + emqx_cluster_link ], %% must always be of type `load' ce_business_apps =>