diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index dd2544114..3ee7e9fdf 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -5,6 +5,12 @@ -define(TOPIC_PREFIX, "$LINK/cluster/"). -define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/"). -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). +-define(RESP_TOPIC_PREFIX, ?TOPIC_PREFIX "resp/"). + +-define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()). +-define(ROUTE_TOPIC, <>). +-define(MSG_FWD_TOPIC, <>). +-define(RESP_TOPIC(Actor), <>). %% Fairly compact text encoding. -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 846204066..37456faea 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -83,11 +83,13 @@ should_route_to_external_dests(_Msg) -> %% EMQX Hooks %%-------------------------------------------------------------------- -on_message_publish(#message{topic = <>, payload = Payload}) -> +on_message_publish( + #message{topic = <>, payload = Payload} = Msg +) -> _ = case emqx_cluster_link_mqtt:decode_route_op(Payload) of - {actor_init, #{actor := Actor, incarnation := Incr}} -> - actor_init(ClusterName, Actor, Incr); + {actor_init, InitInfoMap} -> + actor_init(ClusterName, emqx_message:get_header(properties, Msg), InitInfoMap); {route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} -> update_routes(ClusterName, Actor, Incr, RouteOps) end, @@ -137,9 +139,61 @@ topic_intersect_any(Topic, [LinkFilter | T]) -> topic_intersect_any(_Topic, []) -> false. -actor_init(ClusterName, Actor, Incarnation) -> - Env = #{timestamp => erlang:system_time(millisecond)}, - {ok, _} = emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incarnation, Env). 
+actor_init( + ClusterName, + #{'Correlation-Data' := ReqId, 'Response-Topic' := RespTopic}, + #{ + actor := Actor, + incarnation := Incr, + cluster := TargetCluster, + proto_ver := _ + } +) -> + Res = + case emqx_cluster_link_config:link(ClusterName) of + undefined -> + ?SLOG( + error, + #{ + msg => "init_link_request_from_unknown_cluster", + link_name => ClusterName + } + ), + %% Avoid atom error reasons, since they can be sent to the remote cluster, + %% which will use safe binary_to_term decoding + %% TODO: add error details? + {error, <<"unknown_cluster">>}; + LinkConf -> + %% TODO: may be worth checking resource health and communicate it? + _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + MyClusterName = emqx_cluster_link_config:cluster(), + case MyClusterName of + TargetCluster -> + Env = #{timestamp => erlang:system_time(millisecond)}, + {ok, _} = emqx_cluster_link_extrouter:actor_init( + ClusterName, Actor, Incr, Env + ), + ok; + _ -> + %% The remote cluster uses a different name to refer to this cluster + ?SLOG(error, #{ + msg => "misconfigured_cluster_link_name", + %% How this cluster names itself + local_name => MyClusterName, + %% How the remote cluster names this local cluster + remote_name => TargetCluster, + %% How the remote cluster names itself + received_from => ClusterName + }), + {error, <<"bad_remote_cluster_link_name">>} + end + end, + _ = actor_init_ack(Actor, Res, ReqId, RespTopic), + {stop, []}. + +actor_init_ack(Actor, Res, ReqId, RespTopic) -> + RespMsg = emqx_cluster_link_mqtt:actor_init_ack_resp_msg(Actor, Res, ReqId, RespTopic), + emqx_broker:publish(RespMsg). 
update_routes(ClusterName, Actor, Incarnation, RouteOps) -> ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation), diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index d62965bb2..3ebfd6de6 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -29,11 +29,13 @@ ensure_msg_fwd_resource/1, stop_msg_fwd_resource/1, decode_route_op/1, - decode_forwarded_msg/1 + decode_forwarded_msg/1, + decode_resp/1 ]). -export([ - publish_actor_init_sync/3, + publish_actor_init_sync/6, + actor_init_ack_resp_msg/4, publish_route_sync/4, encode_field/2 ]). @@ -45,11 +47,6 @@ -define(MSG_CLIENTID_SUFFIX, ":msg:"). -define(MQTT_HOST_OPTS, #{default_port => 1883}). --define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()). - --define(ROUTE_TOPIC, <>). --define(MSG_FWD_TOPIC, <>). -%%-define(CTRL_TOPIC(ClusterName), <>). -define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:"). -define(RES_NAME(Prefix, ClusterName), <>). @@ -58,8 +55,7 @@ -define(HEALTH_CHECK_TIMEOUT, 1000). -define(RES_GROUP, <<"emqx_cluster_link">>). -%% Protocol -%% -define(PROTO_VER, <<"1.0">>). +-define(PROTO_VER, 1). -define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])). -define(ENCODE(Payload), erlang:term_to_binary(Payload)). @@ -67,10 +63,14 @@ -define(F_OPERATION, '$op'). -define(OP_ROUTE, <<"route">>). -define(OP_ACTOR_INIT, <<"actor_init">>). +-define(OP_ACTOR_INIT_ACK, <<"actor_init_ack">>). -define(F_ACTOR, 10). -define(F_INCARNATION, 11). -define(F_ROUTES, 12). +-define(F_TARGET_CLUSTER, 13). +-define(F_PROTO_VER, 14). +-define(F_RESULT, 15). -define(ROUTE_DELETE, 100). 
@@ -128,16 +128,6 @@ on_query(_ResourceId, FwdMsg, #{pool_name := PoolName, topic := LinkTopic} = _St end, no_handover ) - ); -on_query(_ResourceId, {Topic, Props, Payload, QoS}, #{pool_name := PoolName} = _State) -> - handle_send_result( - ecpool:pick_and_do( - {PoolName, Topic}, - fun(ConnPid) -> - emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}]) - end, - no_handover - ) ). on_query_async( @@ -270,15 +260,36 @@ connect(Options) -> %%% New leader-less Syncer/Actor implementation -publish_actor_init_sync(ClientPid, Actor, Incarnation) -> - %% TODO: handshake (request / response) to make sure the link is established +publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incarnation) -> PubTopic = ?ROUTE_TOPIC, Payload = #{ ?F_OPERATION => ?OP_ACTOR_INIT, + ?F_PROTO_VER => ?PROTO_VER, + ?F_TARGET_CLUSTER => TargetCluster, ?F_ACTOR => Actor, ?F_INCARNATION => Incarnation }, - emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1). + Properties = #{ + 'Response-Topic' => RespTopic, + 'Correlation-Data' => ReqId + }, + emqtt:publish(ClientPid, PubTopic, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]). + +actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) -> + Payload = #{ + ?F_OPERATION => ?OP_ACTOR_INIT_ACK, + ?F_PROTO_VER => ?PROTO_VER, + ?F_ACTOR => Actor, + ?F_RESULT => InitRes + }, + emqx_message:make( + undefined, + ?QOS_1, + RespTopic, + ?ENCODE(Payload), + #{}, + #{properties => #{'Correlation-Data' => ReqId}} + ). publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> PubTopic = ?ROUTE_TOPIC, @@ -293,12 +304,22 @@ publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> decode_route_op(Payload) -> decode_route_op1(?DECODE(Payload)). +decode_resp(Payload) -> + decode_resp1(?DECODE(Payload)). 
+ decode_route_op1(#{ ?F_OPERATION := ?OP_ACTOR_INIT, + ?F_PROTO_VER := ProtoVer, + ?F_TARGET_CLUSTER := TargetCluster, ?F_ACTOR := Actor, ?F_INCARNATION := Incr }) -> - {actor_init, #{actor => Actor, incarnation => Incr}}; + {actor_init, #{ + actor => Actor, + incarnation => Incr, + cluster => TargetCluster, + proto_ver => ProtoVer + }}; decode_route_op1(#{ ?F_OPERATION := ?OP_ROUTE, ?F_ACTOR := Actor, @@ -314,6 +335,14 @@ decode_route_op1(Payload) -> }), {error, Payload}. +decode_resp1(#{ + ?F_OPERATION := ?OP_ACTOR_INIT_ACK, + ?F_ACTOR := Actor, + ?F_PROTO_VER := ProtoVer, + ?F_RESULT := InitResult +}) -> + {actor_init_ack, #{actor => Actor, result => InitResult, proto_ver => ProtoVer}}. + decode_forwarded_msg(Payload) -> case ?DECODE(Payload) of #message{} = Msg -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 2a19a54b3..506eeb176 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -4,6 +4,8 @@ -module(emqx_cluster_link_router_syncer). -include_lib("emqtt/include/emqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include("emqx_cluster_link.hrl"). %% API -export([start_link/1]). @@ -50,9 +52,10 @@ -define(ERROR_DELAY, 200). -define(RECONNECT_TIMEOUT, 5_000). +-define(ACTOR_REINIT_TIMEOUT, 7000). --define(CLIENT_SUFFIX, ":routesync"). --define(PS_CLIENT_SUFFIX, ":routesync-ps"). +-define(CLIENT_SUFFIX, ":routesync:"). +-define(PS_CLIENT_SUFFIX, ":routesync-ps:"). %% Special actor for persistent routes that has the same actor name on all nodes. %% Node actors with the same name nay race with each other (e.g. during bootstrap), @@ -65,6 +68,21 @@ -define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}). -define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)). +-define(SAFE_MQTT_PUB(Expr, ClientPid), ?SAFE_MQTT_PUB(Expr, ClientPid, ok)). 
+-define(SAFE_MQTT_PUB(Expr, ClientPid, OnSuccess), + try Expr of + {ok, #{reason_code := __RC}} when __RC < ?RC_UNSPECIFIED_ERROR -> + OnSuccess; + {ok, #{reason_code_name := __RCN}} -> + {error, {mqtt, __RCN}}; + {error, __Reason} -> + {error, __Reason} + catch + exit:__Reason -> + {error, {client, ClientPid, __Reason}} + end +). + push(TargetCluster, OpName, Topic, ID) -> do_push(?SYNCER_NAME(TargetCluster), OpName, Topic, ID). @@ -164,17 +182,11 @@ announce_client(Actor, TargetCluster, Pid) -> ok. publish_routes(ClientPid, Actor, Incarnation, Updates) -> - try emqx_cluster_link_mqtt:publish_route_sync(ClientPid, Actor, Incarnation, Updates) of - {ok, #{reason_code := RC}} when RC < ?RC_UNSPECIFIED_ERROR -> - #{}; - {ok, #{reason_code_name := RCN}} -> - {error, {mqtt, RCN}}; - {error, Reason} -> - {error, Reason} - catch - exit:Reason -> - {error, {client, ClientPid, Reason}} - end. + ?SAFE_MQTT_PUB( + emqx_cluster_link_mqtt:publish_route_sync(ClientPid, Actor, Incarnation, Updates), + ClientPid, + #{} + ). %% Route syncer @@ -227,6 +239,7 @@ batch_get_opname(Op) -> init({sup, TargetCluster}) -> %% FIXME: Intensity. SupFlags = #{ + %% TODO: one_for_one? strategy => one_for_all, intensity => 10, period => 60 @@ -288,7 +301,10 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> incarnation :: non_neg_integer(), client :: {pid(), reference()}, bootstrapped :: boolean(), - reconnect_timer :: reference() + reconnect_timer :: reference(), + actor_init_req_id :: binary(), + actor_init_timer :: reference(), + remote_actor_info :: undefined | map() }). 
mk_state(TargetCluster, Actor, Incarnation) -> @@ -314,27 +330,91 @@ handle_cast(_Request, State) -> handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) -> {noreply, handle_client_down(Reason, St)}; -handle_info({timeout, TRef, _Reconnect}, St = #st{reconnect_timer = TRef}) -> +handle_info( + {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}}, + St = #st{actor_init_req_id = ReqId} +) -> + {actor_init_ack, #{result := Res} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload), + St1 = St#st{ + actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap + }, + case Res of + ok -> + {noreply, post_actor_init(St1)}; + Error -> + ?SLOG(error, #{ + msg => "failed_to_init_link", + reason => Error, + target_cluster => St#st.target, + actor => St#st.actor, + remote_link_proto_ver => maps:get(proto_ver, AckInfoMap, undefined) + }), + %% TODO: It doesn't fit permanent workers/one_for_all restart/strategy. + %% The actor may be kept alive with some error status instead (waiting for a user intervention to fix it)? + {stop, {shutdown, Error}, St1} + end; +handle_info({publish, #{}}, St) -> + {noreply, St}; +handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) -> {noreply, process_connect(St#st{reconnect_timer = undefined})}; -handle_info(_Info, St) -> - %% TODO: log? +handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) -> + ?SLOG(error, #{ + msg => "remote_actor_init_timeout", + target_cluster => St#st.target, + actor => St#st.actor + }), + {noreply, init_remote_actor(St#st{actor_init_timer = undefined})}; +%% Stale timeout: the timer reference no longer matches reconnect_timer or actor_init_timer in the state, so ignore it. +handle_info({timeout, _, _}, St) -> + {noreply, St}; +handle_info(Info, St) -> + ?SLOG(warning, #{msg => "unexpected_info", info => Info}), {noreply, St}. terminate(_Reason, _State) -> ok. 
-process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) -> +process_connect(St = #st{target = TargetCluster, actor = Actor}) -> case start_link_client(TargetCluster, Actor) of {ok, ClientPid} -> - %% TODO: error handling, handshake - {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr), - ok = start_syncer(TargetCluster, Actor, Incr), ok = announce_client(Actor, TargetCluster, ClientPid), - process_bootstrap(St#st{client = ClientPid}); + %% TODO: handle subscribe errors + {ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1), + init_remote_actor(St#st{client = ClientPid}); {error, Reason} -> handle_connect_error(Reason, St) end. +init_remote_actor( + St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr} +) -> + ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)), + Res = ?SAFE_MQTT_PUB( + emqx_cluster_link_mqtt:publish_actor_init_sync( + ClientPid, ReqId, ?RESP_TOPIC(Actor), TargetCluster, Actor, Incr + ), + ClientPid + ), + case Res of + ok -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_init_remote_actor", + reason => Reason, + target_cluster => TargetCluster, + actor => Actor + }) + end, + TRef = erlang:start_timer(?ACTOR_REINIT_TIMEOUT, self(), actor_reinit), + St#st{actor_init_req_id = ReqId, actor_init_timer = TRef}. + +post_actor_init( + St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr} +) -> + ok = start_syncer(TargetCluster, Actor, Incr), + process_bootstrap(St#st{client = ClientPid}). 
+ handle_connect_error(_Reason, St) -> %% TODO: logs TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect), @@ -342,6 +422,7 @@ handle_connect_error(_Reason, St) -> handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) -> %% TODO: logs + %% TODO: syncer may be already down due to one_for_all strategy ok = close_syncer(TargetCluster, Actor), process_connect(St#st{client = undefined}).