From ac19cf89df60577e33b6c801bc64f6283ff8b7ba Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 23 May 2024 12:47:48 +0300 Subject: [PATCH] chore(clusterlink): remove code related to the rejected coordinator-based implementation --- .../include/emqx_cluster_link.hrl | 3 - .../src/emqx_cluster_link.app.src | 1 - .../src/emqx_cluster_link.erl | 54 --- .../src/emqx_cluster_link_config.erl | 21 +- .../src/emqx_cluster_link_coord_sup.erl | 57 --- .../src/emqx_cluster_link_coordinator.erl | 454 ------------------ .../src/emqx_cluster_link_mqtt.erl | 270 +---------- 7 files changed, 15 insertions(+), 845 deletions(-) delete mode 100644 apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl delete mode 100644 apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index 8bf9dd7c2..dd2544114 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -3,12 +3,9 @@ %%-------------------------------------------------------------------- -define(TOPIC_PREFIX, "$LINK/cluster/"). --define(CTRL_TOPIC_PREFIX, ?TOPIC_PREFIX "ctrl/"). -define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/"). -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). --define(DEST(FromClusterName), {external, {link, FromClusterName}}). - %% Fairly compact text encoding. -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). -define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.app.src b/apps/emqx_cluster_link/src/emqx_cluster_link.app.src index d8da0c1ee..f7c5e102a 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.app.src +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.app.src @@ -9,7 +9,6 @@ kernel, stdlib, emqtt, - ecpool, emqx, emqx_resource ]}, diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 8d843edcc..846204066 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -100,22 +100,6 @@ on_message_publish(#message{topic = <>, p %% Just ignore it. It must be already logged by the decoder {stop, []} end; -on_message_publish( - #message{topic = <>, payload = Payload} = Msg -) -> - case emqx_cluster_link_mqtt:decode_ctrl_msg(Payload, ClusterName) of - {init_link, InitRes} -> - on_init(InitRes, ClusterName, Msg); - {ack_link, Res} -> - on_init_ack(Res, ClusterName, Msg); - unlink -> - %% Stop pushing messages to the cluster that requested unlink, - %% It brings the link to a half-closed (unidirectional) state, - %% as this cluster may still replicate routes and receive messages from ClusterName. - emqx_cluster_link_mqtt:stop_msg_fwd_resource(ClusterName), - cleanup_routes(ClusterName) - end, - {stop, []}; on_message_publish(_Msg) -> ok. @@ -166,44 +150,6 @@ update_routes(ClusterName, Actor, Incarnation, RouteOps) -> RouteOps ). -cleanup_routes(ClusterName) -> - emqx_router:cleanup_routes(?DEST(ClusterName)). - -lookup_link_conf(ClusterName) -> - lists:search( - fun(#{upstream := N}) -> N =:= ClusterName end, - emqx:get_config([cluster, links], []) - ). 
- -on_init(Res, ClusterName, Msg) -> - #{ - 'Correlation-Data' := ReqId, - 'Response-Topic' := RespTopic - } = emqx_message:get_header(properties, Msg), - case lookup_link_conf(ClusterName) of - {value, LinkConf} -> - _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), - emqx_cluster_link_mqtt:ack_link(ClusterName, Res, RespTopic, ReqId); - false -> - ?SLOG(error, #{ - msg => "init_link_request_from_unknown_cluster", - link_name => ClusterName - }), - %% Cannot ack/reply since we don't know how to reach the link cluster, - %% The cluster that tried to initiatw this link is expected to eventually fail with timeout. - ok - end. - -on_init_ack(Res, ClusterName, Msg) -> - #{'Correlation-Data' := ReqId} = emqx_message:get_header(properties, Msg), - emqx_cluster_link_coordinator:on_link_ack(ClusterName, ReqId, Res). - -%% add_routes(Topics, ClusterName) -> -%% lists:foreach( -%% fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end, -%% Topics -%% ). - %% let it crash if extra is not a map, %% we don't expect the message to be forwarded from an older EMQX release, %% that doesn't set extra = #{} by default. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index ba17d22e8..4b93407b2 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -21,7 +21,8 @@ link/1, topic_filters/1, %% Connections - emqtt_options/1 + emqtt_options/1, + mk_emqtt_options/1 ]). -export([ @@ -152,16 +153,18 @@ add_links(LinksConf) -> add_link(#{enabled := true} = LinkConf) -> %% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand... MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), - CoordRes = ensure_coordinator(LinkConf), - combine_results(CoordRes, MsgFwdRes); + %% TODO + ActorRes = ok, + combine_results(ActorRes, MsgFwdRes); add_link(_DisabledLinkConf) -> ok. remove_links(LinksConf) -> [remove_link(Link) || Link <- LinksConf]. -remove_link(LinkConf) -> - emqx_cluster_link_coord_sup:stop_coordinator(LinkConf). +remove_link(_LinkConf) -> + %% TODO + ok. update_links(LinksConf) -> [update_link(Link) || Link <- LinksConf]. @@ -176,14 +179,6 @@ update_link(#{enabled := false} = LinkConf) -> Other -> Other end. -ensure_coordinator(LinkConf) -> - case emqx_cluster_link_coord_sup:start_coordinator(LinkConf) of - {error, {already_started, Pid}} -> - {ok, Pid}; - {error, already_present} -> - emqx_cluster_link_coord_sup:restart_coordinator(LinkConf) - end. - combine_results(ok, ok) -> ok; combine_results(CoordRes, MsgFwdRes) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl deleted file mode 100644 index 78fa030f2..000000000 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl +++ /dev/null @@ -1,57 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_cluster_link_coord_sup). - --behaviour(supervisor). - --export([start_link/1]). --export([init/1]). - --export([ - start_coordinator/1, - restart_coordinator/1, - stop_coordinator/1 -]). - --define(SERVER, ?MODULE). --define(COORDINATOR_MOD, emqx_cluster_link_coordinator). 
- -start_link(LinksConf) -> - supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf). - -init(LinksConf) -> - SupFlags = #{ - strategy => one_for_one, - intensity => 10, - period => 5 - }, - {ok, {SupFlags, children(LinksConf)}}. - -start_coordinator(#{upstream := Name} = LinkConf) -> - supervisor:start_child(?SERVER, worker_spec(Name, LinkConf)). - -restart_coordinator(#{upstream := Name} = _LinkConf) -> - supervisor:restart_child(?SERVER, Name). - -stop_coordinator(#{upstream := Name} = _LinkConf) -> - case supervisor:terminate_child(?SERVER, Name) of - ok -> - supervisor:delete_child(?SERVER, Name); - Err -> - Err - end. - -worker_spec(Id, LinkConf) -> - #{ - id => Id, - start => {?COORDINATOR_MOD, start_link, [LinkConf]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [?COORDINATOR_MOD] - }. - -children(LinksConf) -> - [worker_spec(Name, Conf) || #{upstream := Name, enable := true} = Conf <- LinksConf]. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl deleted file mode 100644 index 4b8b9be8f..000000000 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl +++ /dev/null @@ -1,454 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - -%% @doc experimental prototype implementation. -%% The idea is to add a sync point for all cluster route operations, -%% so that, routes can be batched/shrunk (via using emqx_route_syncer) before pushing them to linked clusters. -%% The expected result is reduced communication between linked clusters: -%% each nodes communicates with other clusters through coordinator. -%% The drawbacks are numerous though: -%% - complexity/leader elections, -%% - routes removal seems hard to implement unless remote cluster routes as stored per node, -%% in that case global coordinator per cluster is not needed any more. - TBD --module(emqx_cluster_link_coordinator). - --behaviour(gen_statem). - -%% API --export([ - route_op/2, - on_link_ack/3 -]). - --export([start_link/1]). - -%% gen_statem --export([ - callback_mode/0, - init/1, - terminate/3 -]). - -%% gen_statem state functions --export([ - wait_for_coordinator/3, - connecting/3, - init_linking/3, - bootstrapping/3, - coordinating/3, - following/3 -]). - --export([select_routes/1]). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_router.hrl"). --include_lib("emqx/include/logger.hrl"). - --define(COORDINATOR(UpstreamName), {?MODULE, UpstreamName}). --define(SERVER, ?MODULE). --define(WAIT_COORD_RETRY_INTERVAL, 100). --define(CONN_RETRY_INTERVAL, 5000). --define(INIT_LINK_RESP_TIMEOUT, 15_000). --define(INIT_LINK_RETRIES, 5). --define(UPSTREAM_DEST, {external, {link, _}}). --define(IS_ROUTE_OP(Op), Op =:= <<"add">>; Op =:= <<"delete">>). - -start_link(Conf) -> - gen_statem:start_link(?MODULE, Conf, []). - -route_op(Op, Topic) -> - lists:foreach( - fun(#{upstream := UpstreamName, topics := LinkFilters}) -> - case topic_intersect_any(Topic, LinkFilters) of - false -> ok; - TopicOrFilter -> maybe_cast(UpstreamName, {Op, TopicOrFilter}) - end - end, - emqx:get_config([cluster, links]) - ). - -on_link_ack(ClusterName, ReqId, Res) -> - maybe_cast(ClusterName, {ack_link, ClusterName, ReqId, Res}). - -callback_mode() -> - [state_functions, state_enter]. 
- -init(LinkConf) -> - process_flag(trap_exit, true), - %% It helps to avoid unnecessary global name conflicts (and, as a result, coordinator re-election), - %% e.g. when a down nodes comes back - %% TODO: need to better understand `global` behaviour - _ = global:sync(), - Data = #{is_coordinator => false, link_conf => LinkConf}, - {ok, wait_for_coordinator, Data}. - -wait_for_coordinator(enter, _OldState, _Data) -> - {keep_state_and_data, [{state_timeout, 0, do_wait_for_coordinator}]}; -wait_for_coordinator(_, do_wait_for_coordinator, Data) -> - #{link_conf := #{upstream := Name}} = Data, - case global:whereis_name(?COORDINATOR(Name)) of - undefined -> - case register_coordinator(Name) of - yes -> - {next_state, connecting, Data#{is_coordinator => true}}; - no -> - %% TODO: this should not happen forever, if it does, we need to detect it - {keep_state_and_data, [ - {state_timeout, ?WAIT_COORD_RETRY_INTERVAL, do_wait_for_coordinator} - ]} - end; - %% Can be a prev stale pid? - %% Let it crash with case_clause if it happens... - Pid when is_pid(Pid) andalso Pid =/= self() -> - Data1 = Data#{coordinator_mon => erlang:monitor(process, Pid), coordinator_pid => Pid}, - {next_state, following, Data1} - end; -wait_for_coordinator(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> - %% Ignore any route op, until bootstrapping is started. - %% All ignored route ops are expected to be caught up during the bootstrap. - keep_state_and_data; -wait_for_coordinator(EventType, Event, Data) -> - handle_event_(?FUNCTION_NAME, EventType, Event, Data). - -connecting(enter, _OldState, _Data) -> - {keep_state_and_data, [{state_timeout, 0, reconnect}]}; -connecting(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> - %% Ignore any route op, until bootstrapping is started. - %% All ignored route ops are expected to be caught up during the bootstrap. - keep_state_and_data; -connecting(_EventType, reconnect, Data) -> - ensure_conn_pool(init_linking, Data); -connecting(EventType, Event, Data) -> - handle_event_(?FUNCTION_NAME, EventType, Event, Data). - -init_linking(enter, _OldState, Data) -> - {keep_state, Data#{link_retries => ?INIT_LINK_RETRIES}, [{state_timeout, 0, init_link}]}; -init_linking(cast, {ack_link, _ClusterName, ReqId, Res}, #{link_req_id := ReqId} = Data) -> - case Res of - %% This state machine is not suitable to bootstrap the upstream cluster conditionally, - %% since it ignores any route ops received before bootstrapping... - {ok, #{proto_ver := _, need_bootstrap := _}} -> - {next_state, bootstrapping, maps:without([link_req_id, link_retries], Data)}; - {error, <<"bad_upstream_name">>} -> - %% unrecoverable error that needs a user intervention, - %% TODO: maybe need to transition to some error state - {keep_state, maps:without([link_req_id, link_retries], Data), [{state_timeout, cancel}]} - end; -init_linking(_, init_link, #{link_conf := #{upstream := Name}, link_retries := Retries} = Data) -> - case Retries > 0 of - true -> - {ReqId, {ok, _}} = emqx_cluster_link_mqtt:init_link(Name), - Data1 = Data#{link_req_id => ReqId, link_retries => Retries - 1}, - {keep_state, Data1, [{state_timeout, ?INIT_LINK_RESP_TIMEOUT, init_link}]}; - false -> - ?SLOG(error, #{ - msg => "no_link_ack_response_received", - link_name => Name - }), - %% unrecoverable error that needs a user intervention, - %% TODO: maybe need to transition to some error state - keep_state_and_data - end; -init_linking(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> - %% Ignore any route op, until bootstrapping is started. 
- %% All ignored route ops are expected to be caught up during the bootstrap. - keep_state_and_data; -init_linking(EventType, Event, Data) -> - handle_event_(?FUNCTION_NAME, EventType, Event, Data). - -bootstrapping(enter, _OldState, #{link_conf := LinkConf} = Data) -> - #{topics := LinkFilters, upstream := ClusterName} = LinkConf, - %% TODO add timeout? - {Pid, Ref} = erlang:spawn_monitor(fun() -> bootstrap(ClusterName, LinkFilters) end), - {keep_state, Data#{bootstrap_pid => Pid, bootstrap_ref => Ref}}; -bootstrapping(info, {'DOWN', Ref, process, _Pid, Reason}, #{bootstrap_ref := Ref} = Data) -> - %% TODO: think about the best way to proceed if bootstrapping failed, - %% perhaps just transition back to connecting state? - normal = Reason, - Data1 = maps:without([bootstrap_ref, bootstrap_pid], Data), - {next_state, coordinating, Data1}; -%% Accumulate new route ops, since there is no guarantee -%% they will be included in the bootstrapped data -bootstrapping(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) -> - {keep_state_and_data, [postpone]}; -bootstrapping(EventType, Event, Data) -> - handle_event_(?FUNCTION_NAME, EventType, Event, Data). - -coordinating(enter, _OldState, _Data) -> - keep_state_and_data; -coordinating(cast, {Op, Topic}, Data) when ?IS_ROUTE_OP(Op) -> - #{link_conf := #{upstream := ClusterName}} = Data, - %% TODO: batching - case emqx_cluster_link_mqtt:publish_route_op(async, ClusterName, Op, Topic) of - {error, _} -> - %% Conn pool error, reconnect. - {next_state, connecting, stop_conn_pool(Data)}; - _Ref -> - keep_state_and_data - end; -%% TODO: this can also be received in other states, move to generic handler? -coordinating(info, {global_name_conflict, CoordName}, Data) -> - LogData = #{ - msg => "emqx_cluster_link_coordinator_name_conflict", - coordinator_name => CoordName - }, - LogData1 = - %% TODO: this can be a previous (self) coordinator? - case global:whereis_name(CoordName) of - undefined -> LogData; - Pid -> LogData#{new_coordinator => Pid, coordinator_node => node(Pid)} - end, - ?SLOG(warning, LogData1), - Data1 = stop_conn_pool(Data), - {next_state, wait_for_coordinator, Data1#{is_coordinator => false}}; -%% only errors results are expected -%% TODO: a single error causes reconnection and re-bootstrapping, -%% it's worth considering some optimizations. -coordinating(info, {pub_result, _Ref, {error, Reason}}, #{link_conf := #{upstream := Name}} = Data) -> - ?SLOG(error, #{ - msg => "failed_to_replicate_route_op_to_linked_cluster", - link_name => Name, - reason => Reason - }), - %% TODO: check errors, some may be not possible to correct by re-connecting - Data1 = stop_conn_pool(Data), - {next_state, connecting, Data1}; -coordinating(EventType, Event, Data) -> - handle_event_(?FUNCTION_NAME, EventType, Event, Data). - -following(enter, _OldState, _Data) -> - keep_state_and_data; -following(info, {'DOWN', MRef, process, _Pid, _Info}, #{coordinator_mon := MRef} = Data) -> - {next_state, wait_for_coordinator, maps:without([coordinator_mon, coordinator_pid], Data)}; -following(EventType, Event, Data) -> - handle_event_(?FUNCTION_NAME, EventType, Event, Data). 
- -handle_event_(_State, info, {'DOWN', Ref, process, _Pid, Reason}, Data) -> - case Data of - #{conn_pool_mons := #{Ref := WorkerName}, is_coordinator := true} -> - ?SLOG(warning, #{ - msg => "cluster_link_route_connection_is_down", - reason => Reason, - worker => WorkerName - }), - {next_state, connecting, stop_conn_pool(Data)}; - _ -> - %% Must be a stale 'DOWN' msg (e.g., from the next worker) which is already handled. - keep_state_and_data - end; -handle_event_(State, EventType, Event, Data) -> - ?SLOG(warning, #{ - msg => "unexpected_event", - event => Event, - event_type => EventType, - state => State, - data => Data - }), - keep_state_and_data. - -terminate(Reason, _State, #{link_conf := #{upstream := ClusterName}} = Data) -> - %% TODO unregister coordinator? - IsCoordinator = maps:get(is_coordinator, Data, false), - case Reason of - shutdown when IsCoordinator -> - %% must be sync, since we are going to stop the pool - %% NOTE: there is no guarantee that unlink op will arrive the last one - %% (since there may be other route op sent over another pool worker) - %% and clear everything, but it must be good enough to GC most of the routes. - _ = emqx_cluster_link_mqtt:remove_link(ClusterName); - _ -> - ok - end, - _ = stop_conn_pool(Data), - ok. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -topic_intersect_any(Topic, [LinkFilter | T]) -> - case emqx_topic:intersection(Topic, LinkFilter) of - false -> topic_intersect_any(Topic, T); - TopicOrFilter -> TopicOrFilter - end; -topic_intersect_any(_Topic, []) -> - false. - -bootstrap(ClusterName, LinkFilters) -> - %% TODO: do this in chunks - Topics = select_routes(LinkFilters), - {ok, _} = emqx_cluster_link_mqtt:publish_routes(sync, ClusterName, Topics). - -%% TODO: if a local route matches link filter exactly, -%% it's enough to only select this matching filter itself and skip any other routes? -%% E.g., local routes: "t/global/#", "t/global/1/+", clsuter link topics = ["t/global/#"], -%% it's enough to replicate "t/global/#" only to the linked cluster. -%% What to do when "t/global/#" subscriber unsubscribers -%% and we start to get forwarded messages (e.g. "t/global/2/3") matching no subscribers? -%% How can we efficiently replace "t/global/#" route with "t/global/1/+" -%% (intersection of "t/global/#" and "t/global/#")? -%% So maybe better not to do it at all and replicate both "t/global/1/+" and "t/global/#" ? -select_routes(LinkFilters) -> - {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters), - Routes = select_routes_by_topics(Topics), - Routes1 = intersecting_routes(Wildcards), - AllRoutes = Routes ++ Routes1, - case emqx_router:get_schema_vsn() of - v1 -> AllRoutes; - %% v2 stores filters (Wildcard subscriptions routes) in a separate index, - %% so WildcardRoutes contains only non-wildcard routes matching wildcard link filters. - %% Thus, we need to select wildcard routes additionally - v2 -> intersecting_routes_v2(Wildcards) ++ AllRoutes - end. - -select_routes_by_topics([]) -> - []; -select_routes_by_topics([Topic | T]) -> - case filter_out_upstream_routes(emqx_router:match_routes(Topic)) of - [_ | _] -> - %% These are non-wildcard link topics, so we don't care about actual - %% routes as long as they are matched, and just need to replicate - %% topic routes to the linked cluster - [Topic | select_routes_by_topics(T)]; - _ -> - select_routes_by_topics(T) - end. 
- -filter_out_upstream_routes(Routes) -> - lists:filter( - fun - (#route{dest = ?UPSTREAM_DEST}) -> false; - (_) -> true - end, - Routes - ). - -%% selects only non-wildcard routes that match wildcards (filters), -%% can only be done as a linear search over all routes -intersecting_routes([]) -> - []; -intersecting_routes(Wildcards) -> - Res = ets:foldl( - fun - (#route{dest = ?UPSTREAM_DEST}, Acc) -> - Acc; - (#route{topic = T}, Acc) -> - %% TODO: probably nice to validate cluster link topic filters - %% to have no intersections between each other? - case topic_intersect_any(T, Wildcards) of - false -> Acc; - Intersection -> Acc#{Intersection => undefined} - end - end, - #{}, - ?ROUTE_TAB - ), - maps:keys(Res). - -intersecting_routes_v2([]) -> - []; -intersecting_routes_v2(Wildcards) -> - lists:foldl( - fun(Wildcard, Acc) -> - MatchedFilters = matched_filters_v2(Wildcard), - all_intersections(Wildcard, MatchedFilters, Acc) - end, - [], - Wildcards - ). - -matched_filters_v2(Wildcard) -> - MatchesAcc = lists:foldl( - fun(M, Acc) -> - case emqx_topic_index:get_id(M) of - ?UPSTREAM_DEST -> - Acc; - _ -> - Acc#{emqx_topic_index:get_topic(M) => undefined} - end - end, - #{}, - emqx_topic_index:matches_filter(Wildcard, ?ROUTE_TAB_FILTERS, []) - ), - maps:keys(MatchesAcc). - -all_intersections(Wildcard, [W | Wildcards], Acc) -> - case emqx_topic:intersection(Wildcard, W) of - false -> all_intersections(Wildcard, Wildcards, Acc); - Intersection -> all_intersections(Wildcard, Wildcards, [Intersection | Acc]) - end; -all_intersections(_, [], Acc) -> - lists:usort(Acc). - -maybe_cast(UpstreamName, Msg) -> - case global:whereis_name(?COORDINATOR(UpstreamName)) of - Pid when is_pid(Pid) -> - gen_statem:cast(Pid, Msg); - undefined -> - %% Ignore and rely on coordinator bootstrapping once it's elected - ok - end. - -register_coordinator(UpstreamName) -> - case mria_config:role() of - core -> - global:register_name( - ?COORDINATOR(UpstreamName), self(), fun global:random_notify_name/3 - ); - _ -> - no - end. - -%% connecting state helper -ensure_conn_pool(NextState, #{link_conf := LinkConf} = Data) -> - Res = start_conn_pool(LinkConf), - Data1 = Data#{conn_pool => Res}, - case Res of - {ok, _} -> - Data2 = Data1#{conn_pool_mons => mon_pool_workers(LinkConf)}, - {next_state, NextState, Data2}; - _Err -> - {keep_state, Data1, [{state_timeout, ?CONN_RETRY_INTERVAL, reconnect}]} - end. - -start_conn_pool(LinkConf) -> - case emqx_cluster_link_mqtt:start_routing_pool(LinkConf) of - {ok, _Pid} = Ok -> - Ok; - {error, Reason} = Err -> - #{upstream := Name} = LinkConf, - ?SLOG(error, #{ - msg => "failed_to_connect_to_linked_cluster", - cluster_name => Name, - reason => Reason - }), - Err - end. - -stop_conn_pool(#{link_conf := #{upstream := Name}} = Data) -> - case Data of - #{conn_pool := {ok, _}} -> - Data1 = maybe_unmointor_workers(Data), - Data1#{conn_pool => {stopped, emqx_cluster_link_mqtt:stop_routing_pool(Name)}}; - _ -> - Data - end. - -maybe_unmointor_workers(#{conn_pool_mons := MonitorsMap} = Data) -> - _ = maps:foreach( - fun(Mref, _Name) -> - erlang:demonitor(Mref) - end, - MonitorsMap - ), - maps:remove(conn_pool_mons, Data); -maybe_unmointor_workers(Data) -> - Data. - -mon_pool_workers(LinkConf) -> - maps:from_list([ - {erlang:monitor(process, Pid), Name} - || {Name, Pid} <- emqx_cluster_link_mqtt:routing_pool_workers(LinkConf) - ]). 
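The file deleted above was the core of the rejected design: a per-link coordinator elected through `global`, to which every node cast its route ops and which pushed them to the linked cluster over an ecpool of emqtt workers. The design that remains (see `publish_actor_init_sync/3` and `publish_route_sync/4`, kept further down in this patch) is leader-less: each node announces itself as an actor and publishes its own route updates. A minimal sketch of that flow, assuming an already-connected `emqtt` client; `sync_routes/4`, the topic literal and the payload keys here are illustrative only, the authoritative wire format is whatever `emqx_cluster_link_mqtt` encodes and decodes:

    %% Illustrative only: publish this node's route updates for one link.
    %% RouteOps are {add, Route} | {delete, Route}, as in encode_field(route, ...).
    sync_routes(ClientPid, Actor, Incarnation, RouteOps) ->
        Payload = #{
            <<"actor">> => Actor,
            <<"incarnation">> => Incarnation,
            <<"routes">> => RouteOps
        },
        %% In the real module the topic is built from ?ROUTE_TOPIC_PREFIX
        %% ("$LINK/cluster/route/") and the local cluster name.
        Topic = <<"$LINK/cluster/route/mycluster">>,
        %% QoS 1 mirrors what the kept publish_route_sync/4 uses.
        emqtt:publish(ClientPid, Topic, term_to_binary(Payload), 1).

Dropping the elected leader avoids the `global` name conflicts and re-election churn the deleted module comment warns about, at the cost of every node maintaining its own connection to the remote cluster.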
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index a3e3ce2fb..d62965bb2 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -9,8 +9,6 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -%-include_lib("emqtt/include/emqtt.hrl"). - -behaviour(emqx_resource). -behaviour(ecpool_worker). @@ -30,16 +28,6 @@ -export([ ensure_msg_fwd_resource/1, stop_msg_fwd_resource/1, - start_routing_pool/1, - stop_routing_pool/1, - routing_pool_workers/1, - init_link/1, - ack_link/4, - remove_link/1, - publish_route_op/4, - publish_routes/3, - cleanup_routes/1, - decode_ctrl_msg/2, decode_route_op/1, decode_forwarded_msg/1 ]). @@ -54,37 +42,24 @@ forward/2 ]). --define(ROUTE_CLIENTID_SUFFIX, ":route:"). -define(MSG_CLIENTID_SUFFIX, ":msg:"). --define(CLIENTID(Base, Suffix), emqx_bridge_mqtt_lib:clientid_base([Base, Suffix])). -define(MQTT_HOST_OPTS, #{default_port => 1883}). -define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()). -define(ROUTE_TOPIC, <>). -define(MSG_FWD_TOPIC, <>). --define(CTRL_TOPIC(ClusterName), <>). +%%-define(CTRL_TOPIC(ClusterName), <>). -%% ecpool and emqx_resource names --define(ROUTE_POOL_PREFIX, "emqx_cluster_link_mqtt:route:"). -define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:"). -define(RES_NAME(Prefix, ClusterName), <>). -define(ROUTE_POOL_NAME(ClusterName), ?RES_NAME(?ROUTE_POOL_PREFIX, ClusterName)). -define(MSG_RES_ID(ClusterName), ?RES_NAME(?MSG_POOL_PREFIX, ClusterName)). -define(HEALTH_CHECK_TIMEOUT, 1000). -define(RES_GROUP, <<"emqx_cluster_link">>). --define(DEFAULT_POOL_KEY, <<"default">>). %% Protocol --define(PROTO_VER, <<"1.0">>). --define(INIT_LINK_OP, <<"init_link">>). --define(ACK_LINK_OP, <<"ack_link">>). --define(UNLINK_OP, <<"unlink">>). --define(BATCH_ROUTES_OP, <<"add_routes">>). --define(CLEANUP_ROUTES_OP, <<"cleanup_routes">>). -%% It's worth optimizing non-batch op payload size, -%% thus it's encoded as a plain binary --define(TOPIC_WITH_OP(Op, Topic), <>). +%% -define(PROTO_VER, <<"1.0">>). -define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])). -define(ENCODE(Payload), erlang:term_to_binary(Payload)). @@ -290,121 +265,9 @@ connect(Options) -> end. %%-------------------------------------------------------------------- -%% Routing +%% Protocol %%-------------------------------------------------------------------- -routing_pool_workers(#{upstream := ClusterName} = _ClusterConf) -> - ecpool:workers(?ROUTE_POOL_NAME(ClusterName)). - -start_routing_pool(#{upstream := ClusterName} = ClusterConf) -> - start_pool(?ROUTE_POOL_NAME(ClusterName), ?ROUTE_CLIENTID_SUFFIX, ClusterConf). - -stop_routing_pool(ClusterName) -> - ecpool:stop_sup_pool(?ROUTE_POOL_NAME(ClusterName)). - -init_link(ClusterName) -> - Payload = #{ - <<"op">> => ?INIT_LINK_OP, - <<"proto_ver">> => ?PROTO_VER, - <<"upstream">> => ClusterName, - %% TODO: may no need to reserve it as it is a map? - <<"extra">> => #{} - }, - ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)), - Properties = #{ - 'Response-Topic' => ?CTRL_TOPIC(ClusterName), - 'Correlation-Data' => ReqId - }, - Topic = ?CTRL_TOPIC(?MY_CLUSTER_NAME), - {ReqId, publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, Properties, Topic, ?QOS_1)}. 
- -ack_link(ClusterName, Result, RespTopic, ReqId) -> - Payload = #{ - <<"op">> => ?ACK_LINK_OP, - %% The links may compare and downgrade/adjust protocol in future - <<"proto_ver">> => ?PROTO_VER, - %% may be used in future to avoud re-bootrstrapping all the routes, - %% for example, if the connection was abrupted for a while but the cluster was healthy - %% and didn't lost any routes. In that case, retrying lost route updates would be sufficient. - %% For now, it's always true for simplicitiy reasons. - <<"need_bootstrap">> => true, - <<"extra">> => #{} - }, - Payload1 = - case Result of - {ok, _} -> - Payload#{<<"result">> => <<"ok">>}; - {error, Reason} -> - Payload#{<<"result">> => <<"error">>, reason => Reason} - end, - Props = #{'Correlation-Data' => ReqId}, - Query = {RespTopic, Props, Payload1, ?QOS_1}, - %% Using msg forwading resource to send the response back. - %% TODO: maybe async query? - emqx_resource:query(?MSG_RES_ID(ClusterName), Query, #{ - query_mode => simple_sync, pick_key => RespTopic - }). - -remove_link(ClusterName) -> - Payload = #{<<"op">> => ?UNLINK_OP}, - Topic = ?CTRL_TOPIC(?MY_CLUSTER_NAME), - publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, #{}, Topic, ?QOS_0). - -publish_routes(QueryType, ClusterName, Topics) -> - %% Picks the same pool worker consistently. - %% Although, as writes are idompotent we can pick it randomly - TBD. - publish_routes(QueryType, ClusterName, ?DEFAULT_POOL_KEY, Topics). - -publish_routes(QueryType, ClusterName, PoolKey, Topics) -> - Payload = #{<<"op">> => ?BATCH_ROUTES_OP, <<"topics">> => Topics}, - publish(QueryType, ClusterName, PoolKey, Payload). - -cleanup_routes(ClusterName) -> - Payload = #{<<"op">> => ?CLEANUP_ROUTES_OP}, - publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, #{}, ?ROUTE_TOPIC, ?QOS_0). - -publish_route_op(QueryType, ClusterName, Op, Topic) when Op =:= <<"add">>; Op =:= <<"delete">> -> - Payload = ?TOPIC_WITH_OP(Op, Topic), - publish(QueryType, ClusterName, Topic, Payload). - -publish(QueryType, ClusterName, PoolKey, Payload) -> - publish(QueryType, ClusterName, PoolKey, Payload, #{}). - -publish(QueryType, ClusterName, PoolKey, Payload, Props) -> - %% Deletes are not implemented for now, writes are idempotent, so QOS_1 is fine. - publish(QueryType, ClusterName, PoolKey, Payload, Props, ?ROUTE_TOPIC, ?QOS_1). - -publish(async, ClusterName, PoolKey, Payload, Props, Topic, QoS) -> - ecpool:pick_and_do( - {?ROUTE_POOL_NAME(ClusterName), PoolKey}, - fun(ConnPid) -> - Ref = erlang:make_ref(), - Cb = {fun publish_result/3, [self(), Ref]}, - emqtt:publish_async( - ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}], ?PUB_TIMEOUT, Cb - ), - Ref - end, - no_handover - ); -publish(sync, ClusterName, PoolKey, Payload, Props, Topic, QoS) -> - ecpool:pick_and_do( - {?ROUTE_POOL_NAME(ClusterName), PoolKey}, - fun(ConnPid) -> - emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}]) - end, - no_handover - ). - -publish_result(Caller, Ref, Result) -> - case handle_send_result(Result) of - ok -> - %% avoid extra message passing, we only care about errors for now - ok; - Err -> - Caller ! {pub_result, Ref, Err} - end. - %%% New leader-less Syncer/Actor implementation publish_actor_init_sync(ClientPid, Actor, Incarnation) -> @@ -427,63 +290,6 @@ publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> }, emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1). 
-%%-------------------------------------------------------------------- -%% Protocol -%%-------------------------------------------------------------------- - -decode_ctrl_msg(Payload, ClusterName) -> - decode_ctrl_msg1(?DECODE(Payload), ClusterName). - -decode_ctrl_msg1( - #{ - <<"op">> := ?INIT_LINK_OP, - <<"proto_ver">> := ProtoVer, - <<"upstream">> := UpstreamName - }, - ClusterName -) -> - ProtoVer1 = decode_proto_ver(ProtoVer, ClusterName), - %% UpstreamName is the name the remote linked cluster refers to this cluster, - %% so it must equal to the local cluster name, more clear naming is desired... - MyClusterName = ?MY_CLUSTER_NAME, - case UpstreamName of - MyClusterName -> - {init_link, {ok, #{proto_ver => ProtoVer1}}}; - _ -> - ?SLOG(error, #{ - msg => "misconfigured_cluster_link_name", - %% How this cluster names itself - local_name => MyClusterName, - %% How the remote cluster names itself - link_name => ClusterName, - %% How the remote cluster names this local cluster - upstream_name => UpstreamName - }), - {init_link, {error, <<"bad_upstream_name">>}} - end; -decode_ctrl_msg1( - #{ - <<"op">> := ?ACK_LINK_OP, - <<"result">> := <<"ok">>, - <<"proto_ver">> := ProtoVer, - <<"need_bootstrap">> := IsBootstrapNeeded - }, - ClusterName -) -> - ProtoVer1 = decode_proto_ver(ProtoVer, ClusterName), - {ack_link, {ok, #{proto_ver => ProtoVer1, need_bootstrap => IsBootstrapNeeded}}}; -decode_ctrl_msg1( - #{ - <<"op">> := ?ACK_LINK_OP, - <<"result">> := <<"error">>, - <<"reason">> := Reason - }, - _ClusterName -) -> - {ack_link, {error, Reason}}; -decode_ctrl_msg1(#{<<"op">> := ?UNLINK_OP}, _ClusterName) -> - unlink. - decode_route_op(Payload) -> decode_route_op1(?DECODE(Payload)). @@ -501,14 +307,6 @@ decode_route_op1(#{ }) -> RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps), {route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1}; -%%decode_route_op1(<<"add_", Topic/binary>>) -> -%% {add, Topic}; -%%decode_route_op1(<<"delete_", Topic/binary>>) -> -%% {delete, Topic}; -%%decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) -> -%% {add, Topics}; -%%decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) -> -%% cleanup_routes; decode_route_op1(Payload) -> ?SLOG(warning, #{ msg => "unexpected_cluster_link_route_op_payload", @@ -528,29 +326,6 @@ decode_forwarded_msg(Payload) -> {error, Payload} end. -decode_proto_ver(ProtoVer, ClusterName) -> - {MyMajor, MyMinor} = decode_proto_ver1(?PROTO_VER), - case decode_proto_ver1(ProtoVer) of - {Major, Minor} = Res when - Major > MyMajor; - Minor > MyMinor - -> - ?SLOG(notice, #{ - msg => "different_cluster_link_protocol_versions", - protocol_version => ?PROTO_VER, - link_protocol_version => ProtoVer, - link_name => ClusterName - }), - Res; - Res -> - Res - end. - -decode_proto_ver1(ProtoVer) -> - [Major, Minor] = binary:split(ProtoVer, <<".">>), - %% Let it fail (for now), we don't expect invalid data to pass through the linking protocol.. - {emqx_utils_conv:int(Major), emqx_utils_conv:int(Minor)}. 
- encode_field(route, {add, Route = {_Topic, _ID}}) -> Route; encode_field(route, {delete, {Topic, ID}}) -> @@ -573,38 +348,7 @@ forward(ClusterName, #delivery{message = #message{topic = Topic} = Msg}) -> %% Internal functions %%-------------------------------------------------------------------- -emqtt_client_opts( - ClientIdSuffix, #{server := Server, ssl := #{enable := EnableSsl} = Ssl} = ClusterConf -) -> - BaseClientId = maps:get(client_id, ClusterConf, ?MY_CLUSTER_NAME), - ClientId = ?CLIENTID(BaseClientId, ClientIdSuffix), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS), - Opts = #{ - host => Host, - port => Port, - clientid => ClientId, - proto_ver => v5, - ssl => EnableSsl, - ssl_opts => maps:to_list(maps:remove(enable, Ssl)) - }, - with_password(with_user(Opts, ClusterConf), ClusterConf). - -with_user(Opts, #{username := U} = _ClusterConf) -> - Opts#{username => U}; -with_user(Opts, _ClusterConf) -> - Opts. - -with_password(Opts, #{password := P} = _ClusterConf) -> - Opts#{password => emqx_secret:unwrap(P)}; -with_password(Opts, _ClusterConf) -> - Opts. - -start_pool(PoolName, ClientIdSuffix, #{pool_size := PoolSize} = ClusterConf) -> - ClientOpts = emqtt_client_opts(ClientIdSuffix, ClusterConf), - Opts = [ - {name, PoolName}, - {pool_size, PoolSize}, - {pool_type, hash}, - {client_opts, ClientOpts} - ], - ecpool:start_sup_pool(PoolName, ?MODULE, Opts). +emqtt_client_opts(ClientIdSuffix, ClusterConf) -> + #{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf), + ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]), + Opts#{clientid => ClientId}.
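With the coordinator and its routing pool gone, `emqtt_client_opts/2` shrinks to a thin wrapper: the base options come from the newly exported `emqx_cluster_link_config:mk_emqtt_options/1`, and this module only appends a per-purpose suffix to the client id. A hedged sketch of how the message-forwarding client for one link could be started with these options; `start_msg_client/1` is a hypothetical helper, not part of the patch, and it assumes `mk_emqtt_options/1` yields an emqtt-ready map that includes `clientid`:

    %% Hypothetical helper inside emqx_cluster_link_mqtt;
    %% ?MSG_CLIENTID_SUFFIX (":msg:") is kept by this patch.
    start_msg_client(ClusterConf) ->
        Opts = emqtt_client_opts(?MSG_CLIENTID_SUFFIX, ClusterConf),
        {ok, Pid} = emqtt:start_link(maps:to_list(Opts)),
        {ok, _ConnAck} = emqtt:connect(Pid),
        {ok, Pid}.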