From 43d114546c3c348b450d5f835b9c2269d5c909fe Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 20 May 2024 11:55:56 +0200 Subject: [PATCH] feat(cluster-link): preserve replication actor state in pdict --- .../src/emqx_cluster_link.erl | 131 ++++++++++-------- .../src/emqx_cluster_link_mqtt.erl | 17 ++- 2 files changed, 82 insertions(+), 66 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 515d8d125..2b469b114 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -86,15 +86,16 @@ should_route_to_external_dests(_Msg) -> on_message_publish( #message{topic = <>, payload = Payload} = Msg ) -> - _ = - case emqx_cluster_link_mqtt:decode_route_op(Payload) of - {actor_init, InitInfoMap} -> - actor_init(ClusterName, emqx_message:get_header(properties, Msg), InitInfoMap); - {route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} -> - update_routes(ClusterName, Actor, Incr, RouteOps); - {heartbeat, #{actor := Actor, incarnation := Incr}} -> - actor_heartbeat(ClusterName, Actor, Incr) - end, + case emqx_cluster_link_mqtt:decode_route_op(Payload) of + {actor_init, Actor, InitInfo} -> + Result = actor_init(ClusterName, Actor, InitInfo), + _ = actor_init_ack(Actor, Result, Msg), + ok; + {route_updates, #{actor := Actor}, RouteOps} -> + ok = update_routes(ClusterName, Actor, RouteOps); + {heartbeat, #{actor := Actor}} -> + ok = actor_heartbeat(ClusterName, Actor) + end, {stop, []}; on_message_publish(#message{topic = <>, payload = Payload}) -> case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of @@ -117,6 +118,9 @@ delete_hook() -> %% Internal functions %%-------------------------------------------------------------------- +-define(PD_EXTROUTER_ACTOR, '$clink_extrouter_actor'). +-define(PD_EXTROUTER_ACTOR_STATE, '$clink_extrouter_actor_state'). + maybe_push_route_op(Op, Topic, RouteID) -> maybe_push_route_op(Op, Topic, RouteID, push). @@ -143,70 +147,79 @@ topic_intersect_any(_Topic, []) -> actor_init( ClusterName, - #{'Correlation-Data' := ReqId, 'Response-Topic' := RespTopic}, + #{actor := Actor, incarnation := Incr}, #{ - actor := Actor, - incarnation := Incr, - cluster := TargetCluster, + target_cluster := TargetCluster, proto_ver := _ } ) -> - Res = - case emqx_cluster_link_config:link(ClusterName) of - undefined -> - ?SLOG( - error, - #{ - msg => "init_link_request_from_unknown_cluster", - link_name => ClusterName - } - ), - %% Avoid atom error reasons, since they can be sent to the remote cluster, - %% which will use safe binary_to_term decoding - %% TODO: add error details? - {error, <<"unknown_cluster">>}; - LinkConf -> - %% TODO: may be worth checking resource health and communicate it? - _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), - MyClusterName = emqx_cluster_link_config:cluster(), - case MyClusterName of - TargetCluster -> - Env = #{timestamp => erlang:system_time(millisecond)}, - emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incr, Env); - _ -> - %% The remote cluster uses a different name to refer to this cluster - ?SLOG(error, #{ - msg => "misconfigured_cluster_link_name", - %% How this cluster names itself - local_name => MyClusterName, - %% How the remote cluster names this local cluster - remote_name => TargetCluster, - %% How the remote cluster names itself - received_from => ClusterName - }), - {error, <<"bad_remote_cluster_link_name">>} - end - end, - _ = actor_init_ack(Actor, Res, ReqId, RespTopic), - {stop, []}. + case emqx_cluster_link_config:link(ClusterName) of + undefined -> + ?SLOG(error, #{ + msg => "init_link_request_from_unknown_cluster", + link_name => ClusterName + }), + %% Avoid atom error reasons, since they can be sent to the remote cluster, + %% which will use safe binary_to_term decoding + %% TODO: add error details? + {error, <<"unknown_cluster">>}; + LinkConf -> + %% TODO: may be worth checking resource health and communicate it? + _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + MyClusterName = emqx_cluster_link_config:cluster(), + case MyClusterName of + TargetCluster -> + Env = #{timestamp => erlang:system_time(millisecond)}, + {ok, ActorSt} = emqx_cluster_link_extrouter:actor_init( + ClusterName, Actor, Incr, Env + ), + undefined = set_actor_state(ClusterName, Actor, ActorSt), + ok; + _ -> + %% The remote cluster uses a different name to refer to this cluster + ?SLOG(error, #{ + msg => "misconfigured_cluster_link_name", + %% How this cluster names itself + local_name => MyClusterName, + %% How the remote cluster names this local cluster + remote_name => TargetCluster, + %% How the remote cluster names itself + received_from => ClusterName + }), + {error, <<"bad_remote_cluster_link_name">>} + end + end. -actor_init_ack(Actor, Res, ReqId, RespTopic) -> - RespMsg = emqx_cluster_link_mqtt:actor_init_ack_resp_msg(Actor, Res, ReqId, RespTopic), +actor_init_ack(#{actor := Actor}, Res, MsgIn) -> + RespMsg = emqx_cluster_link_mqtt:actor_init_ack_resp_msg(Actor, Res, MsgIn), emqx_broker:publish(RespMsg). -update_routes(ClusterName, Actor, Incarnation, RouteOps) -> - ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation), +update_routes(ClusterName, Actor, RouteOps) -> + ActorSt = get_actor_state(ClusterName, Actor), lists:foreach( fun(RouteOp) -> - emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorState) + emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorSt) end, RouteOps ). -actor_heartbeat(ClusterName, Actor, Incarnation) -> +actor_heartbeat(ClusterName, Actor) -> Env = #{timestamp => erlang:system_time(millisecond)}, - ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation), - _State = emqx_cluster_link_extrouter:actor_apply_operation(heartbeat, ActorState, Env). + ActorSt0 = get_actor_state(ClusterName, Actor), + ActorSt = emqx_cluster_link_extrouter:actor_apply_operation(heartbeat, ActorSt0, Env), + _ = update_actor_state(ActorSt), + ok. + +get_actor_state(ClusterName, Actor) -> + {ClusterName, Actor} = erlang:get(?PD_EXTROUTER_ACTOR), + erlang:get(?PD_EXTROUTER_ACTOR_STATE). + +set_actor_state(ClusterName, Actor, ActorSt) -> + undefined = erlang:put(?PD_EXTROUTER_ACTOR, {ClusterName, Actor}), + update_actor_state(ActorSt). + +update_actor_state(ActorSt) -> + erlang:put(?PD_EXTROUTER_ACTOR_STATE, ActorSt). %% let it crash if extra is not a map, %% we don't expect the message to be forwarded from an older EMQX release, diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index b126ce886..6091b6ffc 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -35,7 +35,7 @@ -export([ publish_actor_init_sync/6, - actor_init_ack_resp_msg/4, + actor_init_ack_resp_msg/3, publish_route_sync/4, publish_heartbeat/3, encode_field/2 @@ -277,13 +277,17 @@ publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incar }, emqtt:publish(ClientPid, ?ROUTE_TOPIC, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]). -actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) -> +actor_init_ack_resp_msg(Actor, InitRes, MsgIn) -> Payload = #{ ?F_OPERATION => ?OP_ACTOR_INIT_ACK, ?F_PROTO_VER => ?PROTO_VER, ?F_ACTOR => Actor }, Payload1 = with_res_and_bootstrap(Payload, InitRes), + #{ + 'Response-Topic' := RespTopic, + 'Correlation-Data' := ReqId + } = emqx_message:get_header(properties, MsgIn), emqx_message:make( undefined, ?QOS_1, @@ -334,12 +338,11 @@ decode_route_op1(#{ ?F_ACTOR := Actor, ?F_INCARNATION := Incr }) -> - {actor_init, #{ - actor => Actor, - incarnation => Incr, - cluster => TargetCluster, + Info = #{ + target_cluster => TargetCluster, proto_ver => ProtoVer - }}; + }, + {actor_init, #{actor => Actor, incarnation => Incr}, Info}; decode_route_op1(#{ ?F_OPERATION := ?OP_ROUTE, ?F_ACTOR := Actor,