diff --git a/.gitignore b/.gitignore index 8d95669ac..e97571448 100644 --- a/.gitignore +++ b/.gitignore @@ -79,3 +79,7 @@ rebar-git-cache.tar apps/emqx_utils/src/emqx_variform_parser.erl apps/emqx_utils/src/emqx_variform_scan.erl default-profile.mk +# local +/_compat +/scratch +SCRATCH diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 7a64b0ff7..515d8d125 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -91,7 +91,9 @@ on_message_publish( {actor_init, InitInfoMap} -> actor_init(ClusterName, emqx_message:get_header(properties, Msg), InitInfoMap); {route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} -> - update_routes(ClusterName, Actor, Incr, RouteOps) + update_routes(ClusterName, Actor, Incr, RouteOps); + {heartbeat, #{actor := Actor, incarnation := Incr}} -> + actor_heartbeat(ClusterName, Actor, Incr) end, {stop, []}; on_message_publish(#message{topic = <>, payload = Payload}) -> @@ -201,6 +203,11 @@ update_routes(ClusterName, Actor, Incarnation, RouteOps) -> RouteOps ). +actor_heartbeat(ClusterName, Actor, Incarnation) -> + Env = #{timestamp => erlang:system_time(millisecond)}, + ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation), + _State = emqx_cluster_link_extrouter:actor_apply_operation(heartbeat, ActorState, Env). + %% let it crash if extra is not a map, %% we don't expect the message to be forwarded from an older EMQX release, %% that doesn't set extra = #{} by default. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 9b6cf07e2..6e4fff7c7 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -244,7 +244,6 @@ apply_operation(Entry, OpName, Lane) -> %% This is safe sequence of operations only on core nodes. On replicants, %% `mria:dirty_update_counter/3` will be replicated asynchronously, which %% means this read can be stale. - % MCounter = ets:lookup_element(Tab, Entry, 2, 0), case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of [#extroute{mcounter = MCounter}] -> apply_operation(Entry, MCounter, OpName, Lane); diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index db4e39224..b126ce886 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -37,6 +37,7 @@ publish_actor_init_sync/6, actor_init_ack_resp_msg/4, publish_route_sync/4, + publish_heartbeat/3, encode_field/2 ]). @@ -62,6 +63,7 @@ -define(F_OPERATION, '$op'). -define(OP_ROUTE, <<"route">>). +-define(OP_HEARTBEAT, <<"heartbeat">>). -define(OP_ACTOR_INIT, <<"actor_init">>). -define(OP_ACTOR_INIT_ACK, <<"actor_init_ack">>). @@ -262,7 +264,6 @@ connect(Options) -> %%% New leader-less Syncer/Actor implementation publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incarnation) -> - PubTopic = ?ROUTE_TOPIC, Payload = #{ ?F_OPERATION => ?OP_ACTOR_INIT, ?F_PROTO_VER => ?PROTO_VER, @@ -274,7 +275,7 @@ publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incar 'Response-Topic' => RespTopic, 'Correlation-Data' => ReqId }, - emqtt:publish(ClientPid, PubTopic, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]). + emqtt:publish(ClientPid, ?ROUTE_TOPIC, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]). actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) -> Payload = #{ @@ -304,14 +305,21 @@ with_res_and_bootstrap(Payload, Error) -> }. publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> - PubTopic = ?ROUTE_TOPIC, Payload = #{ ?F_OPERATION => ?OP_ROUTE, ?F_ACTOR => Actor, ?F_INCARNATION => Incarnation, ?F_ROUTES => Updates }, - emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1). + emqtt:publish(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_1). + +publish_heartbeat(ClientPid, Actor, Incarnation) -> + Payload = #{ + ?F_OPERATION => ?OP_HEARTBEAT, + ?F_ACTOR => Actor, + ?F_INCARNATION => Incarnation + }, + emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_0, undefined). decode_route_op(Payload) -> decode_route_op1(?DECODE(Payload)). @@ -340,6 +348,12 @@ decode_route_op1(#{ }) -> RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps), {route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1}; +decode_route_op1(#{ + ?F_OPERATION := ?OP_HEARTBEAT, + ?F_ACTOR := Actor, + ?F_INCARNATION := Incr +}) -> + {heartbeat, #{actor => Actor, incarnation => Incr}}; decode_route_op1(Payload) -> ?SLOG(warning, #{ msg => "unexpected_cluster_link_route_op_payload", diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 3c87b9e37..dc3903f72 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -53,6 +53,7 @@ -define(RECONNECT_TIMEOUT, 5_000). -define(ACTOR_REINIT_TIMEOUT, 7000). +-define(HEARTBEAT_INTERVAL, 10_000). -define(CLIENT_SUFFIX, ":routesync:"). -define(PS_CLIENT_SUFFIX, ":routesync-ps:"). @@ -180,6 +181,10 @@ publish_routes(ClientPid, Actor, Incarnation, Updates) -> #{} ). +publish_heartbeat(ClientPid, Actor, Incarnation) -> + %% NOTE: Fully asynchronous, no need for error handling. + emqx_cluster_link_mqtt:publish_heartbeat(ClientPid, Actor, Incarnation). + %% Route syncer start_syncer(TargetCluster, Actor, Incr) -> @@ -294,6 +299,7 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> client :: {pid(), reference()}, bootstrapped :: boolean(), reconnect_timer :: reference(), + heartbeat_timer :: reference(), actor_init_req_id :: binary(), actor_init_timer :: reference(), remote_actor_info :: undefined | map(), @@ -366,6 +372,8 @@ handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) -> _ = maybe_alarm(Reason, St), {noreply, init_remote_actor(St#st{reconnect_timer = undefined, status = disconnected, error = Reason})}; +handle_info({timeout, TRef, _Heartbeat}, St = #st{heartbeat_timer = TRef}) -> + {noreply, process_heartbeat(St#st{heartbeat_timer = undefined})}; %% Stale timeout. handle_info({timeout, _, _}, St) -> {noreply, St}; @@ -420,7 +428,9 @@ post_actor_init( NeedBootstrap ) -> ok = start_syncer(TargetCluster, Actor, Incr), - process_bootstrap(St#st{client = ClientPid}, NeedBootstrap). + %% TODO: Heartbeats are currently blocked by bootstrapping. + NSt = schedule_heartbeat(St#st{client = ClientPid}), + process_bootstrap(NSt, NeedBootstrap). handle_connect_error(Reason, St) -> ?SLOG(error, #{ @@ -455,6 +465,14 @@ process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) -> process_bootstrapped(St) end. +process_heartbeat(St = #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> + ok = publish_heartbeat(ClientPid, Actor, Incarnation), + schedule_heartbeat(St). + +schedule_heartbeat(St = #st{heartbeat_timer = undefined}) -> + TRef = erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat), + St#st{heartbeat_timer = TRef}. + %% Bootstrapping. %% Responsible for transferring local routing table snapshot to the target %% cluster. Does so either during the initial startup or when MQTT connection