From 94e81ba81269dc61a88eda8eef7468cd679e2a16 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 3 Jun 2024 21:17:46 +0300 Subject: [PATCH] feat(clusterlink): implement actor config handler --- .../src/emqx_cluster_link.erl | 35 ++++- .../src/emqx_cluster_link_api.erl | 116 +++++++++++++++ .../src/emqx_cluster_link_app.erl | 14 +- .../src/emqx_cluster_link_config.erl | 136 ++++++++++++------ .../src/emqx_cluster_link_extrouter.erl | 11 +- .../src/emqx_cluster_link_mqtt.erl | 26 ++-- .../emqx_cluster_link_router_bootstrap.erl | 5 +- .../src/emqx_cluster_link_router_syncer.erl | 95 ++++++------ .../src/emqx_cluster_link_schema.erl | 23 +-- .../src/emqx_cluster_link_sup.erl | 29 +++- 10 files changed, 354 insertions(+), 136 deletions(-) create mode 100644 apps/emqx_cluster_link/src/emqx_cluster_link_api.erl diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index a567fb6cc..fd5280262 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -7,6 +7,7 @@ -behaviour(emqx_external_broker). -export([ + is_registered/0, register_external_broker/0, unregister_external_broker/0, add_route/1, @@ -36,8 +37,14 @@ %% emqx_external_broker API %%-------------------------------------------------------------------- +is_registered() -> + emqx_external_broker:provider() =:= ?MODULE. + register_external_broker() -> - emqx_external_broker:register_provider(?MODULE). + case is_registered() of + true -> ok; + false -> emqx_external_broker:register_provider(?MODULE) + end. unregister_external_broker() -> emqx_external_broker:unregister_provider(?MODULE). 
@@ -94,13 +101,18 @@ on_message_publish( {route_updates, #{actor := Actor}, RouteOps} -> ok = update_routes(ClusterName, Actor, RouteOps); {heartbeat, #{actor := Actor}} -> - ok = actor_heartbeat(ClusterName, Actor) + ok = actor_heartbeat(ClusterName, Actor); + {error, {unknown_payload, ParsedPayload}} -> + ?SLOG(warning, #{ + msg => "unexpected_cluster_link_route_op_payload", + payload => ParsedPayload + }) end, {stop, []}; on_message_publish(#message{topic = <>, payload = Payload}) -> case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of #message{} = ForwardedMsg -> - {stop, with_sender_name(ForwardedMsg, ClusterName)}; + {stop, maybe_filter_incomming_msg(ForwardedMsg, ClusterName)}; _Err -> %% Just ignore it. It must be already logged by the decoder {stop, []} @@ -163,9 +175,7 @@ actor_init( %% which will use safe binary_to_term decoding %% TODO: add error details? {error, <<"unknown_cluster">>}; - LinkConf -> - %% TODO: may be worth checking resource health and communicate it? - _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + #{enable := true} = _LinkConf -> MyClusterName = emqx_cluster_link_config:cluster(), case MyClusterName of TargetCluster -> @@ -187,7 +197,9 @@ actor_init( received_from => ClusterName }), {error, <<"bad_remote_cluster_link_name">>} - end + end; + #{enable := false} -> + {error, <<"clster_link_disabled">>} end. actor_init_ack(#{actor := Actor}, Res, MsgIn) -> @@ -226,3 +238,12 @@ update_actor_state(ActorSt) -> %% that doesn't set extra = #{} by default. with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) -> Msg#message{extra = Extra#{link_origin => ClusterName}}. + +maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) -> + %% Should prevent irrelevant messages from being dispatched in case + %% the remote routing state lags behind the local config changes. 
+ #{enable := Enable, topics := Topics} = emqx_cluster_link_config:link(ClusterName), + case Enable andalso emqx_topic:match_any(T, Topics) of + true -> with_sender_name(Msg, ClusterName); + false -> [] + end. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl new file mode 100644 index 000000000..c74d2d3f7 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -0,0 +1,116 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_api). + +-behaviour(minirest_api). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/http_api.hrl"). + +-export([ + api_spec/0, + paths/0, + schema/1 +]). + +-export([config/2]). + +-define(CONF_PATH, [cluster, links]). +-define(TAGS, [<<"Cluster">>]). + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/cluster/links" + ]. + +schema("/cluster/links") -> + #{ + 'operationId' => config, + get => + #{ + description => "Get cluster links configuration", + tags => ?TAGS, + responses => + #{200 => links_config_schema()} + }, + put => + #{ + description => "Update cluster links configuration", + tags => ?TAGS, + 'requestBody' => links_config_schema(), + responses => + #{ + 200 => links_config_schema(), + 400 => + emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST], <<"Update Config Failed">> + ) + } + } + }. 
+ +%%-------------------------------------------------------------------- +%% API Handler funcs +%%-------------------------------------------------------------------- + +config(get, _Params) -> + {200, get_raw()}; +config(put, #{body := Body}) -> + case emqx_cluster_link_config:update(Body) of + {ok, NewConfig} -> + {200, NewConfig}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {400, ?BAD_REQUEST, Message} + end. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +get_raw() -> + #{<<"links">> := Conf} = + emqx_config:fill_defaults( + #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}, + #{obfuscate_sensitive_values => true} + ), + Conf. + +links_config_schema() -> + emqx_cluster_link_schema:links_schema( + #{ + examples => #{<<"example">> => links_config_example()} + } + ). + +links_config_example() -> + [ + #{ + <<"enable">> => true, + <<"pool_size">> => 10, + <<"server">> => <<"emqxcl_b.host:1883">>, + <<"ssl">> => #{<<"enable">> => false}, + <<"topics">> => + [ + <<"t/topic-example">>, + <<"t/topic-filter-example/1/#">> + ], + <<"upstream">> => <<"emqxcl_b">> + }, + #{ + <<"enable">> => true, + <<"pool_size">> => 10, + <<"server">> => <<"emqxcl_c.host:1883">>, + <<"ssl">> => #{<<"enable">> => false}, + <<"topics">> => + [ + <<"t/topic-example">>, + <<"t/topic-filter-example/1/#">> + ], + <<"upstream">> => <<"emqxcl_c">> + } + ]. 
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index f05c5c1a0..750387ca9 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -13,7 +13,7 @@ start(_StartType, _StartArgs) -> ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()), emqx_cluster_link_config:add_handler(), - LinksConf = enabled_links(), + LinksConf = emqx_cluster_link_config:enabled_links(), _ = case LinksConf of [_ | _] -> @@ -32,19 +32,13 @@ prep_stop(State) -> stop(_State) -> _ = emqx_cluster_link:delete_hook(), _ = emqx_cluster_link:unregister_external_broker(), - _ = stop_msg_fwd_resources(emqx_cluster_link_config:links()), + _ = remove_msg_fwd_resources(emqx_cluster_link_config:links()), ok. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -enabled_links() -> - lists:filter( - fun(#{enable := IsEnabled}) -> IsEnabled =:= true end, - emqx_cluster_link_config:links() - ). - start_msg_fwd_resources(LinksConf) -> lists:foreach( fun(LinkConf) -> @@ -53,10 +47,10 @@ start_msg_fwd_resources(LinksConf) -> LinksConf ). -stop_msg_fwd_resources(LinksConf) -> +remove_msg_fwd_resources(LinksConf) -> lists:foreach( fun(#{upstream := Name}) -> - emqx_cluster_link_mqtt:stop_msg_fwd_resource(Name) + emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name) end, LinksConf ). 
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 568d1a69d..67dc267e6 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -21,6 +21,7 @@ -export([ %% General + update/1, cluster/0, enabled_links/0, links/0, @@ -47,6 +48,20 @@ %% +update(Config) -> + case + emqx_conf:update( + ?LINKS_PATH, + Config, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + cluster() -> atom_to_binary(emqx_config:get([cluster, name])). @@ -83,7 +98,7 @@ actor_heartbeat_interval() -> %% mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) -> - ClientId = maps:get(client_id, LinkConf, cluster()), + ClientId = maps:get(clientid, LinkConf, cluster()), #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS), Opts = #{ host => Host, @@ -115,13 +130,13 @@ remove_handler() -> pre_config_update(?LINKS_PATH, RawConf, RawConf) -> {ok, RawConf}; -pre_config_update(?LINKS_PATH, NewRawConf, _RawConf) -> - {ok, convert_certs(NewRawConf)}. +pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) -> + {ok, convert_certs(maybe_increment_ps_actor_incr(NewRawConf, OldRawConf))}. 
post_config_update(?LINKS_PATH, _Req, Old, Old, _AppEnvs) -> ok; post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) -> - ok = maybe_toggle_hook_and_provider(New), + ok = toggle_hook_and_broker(enabled_links(New), enabled_links(Old)), #{ removed := Removed, added := Added, @@ -142,22 +157,17 @@ post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) -> %% Internal functions %%-------------------------------------------------------------------- -maybe_toggle_hook_and_provider(LinksConf) -> - case is_any_enabled(LinksConf) of - true -> - ok = emqx_cluster_link:register_external_broker(), - ok = emqx_cluster_link:put_hook(); - false -> - _ = emqx_cluster_link:delete_hook(), - _ = emqx_cluster_link:unregister_external_broker(), - ok - end. +toggle_hook_and_broker([_ | _] = _NewEnabledLinks, [] = _OldEnabledLinks) -> + ok = emqx_cluster_link:register_external_broker(), + ok = emqx_cluster_link:put_hook(); +toggle_hook_and_broker([] = _NewEnabledLinks, _OldLinks) -> + ok = emqx_cluster_link:unregister_external_broker(), + ok = emqx_cluster_link:delete_hook(); +toggle_hook_and_broker(_, _) -> + ok. -is_any_enabled(LinksConf) -> - lists:any( - fun(#{enable := IsEnabled}) -> IsEnabled =:= true end, - LinksConf - ). +enabled_links(LinksConf) -> + [L || #{enable := true} = L <- LinksConf]. all_ok(Results) -> lists:all( @@ -172,42 +182,86 @@ all_ok(Results) -> add_links(LinksConf) -> [add_link(Link) || Link <- LinksConf]. -add_link(#{enabled := true} = LinkConf) -> - %% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand... - MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), - %% TODO - ActorRes = ok, - combine_results(ActorRes, MsgFwdRes); +add_link(#{enable := true} = LinkConf) -> + {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf), + {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + ok; add_link(_DisabledLinkConf) -> ok. 
remove_links(LinksConf) -> - [remove_link(Link) || Link <- LinksConf]. + [remove_link(Name) || #{upstream := Name} <- LinksConf]. -remove_link(_LinkConf) -> - %% TODO - ok. +remove_link(Name) -> + _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), + ensure_actor_stopped(Name). update_links(LinksConf) -> [update_link(Link) || Link <- LinksConf]. -update_link(#{enabled := true} = LinkConf) -> - _ = remove_link(LinkConf), - add_link(LinkConf); -update_link(#{enabled := false} = LinkConf) -> - case remove_link(LinkConf) of - {error, not_found} -> ok; - Other -> Other - end. - -combine_results(ok, ok) -> +update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) -> + _ = ensure_actor_stopped(Name), + {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf), + %% TODO: if only msg_fwd resource related config is changed, + %% we can skip actor reincarnation/restart. + ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf), ok; -combine_results(CoordRes, MsgFwdRes) -> - {error, #{coordinator => CoordRes, msg_fwd_resource => MsgFwdRes}}. +update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) -> + _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), + ensure_actor_stopped(Name). + +update_msg_fwd_resource(#{pool_size := Old}, #{pool_size := Old} = NewConf) -> + {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), + ok; +update_msg_fwd_resource(_, #{upstream := Name} = NewConf) -> + _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), + {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), + ok. + +ensure_actor_stopped(ClusterName) -> + emqx_cluster_link_sup:ensure_actor_stopped(ClusterName). upstream_name(#{upstream := N}) -> N; upstream_name(#{<<"upstream">> := N}) -> N. +maybe_increment_ps_actor_incr(New, Old) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + %% TODO: what if a link was removed and then added again? 
+ %% Assume that incarnation was 0 when the link was removed + %% and now it's also 0 (a default value for new actor). + %% If persistent routing state changed during this link absence + %% and remote GC has not started before ps actor restart (with the same incarnation), + %% then some old (stale) external ps routes may never be cleaned on the remote side. + %% No (permanent) message loss is expected, as new actor incarnation will re-bootstrap. + %% Similarly, irrelevant messages will be filtered out at receiving end, so + %% the main risk is having some stale routes unreachable for GC... + #{changed := Changed} = emqx_utils:diff_lists(New, Old, fun upstream_name/1), + ChangedNames = [upstream_name(C) || {_, C} <- Changed], + lists:foldr( + fun(LConf, Acc) -> + case lists:member(upstream_name(LConf), ChangedNames) of + true -> [increment_ps_actor_incr(LConf) | Acc]; + false -> [LConf | Acc] + end + end, + [], + New + ); + false -> + New + end. + +increment_ps_actor_incr(#{ps_actor_incarnation := Incr} = Conf) -> + Conf#{ps_actor_incarnation => Incr + 1}; +increment_ps_actor_incr(#{<<"ps_actor_incarnation">> := Incr} = Conf) -> + Conf#{<<"ps_actor_incarnation">> => Incr + 1}; +%% Default value set in schema is 0, so need to set it to 1 during the first update. +increment_ps_actor_incr(#{<<"upstream">> := _} = Conf) -> + Conf#{<<"ps_actor_incarnation">> => 1}; +increment_ps_actor_incr(#{upstream := _} = Conf) -> + Conf#{ps_actor_incarnation => 1}. + convert_certs(LinksConf) -> lists:map( fun diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 1a69733ee..a97aa7ece 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -22,7 +22,8 @@ actor_apply_operation/2, actor_apply_operation/3, actor_gc/1, - is_present_incarnation/1 + is_present_incarnation/1, + list_actors/1 ]). 
%% Internal API @@ -167,6 +168,14 @@ is_present_incarnation(#state{extra = #{is_present_incarnation := IsNew}}) -> is_present_incarnation(_State) -> false. +-spec list_actors(cluster()) -> [#{actor := actor(), incarnation := incarnation()}]. +list_actors(Cluster) -> + Matches = ets:match( + emqx_external_router_actor, + #actor{id = {Cluster, '$1'}, incarnation = '$2', _ = '_'} + ), + [#{actor => Actor, incarnation => Incr} || [Actor, Incr] <- Matches]. + mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> %% NOTE %% We perform this heavy-weight transaction only in the case of a new route diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 62e021289..e4b398397 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -28,7 +28,7 @@ -export([ ensure_msg_fwd_resource/1, - stop_msg_fwd_resource/1, + remove_msg_fwd_resource/1, decode_route_op/1, decode_forwarded_msg/1, decode_resp/1 @@ -80,6 +80,15 @@ -define(PUB_TIMEOUT, 10_000). +-spec ensure_msg_fwd_resource(binary() | map()) -> + {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}. +ensure_msg_fwd_resource(ClusterName) when is_binary(ClusterName) -> + case emqx_cluster_link_config:link(ClusterName) of + #{} = Conf -> + ensure_msg_fwd_resource(Conf); + undefined -> + {error, link_config_not_found} + end; ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) -> ResConf = #{ query_mode => async, @@ -91,8 +100,9 @@ ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf }, emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResConf). -stop_msg_fwd_resource(ClusterName) -> - emqx_resource:stop(?MSG_RES_ID(ClusterName)). +-spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}. 
+remove_msg_fwd_resource(ClusterName) -> + emqx_resource:remove_local(?MSG_RES_ID(ClusterName)). %%-------------------------------------------------------------------- %% emqx_resource callbacks (message forwarding) @@ -247,9 +257,9 @@ combine_status(Statuses) -> %%-------------------------------------------------------------------- connect(Options) -> - WorkerId = proplists:get_value(ecpool_worker_id, Options), + WorkerIdBin = integer_to_binary(proplists:get_value(ecpool_worker_id, Options)), #{clientid := ClientId} = ClientOpts = proplists:get_value(client_opts, Options), - ClientId1 = emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId), + ClientId1 = <>, ClientOpts1 = ClientOpts#{clientid => ClientId1}, case emqtt:start_link(ClientOpts1) of {ok, Pid} -> @@ -369,11 +379,7 @@ decode_route_op1(#{ }) -> {heartbeat, #{actor => Actor, incarnation => Incr}}; decode_route_op1(Payload) -> - ?SLOG(warning, #{ - msg => "unexpected_cluster_link_route_op_payload", - payload => Payload - }), - {error, Payload}. + {error, {unknown_payload, Payload}}. decode_resp1(#{ ?F_OPERATION := ?OP_ACTOR_INIT_ACK, diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl index 105b8d94c..1670c2ab4 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl @@ -10,7 +10,7 @@ -include("emqx_cluster_link.hrl"). -export([ - init/2, + init/3, next_batch/1 ]). 
@@ -27,8 +27,7 @@ %% -init(TargetCluster, Options) -> - LinkFilters = emqx_cluster_link_config:topic_filters(TargetCluster), +init(TargetCluster, LinkFilters, Options) -> {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters), IsPersistentRoute = maps:get(is_persistent_route, Options, false), #bootstrap{ diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 5fc267bd9..bccb3e349 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -47,6 +47,7 @@ -define(SYNCER_NAME(Cluster), ?NAME(Cluster, syncer)). -define(SYNCER_REF(Cluster), {via, gproc, ?SYNCER_NAME(Cluster)}). -define(ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, actor)}). +-define(ACTOR_NAME(Cluster), ?NAME(Cluster, actor)). -define(MAX_BATCH_SIZE, 4000). -define(MIN_SYNC_INTERVAL, 10). @@ -63,8 +64,8 @@ %% but it must be tolerable, since persistent route destination is a client ID, %% which is unique cluster-wide. -define(PS_ACTOR, <<"ps-routes-v1">>). --define(PS_INCARNATION, 0). -define(PS_ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, ps_actor)}). +-define(PS_ACTOR_NAME(Cluster), ?NAME(Cluster, ps_actor)). -define(PS_CLIENT_NAME(Cluster), ?NAME(Cluster, ps_client)). -define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}). -define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)). @@ -102,43 +103,30 @@ do_push(SyncerName, OpName, Topic, ID) -> %% 1. Actor + MQTT Client %% 2. Syncer -start_link(TargetCluster) -> - supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, TargetCluster}). +start_link(#{upstream := TargetCluster} = LinkConf) -> + supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, LinkConf}). %% Actor -start_link_actor(ActorRef, Actor, Incarnation, TargetCluster) -> +new_incarnation() -> + %% TODO: Subject to clock skew, need something more robust. 
+ erlang:system_time(millisecond). + +start_link_actor(ActorRef, Actor, Incarnation, LinkConf) -> gen_server:start_link( ActorRef, ?MODULE, - {actor, mk_state(TargetCluster, Actor, Incarnation)}, + {actor, mk_state(LinkConf, Actor, Incarnation)}, [] ). get_actor_id() -> atom_to_binary(node()). -get_actor_incarnation() -> - persistent_term:get({?MODULE, incarnation}). - -set_actor_incarnation(Incarnation) -> - ok = persistent_term:put({?MODULE, incarnation}, Incarnation), - Incarnation. - -ensure_actor_incarnation() -> - try - get_actor_incarnation() - catch - error:badarg -> - %% TODO: Subject to clock skew, need something more robust. - Incarnation = erlang:system_time(millisecond), - set_actor_incarnation(Incarnation) - end. - %% MQTT Client -start_link_client(TargetCluster, Actor) -> - Options = emqx_cluster_link_config:emqtt_options(TargetCluster), +start_link_client(Actor, LinkConf) -> + Options = emqx_cluster_link_config:mk_emqtt_options(LinkConf), case emqtt:start_link(refine_client_options(Options, Actor)) of {ok, Pid} -> case emqtt:connect(Pid) of @@ -245,7 +233,7 @@ batch_get_opname(Op) -> %% -init({sup, TargetCluster}) -> +init({sup, LinkConf}) -> %% FIXME: Intensity. SupFlags = #{ %% TODO: one_for_one? @@ -254,24 +242,24 @@ init({sup, TargetCluster}) -> period => 60 }, Children = lists:append([ - [child_spec(actor, TargetCluster)], - [child_spec(ps_actor, TargetCluster) || emqx_persistent_message:is_persistence_enabled()] + [child_spec(actor, LinkConf)], + [child_spec(ps_actor, LinkConf) || emqx_persistent_message:is_persistence_enabled()] ]), {ok, {SupFlags, Children}}; init({actor, State}) -> init_actor(State). -child_spec(actor, TargetCluster) -> +child_spec(actor, #{upstream := TargetCluster} = LinkConf) -> %% Actor process. %% Wraps MQTT Client process. %% ClientID: `mycluster:emqx1@emqx.local:routesync` %% Occasional TCP/MQTT-level disconnects are expected, and should be handled %% gracefully. 
Actor = get_actor_id(), - Incarnation = ensure_actor_incarnation(), - actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, TargetCluster); -child_spec(ps_actor, TargetCluster) -> - actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, ?PS_INCARNATION, TargetCluster). + Incarnation = new_incarnation(), + actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, LinkConf); +child_spec(ps_actor, #{upstream := TargetCluster, ps_actor_incarnation := Incr} = LinkConf) -> + actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, Incr, LinkConf). child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) -> SyncerRef = ?PS_SYNCER_REF(TargetCluster), @@ -286,10 +274,10 @@ child_spec(syncer, Actor, Incarnation, TargetCluster) -> ClientName = ?CLIENT_NAME(TargetCluster), syncer_spec(syncer, Actor, Incarnation, SyncerRef, ClientName). -actor_spec(ChildID, ActorRef, Actor, Incarnation, TargetCluster) -> +actor_spec(ChildID, ActorRef, Actor, Incarnation, LinkConf) -> #{ id => ChildID, - start => {?MODULE, start_link_actor, [ActorRef, Actor, Incarnation, TargetCluster]}, + start => {?MODULE, start_link_actor, [ActorRef, Actor, Incarnation, LinkConf]}, restart => permanent, type => worker }. @@ -308,7 +296,7 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> target :: binary(), actor :: binary(), incarnation :: non_neg_integer(), - client :: {pid(), reference()}, + client :: {pid(), reference()} | undefined, bootstrapped :: boolean(), reconnect_timer :: reference(), heartbeat_timer :: reference(), @@ -316,30 +304,31 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> actor_init_timer :: reference(), remote_actor_info :: undefined | map(), status :: connecting | connected | disconnected, - error :: undefined | term() + error :: undefined | term(), + link_conf :: map() }). 
-mk_state(TargetCluster, Actor, Incarnation) -> +mk_state(#{upstream := TargetCluster} = LinkConf, Actor, Incarnation) -> #st{ target = TargetCluster, actor = Actor, incarnation = Incarnation, bootstrapped = false, - status = connecting + status = connecting, + link_conf = LinkConf }. init_actor(State = #st{}) -> _ = erlang:process_flag(trap_exit, true), {ok, State, {continue, connect}}. -handle_continue(connect, State) -> - {noreply, process_connect(State)}. +handle_continue(connect, St) -> + {noreply, process_connect(St)}. +handle_call(_Request, _From, St) -> + {reply, ignored, St}. -handle_call(_Request, _From, State) -> - {reply, ignored, State}. - -handle_cast(_Request, State) -> - {noreply, State}. +handle_cast(_Request, St) -> + {noreply, St}. handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) -> {noreply, handle_client_down(Reason, St)}; @@ -396,8 +385,8 @@ handle_info(Info, St) -> terminate(_Reason, _State) -> ok. -process_connect(St = #st{target = TargetCluster, actor = Actor}) -> - case start_link_client(TargetCluster, Actor) of +process_connect(St = #st{target = TargetCluster, actor = Actor, link_conf = Conf}) -> + case start_link_client(Actor, Conf) of {ok, ClientPid} -> _ = maybe_deactivate_alarm(St), ok = announce_client(Actor, TargetCluster, ClientPid), @@ -498,17 +487,17 @@ cancel_heartbeat(St = #st{heartbeat_timer = TRef}) -> %% is re-established with a clean session. Once bootstrapping is done, it %% opens the syncer. 
-run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR}) -> +run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR, link_conf = #{topics := Topics}}) -> case mria_config:whoami() of Role when Role /= replicant -> Opts = #{is_persistent_route => true}, - Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Opts), + Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Topics, Opts), run_bootstrap(Bootstrap, St); _ -> process_bootstrapped(St) end; -run_bootstrap(St = #st{target = TargetCluster}) -> - Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, #{}), +run_bootstrap(St = #st{target = TargetCluster, link_conf = #{topics := Topics}}) -> + Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Topics, #{}), run_bootstrap(Bootstrap, St). run_bootstrap(Bootstrap, St) -> @@ -527,7 +516,9 @@ run_bootstrap(Bootstrap, St) -> end end. -process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) -> +process_bootstrapped( + St = #st{target = TargetCluster, actor = Actor} +) -> ok = open_syncer(TargetCluster, Actor), St#st{bootstrapped = true}. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index 22c9e31ec..03c8902df 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -11,6 +11,9 @@ -export([injected_fields/0]). +%% Used in emqx_cluster_link_api +-export([links_schema/1]). + -export([ roots/0, fields/1, @@ -27,14 +30,14 @@ roots() -> []. injected_fields() -> #{cluster => fields("cluster_linking")}. +links_schema(Meta) -> + ?HOCON(?ARRAY(?R_REF("link")), Meta#{default => [], validator => fun links_validator/1}). 
+ fields("cluster_linking") -> - [ - {links, - ?HOCON(?ARRAY(?R_REF("link")), #{default => [], validator => fun links_validator/1})} - ]; + [{links, links_schema(#{})}]; fields("link") -> [ - {enable, ?HOCON(boolean(), #{default => false})}, + {enable, ?HOCON(boolean(), #{default => true})}, {upstream, ?HOCON(binary(), #{required => true})}, {server, emqx_schema:servers_sc(#{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, @@ -46,13 +49,13 @@ fields("link") -> default => #{<<"enable">> => false}, desc => ?DESC("ssl") }}, - %% TODO: validate topics: - %% - basic topic validation - %% - non-overlapping (not intersecting) filters? - %% (this may be not required, depends on config update implementation) {topics, ?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})}, - {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})} + {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})}, + %% Must not be configured manually. The value is incremented by cluster link config handler + %% and is used as a globally synchronized sequence to ensure persistent routes actors have + %% the same next incarnation after each config change. + {ps_actor_incarnation, ?HOCON(integer(), #{default => 0, importance => ?IMPORTANCE_HIDDEN})} ]. desc(_) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl index 872054fa0..0991583e2 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -8,9 +8,15 @@ -export([start_link/1]). +-export([ + ensure_actor/1, + ensure_actor_stopped/1 +]). + -export([init/1]). -define(SERVER, ?MODULE). +-define(ACTOR_MODULE, emqx_cluster_link_router_syncer). start_link(LinksConf) -> supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf). 
@@ -23,8 +29,8 @@ init(LinksConf) -> }, ExtrouterGC = extrouter_gc_spec(), RouteActors = [ - sup_spec(Name, emqx_cluster_link_router_syncer, [Name]) - || #{upstream := Name} <- LinksConf + sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) + || #{upstream := Name} = LinkConf <- LinksConf ], {ok, {SupFlags, [ExtrouterGC | RouteActors]}}. @@ -46,3 +52,22 @@ sup_spec(Id, Mod, Args) -> type => supervisor, modules => [Mod] }. + +ensure_actor(#{upstream := Name} = LinkConf) -> + case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + Err -> + Err + end. + +ensure_actor_stopped(ClusterName) -> + case supervisor:terminate_child(?MODULE, ClusterName) of + ok -> + _ = supervisor:delete_child(?MODULE, ClusterName), + ok; + {error, not_found} -> + ok + end.