chore(clusterlink): remove code related to the rejected coordinator-based implementation

This commit is contained in:
Serge Tupchii 2024-05-23 12:47:48 +03:00
parent e26e7acaa1
commit ac19cf89df
7 changed files with 15 additions and 845 deletions

View File

@ -3,12 +3,9 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(TOPIC_PREFIX, "$LINK/cluster/"). -define(TOPIC_PREFIX, "$LINK/cluster/").
-define(CTRL_TOPIC_PREFIX, ?TOPIC_PREFIX "ctrl/").
-define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/"). -define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/").
-define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/").
-define(DEST(FromClusterName), {external, {link, FromClusterName}}).
%% Fairly compact text encoding. %% Fairly compact text encoding.
-define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>).
-define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>). -define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>).

View File

@ -9,7 +9,6 @@
kernel, kernel,
stdlib, stdlib,
emqtt, emqtt,
ecpool,
emqx, emqx,
emqx_resource emqx_resource
]}, ]},

View File

@ -100,22 +100,6 @@ on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, p
%% Just ignore it. It must be already logged by the decoder %% Just ignore it. It must be already logged by the decoder
{stop, []} {stop, []}
end; end;
on_message_publish(
#message{topic = <<?CTRL_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload} = Msg
) ->
case emqx_cluster_link_mqtt:decode_ctrl_msg(Payload, ClusterName) of
{init_link, InitRes} ->
on_init(InitRes, ClusterName, Msg);
{ack_link, Res} ->
on_init_ack(Res, ClusterName, Msg);
unlink ->
%% Stop pushing messages to the cluster that requested unlink,
%% It brings the link to a half-closed (unidirectional) state,
%% as this cluster may still replicate routes and receive messages from ClusterName.
emqx_cluster_link_mqtt:stop_msg_fwd_resource(ClusterName),
cleanup_routes(ClusterName)
end,
{stop, []};
on_message_publish(_Msg) -> on_message_publish(_Msg) ->
ok. ok.
@ -166,44 +150,6 @@ update_routes(ClusterName, Actor, Incarnation, RouteOps) ->
RouteOps RouteOps
). ).
cleanup_routes(ClusterName) ->
emqx_router:cleanup_routes(?DEST(ClusterName)).
lookup_link_conf(ClusterName) ->
lists:search(
fun(#{upstream := N}) -> N =:= ClusterName end,
emqx:get_config([cluster, links], [])
).
on_init(Res, ClusterName, Msg) ->
#{
'Correlation-Data' := ReqId,
'Response-Topic' := RespTopic
} = emqx_message:get_header(properties, Msg),
case lookup_link_conf(ClusterName) of
{value, LinkConf} ->
_ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
emqx_cluster_link_mqtt:ack_link(ClusterName, Res, RespTopic, ReqId);
false ->
?SLOG(error, #{
msg => "init_link_request_from_unknown_cluster",
link_name => ClusterName
}),
%% Cannot ack/reply since we don't know how to reach the link cluster,
%% The cluster that tried to initiatw this link is expected to eventually fail with timeout.
ok
end.
on_init_ack(Res, ClusterName, Msg) ->
#{'Correlation-Data' := ReqId} = emqx_message:get_header(properties, Msg),
emqx_cluster_link_coordinator:on_link_ack(ClusterName, ReqId, Res).
%% add_routes(Topics, ClusterName) ->
%% lists:foreach(
%% fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end,
%% Topics
%% ).
%% let it crash if extra is not a map, %% let it crash if extra is not a map,
%% we don't expect the message to be forwarded from an older EMQX release, %% we don't expect the message to be forwarded from an older EMQX release,
%% that doesn't set extra = #{} by default. %% that doesn't set extra = #{} by default.

View File

@ -21,7 +21,8 @@
link/1, link/1,
topic_filters/1, topic_filters/1,
%% Connections %% Connections
emqtt_options/1 emqtt_options/1,
mk_emqtt_options/1
]). ]).
-export([ -export([
@ -152,16 +153,18 @@ add_links(LinksConf) ->
add_link(#{enabled := true} = LinkConf) -> add_link(#{enabled := true} = LinkConf) ->
%% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand... %% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand...
MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
CoordRes = ensure_coordinator(LinkConf), %% TODO
combine_results(CoordRes, MsgFwdRes); ActorRes = ok,
combine_results(ActorRes, MsgFwdRes);
add_link(_DisabledLinkConf) -> add_link(_DisabledLinkConf) ->
ok. ok.
remove_links(LinksConf) -> remove_links(LinksConf) ->
[remove_link(Link) || Link <- LinksConf]. [remove_link(Link) || Link <- LinksConf].
remove_link(LinkConf) -> remove_link(_LinkConf) ->
emqx_cluster_link_coord_sup:stop_coordinator(LinkConf). %% TODO
ok.
update_links(LinksConf) -> update_links(LinksConf) ->
[update_link(Link) || Link <- LinksConf]. [update_link(Link) || Link <- LinksConf].
@ -176,14 +179,6 @@ update_link(#{enabled := false} = LinkConf) ->
Other -> Other Other -> Other
end. end.
ensure_coordinator(LinkConf) ->
case emqx_cluster_link_coord_sup:start_coordinator(LinkConf) of
{error, {already_started, Pid}} ->
{ok, Pid};
{error, already_present} ->
emqx_cluster_link_coord_sup:restart_coordinator(LinkConf)
end.
combine_results(ok, ok) -> combine_results(ok, ok) ->
ok; ok;
combine_results(CoordRes, MsgFwdRes) -> combine_results(CoordRes, MsgFwdRes) ->

View File

@ -1,57 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_cluster_link_coord_sup).
-behaviour(supervisor).
-export([start_link/1]).
-export([init/1]).
-export([
start_coordinator/1,
restart_coordinator/1,
stop_coordinator/1
]).
-define(SERVER, ?MODULE).
-define(COORDINATOR_MOD, emqx_cluster_link_coordinator).
start_link(LinksConf) ->
supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf).
init(LinksConf) ->
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 5
},
{ok, {SupFlags, children(LinksConf)}}.
start_coordinator(#{upstream := Name} = LinkConf) ->
supervisor:start_child(?SERVER, worker_spec(Name, LinkConf)).
restart_coordinator(#{upstream := Name} = _LinkConf) ->
supervisor:restart_child(?SERVER, Name).
stop_coordinator(#{upstream := Name} = _LinkConf) ->
case supervisor:terminate_child(?SERVER, Name) of
ok ->
supervisor:delete_child(?SERVER, Name);
Err ->
Err
end.
worker_spec(Id, LinkConf) ->
#{
id => Id,
start => {?COORDINATOR_MOD, start_link, [LinkConf]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [?COORDINATOR_MOD]
}.
children(LinksConf) ->
[worker_spec(Name, Conf) || #{upstream := Name, enable := true} = Conf <- LinksConf].

View File

@ -1,454 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc experimental prototype implementation.
%% The idea is to add a sync point for all cluster route operations,
%% so that, routes can be batched/shrunk (via using emqx_route_syncer) before pushing them to linked clusters.
%% The expected result is reduced communication between linked clusters:
%% each nodes communicates with other clusters through coordinator.
%% The drawbacks are numerous though:
%% - complexity/leader elections,
%% - routes removal seems hard to implement unless remote cluster routes as stored per node,
%% in that case global coordinator per cluster is not needed any more. - TBD
-module(emqx_cluster_link_coordinator).
-behaviour(gen_statem).
%% API
-export([
route_op/2,
on_link_ack/3
]).
-export([start_link/1]).
%% gen_statem
-export([
callback_mode/0,
init/1,
terminate/3
]).
%% gen_statem state functions
-export([
wait_for_coordinator/3,
connecting/3,
init_linking/3,
bootstrapping/3,
coordinating/3,
following/3
]).
-export([select_routes/1]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/logger.hrl").
-define(COORDINATOR(UpstreamName), {?MODULE, UpstreamName}).
-define(SERVER, ?MODULE).
-define(WAIT_COORD_RETRY_INTERVAL, 100).
-define(CONN_RETRY_INTERVAL, 5000).
-define(INIT_LINK_RESP_TIMEOUT, 15_000).
-define(INIT_LINK_RETRIES, 5).
-define(UPSTREAM_DEST, {external, {link, _}}).
-define(IS_ROUTE_OP(Op), Op =:= <<"add">>; Op =:= <<"delete">>).
start_link(Conf) ->
gen_statem:start_link(?MODULE, Conf, []).
route_op(Op, Topic) ->
lists:foreach(
fun(#{upstream := UpstreamName, topics := LinkFilters}) ->
case topic_intersect_any(Topic, LinkFilters) of
false -> ok;
TopicOrFilter -> maybe_cast(UpstreamName, {Op, TopicOrFilter})
end
end,
emqx:get_config([cluster, links])
).
on_link_ack(ClusterName, ReqId, Res) ->
maybe_cast(ClusterName, {ack_link, ClusterName, ReqId, Res}).
callback_mode() ->
[state_functions, state_enter].
init(LinkConf) ->
process_flag(trap_exit, true),
%% It helps to avoid unnecessary global name conflicts (and, as a result, coordinator re-election),
%% e.g. when a down nodes comes back
%% TODO: need to better understand `global` behaviour
_ = global:sync(),
Data = #{is_coordinator => false, link_conf => LinkConf},
{ok, wait_for_coordinator, Data}.
wait_for_coordinator(enter, _OldState, _Data) ->
{keep_state_and_data, [{state_timeout, 0, do_wait_for_coordinator}]};
wait_for_coordinator(_, do_wait_for_coordinator, Data) ->
#{link_conf := #{upstream := Name}} = Data,
case global:whereis_name(?COORDINATOR(Name)) of
undefined ->
case register_coordinator(Name) of
yes ->
{next_state, connecting, Data#{is_coordinator => true}};
no ->
%% TODO: this should not happen forever, if it does, we need to detect it
{keep_state_and_data, [
{state_timeout, ?WAIT_COORD_RETRY_INTERVAL, do_wait_for_coordinator}
]}
end;
%% Can be a prev stale pid?
%% Let it crash with case_clause if it happens...
Pid when is_pid(Pid) andalso Pid =/= self() ->
Data1 = Data#{coordinator_mon => erlang:monitor(process, Pid), coordinator_pid => Pid},
{next_state, following, Data1}
end;
wait_for_coordinator(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
%% Ignore any route op, until bootstrapping is started.
%% All ignored route ops are expected to be caught up during the bootstrap.
keep_state_and_data;
wait_for_coordinator(EventType, Event, Data) ->
handle_event_(?FUNCTION_NAME, EventType, Event, Data).
connecting(enter, _OldState, _Data) ->
{keep_state_and_data, [{state_timeout, 0, reconnect}]};
connecting(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
%% Ignore any route op, until bootstrapping is started.
%% All ignored route ops are expected to be caught up during the bootstrap.
keep_state_and_data;
connecting(_EventType, reconnect, Data) ->
ensure_conn_pool(init_linking, Data);
connecting(EventType, Event, Data) ->
handle_event_(?FUNCTION_NAME, EventType, Event, Data).
init_linking(enter, _OldState, Data) ->
{keep_state, Data#{link_retries => ?INIT_LINK_RETRIES}, [{state_timeout, 0, init_link}]};
init_linking(cast, {ack_link, _ClusterName, ReqId, Res}, #{link_req_id := ReqId} = Data) ->
case Res of
%% This state machine is not suitable to bootstrap the upstream cluster conditionally,
%% since it ignores any route ops received before bootstrapping...
{ok, #{proto_ver := _, need_bootstrap := _}} ->
{next_state, bootstrapping, maps:without([link_req_id, link_retries], Data)};
{error, <<"bad_upstream_name">>} ->
%% unrecoverable error that needs a user intervention,
%% TODO: maybe need to transition to some error state
{keep_state, maps:without([link_req_id, link_retries], Data), [{state_timeout, cancel}]}
end;
init_linking(_, init_link, #{link_conf := #{upstream := Name}, link_retries := Retries} = Data) ->
case Retries > 0 of
true ->
{ReqId, {ok, _}} = emqx_cluster_link_mqtt:init_link(Name),
Data1 = Data#{link_req_id => ReqId, link_retries => Retries - 1},
{keep_state, Data1, [{state_timeout, ?INIT_LINK_RESP_TIMEOUT, init_link}]};
false ->
?SLOG(error, #{
msg => "no_link_ack_response_received",
link_name => Name
}),
%% unrecoverable error that needs a user intervention,
%% TODO: maybe need to transition to some error state
keep_state_and_data
end;
init_linking(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
%% Ignore any route op, until bootstrapping is started.
%% All ignored route ops are expected to be caught up during the bootstrap.
keep_state_and_data;
init_linking(EventType, Event, Data) ->
handle_event_(?FUNCTION_NAME, EventType, Event, Data).
bootstrapping(enter, _OldState, #{link_conf := LinkConf} = Data) ->
#{topics := LinkFilters, upstream := ClusterName} = LinkConf,
%% TODO add timeout?
{Pid, Ref} = erlang:spawn_monitor(fun() -> bootstrap(ClusterName, LinkFilters) end),
{keep_state, Data#{bootstrap_pid => Pid, bootstrap_ref => Ref}};
bootstrapping(info, {'DOWN', Ref, process, _Pid, Reason}, #{bootstrap_ref := Ref} = Data) ->
%% TODO: think about the best way to proceed if bootstrapping failed,
%% perhaps just transition back to connecting state?
normal = Reason,
Data1 = maps:without([bootstrap_ref, bootstrap_pid], Data),
{next_state, coordinating, Data1};
%% Accumulate new route ops, since there is no guarantee
%% they will be included in the bootstrapped data
bootstrapping(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
{keep_state_and_data, [postpone]};
bootstrapping(EventType, Event, Data) ->
handle_event_(?FUNCTION_NAME, EventType, Event, Data).
coordinating(enter, _OldState, _Data) ->
keep_state_and_data;
coordinating(cast, {Op, Topic}, Data) when ?IS_ROUTE_OP(Op) ->
#{link_conf := #{upstream := ClusterName}} = Data,
%% TODO: batching
case emqx_cluster_link_mqtt:publish_route_op(async, ClusterName, Op, Topic) of
{error, _} ->
%% Conn pool error, reconnect.
{next_state, connecting, stop_conn_pool(Data)};
_Ref ->
keep_state_and_data
end;
%% TODO: this can also be received in other states, move to generic handler?
coordinating(info, {global_name_conflict, CoordName}, Data) ->
LogData = #{
msg => "emqx_cluster_link_coordinator_name_conflict",
coordinator_name => CoordName
},
LogData1 =
%% TODO: this can be a previous (self) coordinator?
case global:whereis_name(CoordName) of
undefined -> LogData;
Pid -> LogData#{new_coordinator => Pid, coordinator_node => node(Pid)}
end,
?SLOG(warning, LogData1),
Data1 = stop_conn_pool(Data),
{next_state, wait_for_coordinator, Data1#{is_coordinator => false}};
%% only errors results are expected
%% TODO: a single error causes reconnection and re-bootstrapping,
%% it's worth considering some optimizations.
coordinating(info, {pub_result, _Ref, {error, Reason}}, #{link_conf := #{upstream := Name}} = Data) ->
?SLOG(error, #{
msg => "failed_to_replicate_route_op_to_linked_cluster",
link_name => Name,
reason => Reason
}),
%% TODO: check errors, some may be not possible to correct by re-connecting
Data1 = stop_conn_pool(Data),
{next_state, connecting, Data1};
coordinating(EventType, Event, Data) ->
handle_event_(?FUNCTION_NAME, EventType, Event, Data).
following(enter, _OldState, _Data) ->
keep_state_and_data;
following(info, {'DOWN', MRef, process, _Pid, _Info}, #{coordinator_mon := MRef} = Data) ->
{next_state, wait_for_coordinator, maps:without([coordinator_mon, coordinator_pid], Data)};
following(EventType, Event, Data) ->
handle_event_(?FUNCTION_NAME, EventType, Event, Data).
handle_event_(_State, info, {'DOWN', Ref, process, _Pid, Reason}, Data) ->
case Data of
#{conn_pool_mons := #{Ref := WorkerName}, is_coordinator := true} ->
?SLOG(warning, #{
msg => "cluster_link_route_connection_is_down",
reason => Reason,
worker => WorkerName
}),
{next_state, connecting, stop_conn_pool(Data)};
_ ->
%% Must be a stale 'DOWN' msg (e.g., from the next worker) which is already handled.
keep_state_and_data
end;
handle_event_(State, EventType, Event, Data) ->
?SLOG(warning, #{
msg => "unexpected_event",
event => Event,
event_type => EventType,
state => State,
data => Data
}),
keep_state_and_data.
terminate(Reason, _State, #{link_conf := #{upstream := ClusterName}} = Data) ->
%% TODO unregister coordinator?
IsCoordinator = maps:get(is_coordinator, Data, false),
case Reason of
shutdown when IsCoordinator ->
%% must be sync, since we are going to stop the pool
%% NOTE: there is no guarantee that unlink op will arrive the last one
%% (since there may be other route op sent over another pool worker)
%% and clear everything, but it must be good enough to GC most of the routes.
_ = emqx_cluster_link_mqtt:remove_link(ClusterName);
_ ->
ok
end,
_ = stop_conn_pool(Data),
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
topic_intersect_any(Topic, [LinkFilter | T]) ->
case emqx_topic:intersection(Topic, LinkFilter) of
false -> topic_intersect_any(Topic, T);
TopicOrFilter -> TopicOrFilter
end;
topic_intersect_any(_Topic, []) ->
false.
bootstrap(ClusterName, LinkFilters) ->
%% TODO: do this in chunks
Topics = select_routes(LinkFilters),
{ok, _} = emqx_cluster_link_mqtt:publish_routes(sync, ClusterName, Topics).
%% TODO: if a local route matches link filter exactly,
%% it's enough to only select this matching filter itself and skip any other routes?
%% E.g., local routes: "t/global/#", "t/global/1/+", clsuter link topics = ["t/global/#"],
%% it's enough to replicate "t/global/#" only to the linked cluster.
%% What to do when "t/global/#" subscriber unsubscribers
%% and we start to get forwarded messages (e.g. "t/global/2/3") matching no subscribers?
%% How can we efficiently replace "t/global/#" route with "t/global/1/+"
%% (intersection of "t/global/#" and "t/global/#")?
%% So maybe better not to do it at all and replicate both "t/global/1/+" and "t/global/#" ?
select_routes(LinkFilters) ->
{Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
Routes = select_routes_by_topics(Topics),
Routes1 = intersecting_routes(Wildcards),
AllRoutes = Routes ++ Routes1,
case emqx_router:get_schema_vsn() of
v1 -> AllRoutes;
%% v2 stores filters (Wildcard subscriptions routes) in a separate index,
%% so WildcardRoutes contains only non-wildcard routes matching wildcard link filters.
%% Thus, we need to select wildcard routes additionally
v2 -> intersecting_routes_v2(Wildcards) ++ AllRoutes
end.
select_routes_by_topics([]) ->
[];
select_routes_by_topics([Topic | T]) ->
case filter_out_upstream_routes(emqx_router:match_routes(Topic)) of
[_ | _] ->
%% These are non-wildcard link topics, so we don't care about actual
%% routes as long as they are matched, and just need to replicate
%% topic routes to the linked cluster
[Topic | select_routes_by_topics(T)];
_ ->
select_routes_by_topics(T)
end.
filter_out_upstream_routes(Routes) ->
lists:filter(
fun
(#route{dest = ?UPSTREAM_DEST}) -> false;
(_) -> true
end,
Routes
).
%% selects only non-wildcard routes that match wildcards (filters),
%% can only be done as a linear search over all routes
intersecting_routes([]) ->
[];
intersecting_routes(Wildcards) ->
Res = ets:foldl(
fun
(#route{dest = ?UPSTREAM_DEST}, Acc) ->
Acc;
(#route{topic = T}, Acc) ->
%% TODO: probably nice to validate cluster link topic filters
%% to have no intersections between each other?
case topic_intersect_any(T, Wildcards) of
false -> Acc;
Intersection -> Acc#{Intersection => undefined}
end
end,
#{},
?ROUTE_TAB
),
maps:keys(Res).
intersecting_routes_v2([]) ->
[];
intersecting_routes_v2(Wildcards) ->
lists:foldl(
fun(Wildcard, Acc) ->
MatchedFilters = matched_filters_v2(Wildcard),
all_intersections(Wildcard, MatchedFilters, Acc)
end,
[],
Wildcards
).
matched_filters_v2(Wildcard) ->
MatchesAcc = lists:foldl(
fun(M, Acc) ->
case emqx_topic_index:get_id(M) of
?UPSTREAM_DEST ->
Acc;
_ ->
Acc#{emqx_topic_index:get_topic(M) => undefined}
end
end,
#{},
emqx_topic_index:matches_filter(Wildcard, ?ROUTE_TAB_FILTERS, [])
),
maps:keys(MatchesAcc).
all_intersections(Wildcard, [W | Wildcards], Acc) ->
case emqx_topic:intersection(Wildcard, W) of
false -> all_intersections(Wildcard, Wildcards, Acc);
Intersection -> all_intersections(Wildcard, Wildcards, [Intersection | Acc])
end;
all_intersections(_, [], Acc) ->
lists:usort(Acc).
maybe_cast(UpstreamName, Msg) ->
case global:whereis_name(?COORDINATOR(UpstreamName)) of
Pid when is_pid(Pid) ->
gen_statem:cast(Pid, Msg);
undefined ->
%% Ignore and rely on coordinator bootstrapping once it's elected
ok
end.
register_coordinator(UpstreamName) ->
case mria_config:role() of
core ->
global:register_name(
?COORDINATOR(UpstreamName), self(), fun global:random_notify_name/3
);
_ ->
no
end.
%% connecting state helper
ensure_conn_pool(NextState, #{link_conf := LinkConf} = Data) ->
Res = start_conn_pool(LinkConf),
Data1 = Data#{conn_pool => Res},
case Res of
{ok, _} ->
Data2 = Data1#{conn_pool_mons => mon_pool_workers(LinkConf)},
{next_state, NextState, Data2};
_Err ->
{keep_state, Data1, [{state_timeout, ?CONN_RETRY_INTERVAL, reconnect}]}
end.
start_conn_pool(LinkConf) ->
case emqx_cluster_link_mqtt:start_routing_pool(LinkConf) of
{ok, _Pid} = Ok ->
Ok;
{error, Reason} = Err ->
#{upstream := Name} = LinkConf,
?SLOG(error, #{
msg => "failed_to_connect_to_linked_cluster",
cluster_name => Name,
reason => Reason
}),
Err
end.
stop_conn_pool(#{link_conf := #{upstream := Name}} = Data) ->
case Data of
#{conn_pool := {ok, _}} ->
Data1 = maybe_unmointor_workers(Data),
Data1#{conn_pool => {stopped, emqx_cluster_link_mqtt:stop_routing_pool(Name)}};
_ ->
Data
end.
maybe_unmointor_workers(#{conn_pool_mons := MonitorsMap} = Data) ->
_ = maps:foreach(
fun(Mref, _Name) ->
erlang:demonitor(Mref)
end,
MonitorsMap
),
maps:remove(conn_pool_mons, Data);
maybe_unmointor_workers(Data) ->
Data.
mon_pool_workers(LinkConf) ->
maps:from_list([
{erlang:monitor(process, Pid), Name}
|| {Name, Pid} <- emqx_cluster_link_mqtt:routing_pool_workers(LinkConf)
]).

View File

@ -9,8 +9,6 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%-include_lib("emqtt/include/emqtt.hrl").
-behaviour(emqx_resource). -behaviour(emqx_resource).
-behaviour(ecpool_worker). -behaviour(ecpool_worker).
@ -30,16 +28,6 @@
-export([ -export([
ensure_msg_fwd_resource/1, ensure_msg_fwd_resource/1,
stop_msg_fwd_resource/1, stop_msg_fwd_resource/1,
start_routing_pool/1,
stop_routing_pool/1,
routing_pool_workers/1,
init_link/1,
ack_link/4,
remove_link/1,
publish_route_op/4,
publish_routes/3,
cleanup_routes/1,
decode_ctrl_msg/2,
decode_route_op/1, decode_route_op/1,
decode_forwarded_msg/1 decode_forwarded_msg/1
]). ]).
@ -54,37 +42,24 @@
forward/2 forward/2
]). ]).
-define(ROUTE_CLIENTID_SUFFIX, ":route:").
-define(MSG_CLIENTID_SUFFIX, ":msg:"). -define(MSG_CLIENTID_SUFFIX, ":msg:").
-define(CLIENTID(Base, Suffix), emqx_bridge_mqtt_lib:clientid_base([Base, Suffix])).
-define(MQTT_HOST_OPTS, #{default_port => 1883}). -define(MQTT_HOST_OPTS, #{default_port => 1883}).
-define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()). -define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()).
-define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>). -define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
-define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>). -define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
-define(CTRL_TOPIC(ClusterName), <<?CTRL_TOPIC_PREFIX, (ClusterName)/binary>>). %%-define(CTRL_TOPIC(ClusterName), <<?CTRL_TOPIC_PREFIX, (ClusterName)/binary>>).
%% ecpool and emqx_resource names
-define(ROUTE_POOL_PREFIX, "emqx_cluster_link_mqtt:route:").
-define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:"). -define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:").
-define(RES_NAME(Prefix, ClusterName), <<Prefix, ClusterName/binary>>). -define(RES_NAME(Prefix, ClusterName), <<Prefix, ClusterName/binary>>).
-define(ROUTE_POOL_NAME(ClusterName), ?RES_NAME(?ROUTE_POOL_PREFIX, ClusterName)). -define(ROUTE_POOL_NAME(ClusterName), ?RES_NAME(?ROUTE_POOL_PREFIX, ClusterName)).
-define(MSG_RES_ID(ClusterName), ?RES_NAME(?MSG_POOL_PREFIX, ClusterName)). -define(MSG_RES_ID(ClusterName), ?RES_NAME(?MSG_POOL_PREFIX, ClusterName)).
-define(HEALTH_CHECK_TIMEOUT, 1000). -define(HEALTH_CHECK_TIMEOUT, 1000).
-define(RES_GROUP, <<"emqx_cluster_link">>). -define(RES_GROUP, <<"emqx_cluster_link">>).
-define(DEFAULT_POOL_KEY, <<"default">>).
%% Protocol %% Protocol
-define(PROTO_VER, <<"1.0">>). %% -define(PROTO_VER, <<"1.0">>).
-define(INIT_LINK_OP, <<"init_link">>).
-define(ACK_LINK_OP, <<"ack_link">>).
-define(UNLINK_OP, <<"unlink">>).
-define(BATCH_ROUTES_OP, <<"add_routes">>).
-define(CLEANUP_ROUTES_OP, <<"cleanup_routes">>).
%% It's worth optimizing non-batch op payload size,
%% thus it's encoded as a plain binary
-define(TOPIC_WITH_OP(Op, Topic), <<Op/binary, "_", Topic/binary>>).
-define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])). -define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])).
-define(ENCODE(Payload), erlang:term_to_binary(Payload)). -define(ENCODE(Payload), erlang:term_to_binary(Payload)).
@ -290,121 +265,9 @@ connect(Options) ->
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Routing %% Protocol
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
routing_pool_workers(#{upstream := ClusterName} = _ClusterConf) ->
ecpool:workers(?ROUTE_POOL_NAME(ClusterName)).
start_routing_pool(#{upstream := ClusterName} = ClusterConf) ->
start_pool(?ROUTE_POOL_NAME(ClusterName), ?ROUTE_CLIENTID_SUFFIX, ClusterConf).
stop_routing_pool(ClusterName) ->
ecpool:stop_sup_pool(?ROUTE_POOL_NAME(ClusterName)).
init_link(ClusterName) ->
Payload = #{
<<"op">> => ?INIT_LINK_OP,
<<"proto_ver">> => ?PROTO_VER,
<<"upstream">> => ClusterName,
%% TODO: may no need to reserve it as it is a map?
<<"extra">> => #{}
},
ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
Properties = #{
'Response-Topic' => ?CTRL_TOPIC(ClusterName),
'Correlation-Data' => ReqId
},
Topic = ?CTRL_TOPIC(?MY_CLUSTER_NAME),
{ReqId, publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, Properties, Topic, ?QOS_1)}.
ack_link(ClusterName, Result, RespTopic, ReqId) ->
Payload = #{
<<"op">> => ?ACK_LINK_OP,
%% The links may compare and downgrade/adjust protocol in future
<<"proto_ver">> => ?PROTO_VER,
%% may be used in future to avoud re-bootrstrapping all the routes,
%% for example, if the connection was abrupted for a while but the cluster was healthy
%% and didn't lost any routes. In that case, retrying lost route updates would be sufficient.
%% For now, it's always true for simplicitiy reasons.
<<"need_bootstrap">> => true,
<<"extra">> => #{}
},
Payload1 =
case Result of
{ok, _} ->
Payload#{<<"result">> => <<"ok">>};
{error, Reason} ->
Payload#{<<"result">> => <<"error">>, reason => Reason}
end,
Props = #{'Correlation-Data' => ReqId},
Query = {RespTopic, Props, Payload1, ?QOS_1},
%% Using msg forwading resource to send the response back.
%% TODO: maybe async query?
emqx_resource:query(?MSG_RES_ID(ClusterName), Query, #{
query_mode => simple_sync, pick_key => RespTopic
}).
remove_link(ClusterName) ->
Payload = #{<<"op">> => ?UNLINK_OP},
Topic = ?CTRL_TOPIC(?MY_CLUSTER_NAME),
publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, #{}, Topic, ?QOS_0).
publish_routes(QueryType, ClusterName, Topics) ->
%% Picks the same pool worker consistently.
%% Although, as writes are idompotent we can pick it randomly - TBD.
publish_routes(QueryType, ClusterName, ?DEFAULT_POOL_KEY, Topics).
publish_routes(QueryType, ClusterName, PoolKey, Topics) ->
Payload = #{<<"op">> => ?BATCH_ROUTES_OP, <<"topics">> => Topics},
publish(QueryType, ClusterName, PoolKey, Payload).
cleanup_routes(ClusterName) ->
Payload = #{<<"op">> => ?CLEANUP_ROUTES_OP},
publish(sync, ClusterName, ?DEFAULT_POOL_KEY, Payload, #{}, ?ROUTE_TOPIC, ?QOS_0).
publish_route_op(QueryType, ClusterName, Op, Topic) when Op =:= <<"add">>; Op =:= <<"delete">> ->
Payload = ?TOPIC_WITH_OP(Op, Topic),
publish(QueryType, ClusterName, Topic, Payload).
publish(QueryType, ClusterName, PoolKey, Payload) ->
publish(QueryType, ClusterName, PoolKey, Payload, #{}).
publish(QueryType, ClusterName, PoolKey, Payload, Props) ->
%% Deletes are not implemented for now, writes are idempotent, so QOS_1 is fine.
publish(QueryType, ClusterName, PoolKey, Payload, Props, ?ROUTE_TOPIC, ?QOS_1).
publish(async, ClusterName, PoolKey, Payload, Props, Topic, QoS) ->
ecpool:pick_and_do(
{?ROUTE_POOL_NAME(ClusterName), PoolKey},
fun(ConnPid) ->
Ref = erlang:make_ref(),
Cb = {fun publish_result/3, [self(), Ref]},
emqtt:publish_async(
ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}], ?PUB_TIMEOUT, Cb
),
Ref
end,
no_handover
);
publish(sync, ClusterName, PoolKey, Payload, Props, Topic, QoS) ->
ecpool:pick_and_do(
{?ROUTE_POOL_NAME(ClusterName), PoolKey},
fun(ConnPid) ->
emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}])
end,
no_handover
).
publish_result(Caller, Ref, Result) ->
case handle_send_result(Result) of
ok ->
%% avoid extra message passing, we only care about errors for now
ok;
Err ->
Caller ! {pub_result, Ref, Err}
end.
%%% New leader-less Syncer/Actor implementation %%% New leader-less Syncer/Actor implementation
publish_actor_init_sync(ClientPid, Actor, Incarnation) -> publish_actor_init_sync(ClientPid, Actor, Incarnation) ->
@ -427,63 +290,6 @@ publish_route_sync(ClientPid, Actor, Incarnation, Updates) ->
}, },
emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1). emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1).
%%--------------------------------------------------------------------
%% Protocol
%%--------------------------------------------------------------------
decode_ctrl_msg(Payload, ClusterName) ->
decode_ctrl_msg1(?DECODE(Payload), ClusterName).
decode_ctrl_msg1(
#{
<<"op">> := ?INIT_LINK_OP,
<<"proto_ver">> := ProtoVer,
<<"upstream">> := UpstreamName
},
ClusterName
) ->
ProtoVer1 = decode_proto_ver(ProtoVer, ClusterName),
%% UpstreamName is the name the remote linked cluster refers to this cluster,
%% so it must equal to the local cluster name, more clear naming is desired...
MyClusterName = ?MY_CLUSTER_NAME,
case UpstreamName of
MyClusterName ->
{init_link, {ok, #{proto_ver => ProtoVer1}}};
_ ->
?SLOG(error, #{
msg => "misconfigured_cluster_link_name",
%% How this cluster names itself
local_name => MyClusterName,
%% How the remote cluster names itself
link_name => ClusterName,
%% How the remote cluster names this local cluster
upstream_name => UpstreamName
}),
{init_link, {error, <<"bad_upstream_name">>}}
end;
decode_ctrl_msg1(
#{
<<"op">> := ?ACK_LINK_OP,
<<"result">> := <<"ok">>,
<<"proto_ver">> := ProtoVer,
<<"need_bootstrap">> := IsBootstrapNeeded
},
ClusterName
) ->
ProtoVer1 = decode_proto_ver(ProtoVer, ClusterName),
{ack_link, {ok, #{proto_ver => ProtoVer1, need_bootstrap => IsBootstrapNeeded}}};
decode_ctrl_msg1(
#{
<<"op">> := ?ACK_LINK_OP,
<<"result">> := <<"error">>,
<<"reason">> := Reason
},
_ClusterName
) ->
{ack_link, {error, Reason}};
decode_ctrl_msg1(#{<<"op">> := ?UNLINK_OP}, _ClusterName) ->
unlink.
decode_route_op(Payload) -> decode_route_op(Payload) ->
decode_route_op1(?DECODE(Payload)). decode_route_op1(?DECODE(Payload)).
@ -501,14 +307,6 @@ decode_route_op1(#{
}) -> }) ->
RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps), RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps),
{route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1}; {route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1};
%%decode_route_op1(<<"add_", Topic/binary>>) ->
%% {add, Topic};
%%decode_route_op1(<<"delete_", Topic/binary>>) ->
%% {delete, Topic};
%%decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) ->
%% {add, Topics};
%%decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) ->
%% cleanup_routes;
decode_route_op1(Payload) -> decode_route_op1(Payload) ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "unexpected_cluster_link_route_op_payload", msg => "unexpected_cluster_link_route_op_payload",
@ -528,29 +326,6 @@ decode_forwarded_msg(Payload) ->
{error, Payload} {error, Payload}
end. end.
decode_proto_ver(ProtoVer, ClusterName) ->
{MyMajor, MyMinor} = decode_proto_ver1(?PROTO_VER),
case decode_proto_ver1(ProtoVer) of
{Major, Minor} = Res when
Major > MyMajor;
Minor > MyMinor
->
?SLOG(notice, #{
msg => "different_cluster_link_protocol_versions",
protocol_version => ?PROTO_VER,
link_protocol_version => ProtoVer,
link_name => ClusterName
}),
Res;
Res ->
Res
end.
decode_proto_ver1(ProtoVer) ->
[Major, Minor] = binary:split(ProtoVer, <<".">>),
%% Let it fail (for now), we don't expect invalid data to pass through the linking protocol..
{emqx_utils_conv:int(Major), emqx_utils_conv:int(Minor)}.
encode_field(route, {add, Route = {_Topic, _ID}}) -> encode_field(route, {add, Route = {_Topic, _ID}}) ->
Route; Route;
encode_field(route, {delete, {Topic, ID}}) -> encode_field(route, {delete, {Topic, ID}}) ->
@ -573,38 +348,7 @@ forward(ClusterName, #delivery{message = #message{topic = Topic} = Msg}) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
emqtt_client_opts( emqtt_client_opts(ClientIdSuffix, ClusterConf) ->
ClientIdSuffix, #{server := Server, ssl := #{enable := EnableSsl} = Ssl} = ClusterConf #{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf),
) -> ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]),
BaseClientId = maps:get(client_id, ClusterConf, ?MY_CLUSTER_NAME), Opts#{clientid => ClientId}.
ClientId = ?CLIENTID(BaseClientId, ClientIdSuffix),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
Opts = #{
host => Host,
port => Port,
clientid => ClientId,
proto_ver => v5,
ssl => EnableSsl,
ssl_opts => maps:to_list(maps:remove(enable, Ssl))
},
with_password(with_user(Opts, ClusterConf), ClusterConf).
with_user(Opts, #{username := U} = _ClusterConf) ->
Opts#{username => U};
with_user(Opts, _ClusterConf) ->
Opts.
with_password(Opts, #{password := P} = _ClusterConf) ->
Opts#{password => emqx_secret:unwrap(P)};
with_password(Opts, _ClusterConf) ->
Opts.
start_pool(PoolName, ClientIdSuffix, #{pool_size := PoolSize} = ClusterConf) ->
ClientOpts = emqtt_client_opts(ClientIdSuffix, ClusterConf),
Opts = [
{name, PoolName},
{pool_size, PoolSize},
{pool_type, hash},
{client_opts, ClientOpts}
],
ecpool:start_sup_pool(PoolName, ?MODULE, Opts).