feat(cluster-link): preserve replication actor state in pdict

This commit is contained in:
Andrew Mayorov 2024-05-20 11:55:56 +02:00 committed by Serge Tupchii
parent 5771a41a32
commit 43d114546c
2 changed files with 82 additions and 66 deletions

View File

@ -86,15 +86,16 @@ should_route_to_external_dests(_Msg) ->
on_message_publish(
#message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload} = Msg
) ->
_ =
case emqx_cluster_link_mqtt:decode_route_op(Payload) of
{actor_init, InitInfoMap} ->
actor_init(ClusterName, emqx_message:get_header(properties, Msg), InitInfoMap);
{route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} ->
update_routes(ClusterName, Actor, Incr, RouteOps);
{heartbeat, #{actor := Actor, incarnation := Incr}} ->
actor_heartbeat(ClusterName, Actor, Incr)
end,
case emqx_cluster_link_mqtt:decode_route_op(Payload) of
{actor_init, Actor, InitInfo} ->
Result = actor_init(ClusterName, Actor, InitInfo),
_ = actor_init_ack(Actor, Result, Msg),
ok;
{route_updates, #{actor := Actor}, RouteOps} ->
ok = update_routes(ClusterName, Actor, RouteOps);
{heartbeat, #{actor := Actor}} ->
ok = actor_heartbeat(ClusterName, Actor)
end,
{stop, []};
on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of
@ -117,6 +118,9 @@ delete_hook() ->
%% Internal functions
%%--------------------------------------------------------------------
-define(PD_EXTROUTER_ACTOR, '$clink_extrouter_actor').
-define(PD_EXTROUTER_ACTOR_STATE, '$clink_extrouter_actor_state').
%% Three-argument convenience form: delegates to maybe_push_route_op/4,
%% passing the atom `push` as the fourth argument.
maybe_push_route_op(RouteOp, RouteTopic, Id) ->
    PushFun = push,
    maybe_push_route_op(RouteOp, RouteTopic, Id, PushFun).
@ -143,70 +147,79 @@ topic_intersect_any(_Topic, []) ->
actor_init(
ClusterName,
#{'Correlation-Data' := ReqId, 'Response-Topic' := RespTopic},
#{actor := Actor, incarnation := Incr},
#{
actor := Actor,
incarnation := Incr,
cluster := TargetCluster,
target_cluster := TargetCluster,
proto_ver := _
}
) ->
Res =
case emqx_cluster_link_config:link(ClusterName) of
undefined ->
?SLOG(
error,
#{
msg => "init_link_request_from_unknown_cluster",
link_name => ClusterName
}
),
%% Avoid atom error reasons, since they can be sent to the remote cluster,
%% which will use safe binary_to_term decoding
%% TODO: add error details?
{error, <<"unknown_cluster">>};
LinkConf ->
%% TODO: may be worth checking resource health and communicate it?
_ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
MyClusterName = emqx_cluster_link_config:cluster(),
case MyClusterName of
TargetCluster ->
Env = #{timestamp => erlang:system_time(millisecond)},
emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incr, Env);
_ ->
%% The remote cluster uses a different name to refer to this cluster
?SLOG(error, #{
msg => "misconfigured_cluster_link_name",
%% How this cluster names itself
local_name => MyClusterName,
%% How the remote cluster names this local cluster
remote_name => TargetCluster,
%% How the remote cluster names itself
received_from => ClusterName
}),
{error, <<"bad_remote_cluster_link_name">>}
end
end,
_ = actor_init_ack(Actor, Res, ReqId, RespTopic),
{stop, []}.
case emqx_cluster_link_config:link(ClusterName) of
undefined ->
?SLOG(error, #{
msg => "init_link_request_from_unknown_cluster",
link_name => ClusterName
}),
%% Avoid atom error reasons, since they can be sent to the remote cluster,
%% which will use safe binary_to_term decoding
%% TODO: add error details?
{error, <<"unknown_cluster">>};
LinkConf ->
%% TODO: may be worth checking resource health and communicate it?
_ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
MyClusterName = emqx_cluster_link_config:cluster(),
case MyClusterName of
TargetCluster ->
Env = #{timestamp => erlang:system_time(millisecond)},
{ok, ActorSt} = emqx_cluster_link_extrouter:actor_init(
ClusterName, Actor, Incr, Env
),
undefined = set_actor_state(ClusterName, Actor, ActorSt),
ok;
_ ->
%% The remote cluster uses a different name to refer to this cluster
?SLOG(error, #{
msg => "misconfigured_cluster_link_name",
%% How this cluster names itself
local_name => MyClusterName,
%% How the remote cluster names this local cluster
remote_name => TargetCluster,
%% How the remote cluster names itself
received_from => ClusterName
}),
{error, <<"bad_remote_cluster_link_name">>}
end
end.
actor_init_ack(Actor, Res, ReqId, RespTopic) ->
RespMsg = emqx_cluster_link_mqtt:actor_init_ack_resp_msg(Actor, Res, ReqId, RespTopic),
%% Acknowledge an actor-init request.
%% Builds the response message from the actor name, the init result and the
%% incoming request message, then publishes it via the broker.
%% Returns whatever emqx_broker:publish/1 returns.
actor_init_ack(#{actor := ActorName}, InitResult, MsgIn) ->
    emqx_broker:publish(
        emqx_cluster_link_mqtt:actor_init_ack_resp_msg(ActorName, InitResult, MsgIn)
    ).
update_routes(ClusterName, Actor, Incarnation, RouteOps) ->
ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation),
update_routes(ClusterName, Actor, RouteOps) ->
ActorSt = get_actor_state(ClusterName, Actor),
lists:foreach(
fun(RouteOp) ->
emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorState)
emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorSt)
end,
RouteOps
).
actor_heartbeat(ClusterName, Actor, Incarnation) ->
actor_heartbeat(ClusterName, Actor) ->
Env = #{timestamp => erlang:system_time(millisecond)},
ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation),
_State = emqx_cluster_link_extrouter:actor_apply_operation(heartbeat, ActorState, Env).
ActorSt0 = get_actor_state(ClusterName, Actor),
ActorSt = emqx_cluster_link_extrouter:actor_apply_operation(heartbeat, ActorSt0, Env),
_ = update_actor_state(ActorSt),
ok.
%% Read the replication actor state stored in the process dictionary.
%% Asserts that the process dictionary is bound to exactly this
%% {ClusterName, Actor} pair; any other value (including `undefined` when
%% nothing was stored yet) fails with a `badmatch` error.
get_actor_state(ClusterName, Actor) ->
    Expected = {ClusterName, Actor},
    Expected = erlang:get(?PD_EXTROUTER_ACTOR),
    erlang:get(?PD_EXTROUTER_ACTOR_STATE).
%% Bind this process to the {ClusterName, Actor} pair and store its state.
%% The bare `undefined` match asserts that no actor was bound to this process
%% before; a second binding fails with `badmatch`.
%% Returns the previous value stored under the actor-state key.
set_actor_state(ClusterName, Actor, ActorSt) ->
    ActorKey = {ClusterName, Actor},
    undefined = erlang:put(?PD_EXTROUTER_ACTOR, ActorKey),
    erlang:put(?PD_EXTROUTER_ACTOR_STATE, ActorSt).
%% Overwrite the actor state kept in the process dictionary.
%% Returns the value previously stored under the same key (`undefined` if
%% there was none), as erlang:put/2 does.
update_actor_state(NewActorSt) ->
    PrevActorSt = erlang:put(?PD_EXTROUTER_ACTOR_STATE, NewActorSt),
    PrevActorSt.
%% let it crash if extra is not a map,
%% we don't expect the message to be forwarded from an older EMQX release,

View File

@ -35,7 +35,7 @@
-export([
publish_actor_init_sync/6,
actor_init_ack_resp_msg/4,
actor_init_ack_resp_msg/3,
publish_route_sync/4,
publish_heartbeat/3,
encode_field/2
@ -277,13 +277,17 @@ publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incar
},
emqtt:publish(ClientPid, ?ROUTE_TOPIC, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]).
actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) ->
actor_init_ack_resp_msg(Actor, InitRes, MsgIn) ->
Payload = #{
?F_OPERATION => ?OP_ACTOR_INIT_ACK,
?F_PROTO_VER => ?PROTO_VER,
?F_ACTOR => Actor
},
Payload1 = with_res_and_bootstrap(Payload, InitRes),
#{
'Response-Topic' := RespTopic,
'Correlation-Data' := ReqId
} = emqx_message:get_header(properties, MsgIn),
emqx_message:make(
undefined,
?QOS_1,
@ -334,12 +338,11 @@ decode_route_op1(#{
?F_ACTOR := Actor,
?F_INCARNATION := Incr
}) ->
{actor_init, #{
actor => Actor,
incarnation => Incr,
cluster => TargetCluster,
Info = #{
target_cluster => TargetCluster,
proto_ver => ProtoVer
}};
},
{actor_init, #{actor => Actor, incarnation => Incr}, Info};
decode_route_op1(#{
?F_OPERATION := ?OP_ROUTE,
?F_ACTOR := Actor,