fix(clusterlink): add schema descriptions, fix dialyzer warnings, add resource_opts

This commit is contained in:
Serge Tupchii 2024-06-07 14:00:43 +03:00
parent 94e81ba812
commit ff16521d4f
13 changed files with 203 additions and 91 deletions

View File

@ -16,7 +16,7 @@
-module(emqx_external_broker). -module(emqx_external_broker).
-callback forward(emqx_router:external_dest(), emqx_types:delivery()) -> -callback forward(dest(), emqx_types:delivery()) ->
emqx_types:deliver_result(). emqx_types:deliver_result().
-callback should_route_to_external_dests(emqx_types:message()) -> boolean(). -callback should_route_to_external_dests(emqx_types:message()) -> boolean().
@ -64,8 +64,8 @@
Provider:IfRegistered Provider:IfRegistered
catch catch
Err:Reason:St -> Err:Reason:St ->
?SLOG(error, #{ ?SLOG_THROTTLE(error, #{
msg => "external_broker_crashed", msg => external_broker_crashed,
provider => Provider, provider => Provider,
callback => ?FUNCTION_NAME, callback => ?FUNCTION_NAME,
stacktrace => St, stacktrace => St,

View File

@ -29,8 +29,8 @@
-export([push/5]). -export([push/5]).
-export([wait/1]). -export([wait/1]).
-export([close/1]). -export([suspend/1]).
-export([open/1]). -export([activate/1]).
-export([stats/0]). -export([stats/0]).
@ -49,7 +49,7 @@
min_sync_interval => non_neg_integer(), min_sync_interval => non_neg_integer(),
error_delay => non_neg_integer(), error_delay => non_neg_integer(),
error_retry_interval => non_neg_integer(), error_retry_interval => non_neg_integer(),
initial_state => open | closed, initial_state => activated | suspended,
batch_handler => {module(), _Function :: atom(), _Args :: list()} batch_handler => {module(), _Function :: atom(), _Args :: list()}
}. }.
@ -166,11 +166,13 @@ mk_push_context(_) ->
%% %%
close(Ref) -> %% Suspended syncer receives and accumulates route ops but doesn't apply them
gen_server:call(Ref, close, infinity). %% until it is activated.
suspend(Ref) ->
gen_server:call(Ref, suspend, infinity).
open(Ref) -> activate(Ref) ->
gen_server:call(Ref, open, infinity). gen_server:call(Ref, activate, infinity).
%% %%
@ -191,7 +193,7 @@ stats() ->
mk_state(Options) -> mk_state(Options) ->
#{ #{
state => maps:get(initial_state, Options, open), state => maps:get(initial_state, Options, active),
stash => stash_new(), stash => stash_new(),
retry_timer => undefined, retry_timer => undefined,
max_batch_size => maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE), max_batch_size => maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE),
@ -209,13 +211,13 @@ init({Pool, Id, State}) ->
init(State) -> init(State) ->
{ok, State}. {ok, State}.
handle_call(close, _From, State) -> handle_call(suspend, _From, State) ->
NState = State#{state := closed}, NState = State#{state := suspended},
{reply, ok, NState}; {reply, ok, NState};
handle_call(open, _From, State = #{state := closed}) -> handle_call(activate, _From, State = #{state := suspended}) ->
NState = run_batch_loop([], State#{state := open}), NState = run_batch_loop([], State#{state := active}),
{reply, ok, NState}; {reply, ok, NState};
handle_call(open, _From, State) -> handle_call(activate, _From, State) ->
{reply, ok, State}; {reply, ok, State};
handle_call(stats, _From, State = #{stash := Stash}) -> handle_call(stats, _From, State = #{stash := Stash}) ->
{reply, stash_stats(Stash), State}; {reply, stash_stats(Stash), State};
@ -239,7 +241,7 @@ terminate(_Reason, _State) ->
%% %%
run_batch_loop(Incoming, State = #{stash := Stash0, state := closed}) -> run_batch_loop(Incoming, State = #{stash := Stash0, state := suspended}) ->
Stash1 = stash_add(Incoming, Stash0), Stash1 = stash_add(Incoming, Stash0),
Stash2 = stash_drain(Stash1), Stash2 = stash_drain(Stash1),
State#{stash := Stash2}; State#{stash := Stash2};

View File

@ -210,7 +210,7 @@ update_routes(ClusterName, Actor, RouteOps) ->
ActorSt = get_actor_state(ClusterName, Actor), ActorSt = get_actor_state(ClusterName, Actor),
lists:foreach( lists:foreach(
fun(RouteOp) -> fun(RouteOp) ->
emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorSt) _ = emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorSt)
end, end,
RouteOps RouteOps
). ).

View File

@ -14,15 +14,14 @@ start(_StartType, _StartArgs) ->
ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()), ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
emqx_cluster_link_config:add_handler(), emqx_cluster_link_config:add_handler(),
LinksConf = emqx_cluster_link_config:enabled_links(), LinksConf = emqx_cluster_link_config:enabled_links(),
_ = case LinksConf of
case LinksConf of [_ | _] ->
[_ | _] -> ok = emqx_cluster_link:register_external_broker(),
ok = emqx_cluster_link:register_external_broker(), ok = emqx_cluster_link:put_hook(),
ok = emqx_cluster_link:put_hook(), ok = start_msg_fwd_resources(LinksConf);
ok = start_msg_fwd_resources(LinksConf); _ ->
_ -> ok
ok end,
end,
emqx_cluster_link_sup:start_link(LinksConf). emqx_cluster_link_sup:start_link(LinksConf).
prep_stop(State) -> prep_stop(State) ->

View File

@ -100,7 +100,8 @@ actor_heartbeat_interval() ->
mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) -> mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
ClientId = maps:get(clientid, LinkConf, cluster()), ClientId = maps:get(clientid, LinkConf, cluster()),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS), #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
Opts = #{ Opts = maps:with([username, retry_interval, max_inflight], LinkConf),
Opts1 = Opts#{
host => Host, host => Host,
port => Port, port => Port,
clientid => ClientId, clientid => ClientId,
@ -108,12 +109,7 @@ mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = Link
ssl => EnableSsl, ssl => EnableSsl,
ssl_opts => maps:to_list(maps:remove(enable, Ssl)) ssl_opts => maps:to_list(maps:remove(enable, Ssl))
}, },
with_password(with_user(Opts, LinkConf), LinkConf). with_password(Opts1, LinkConf).
with_user(Opts, #{username := U} = _LinkConf) ->
Opts#{username => U};
with_user(Opts, _LinkConf) ->
Opts.
with_password(Opts, #{password := P} = _LinkConf) -> with_password(Opts, #{password := P} = _LinkConf) ->
Opts#{password => emqx_secret:unwrap(P)}; Opts#{password => emqx_secret:unwrap(P)};

View File

@ -119,23 +119,34 @@ create_tables() ->
%% %%
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
match_routes(Topic) -> match_routes(Topic) ->
Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]), Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]),
%% `unique` opt is not enough, since we keep the original Topic as a part of RouteID %% `unique` opt is not enough, since we keep the original Topic as a part of RouteID
lists:ukeysort(#route.dest, [match_to_route(M) || M <- Matches]). lists:ukeysort(#route.dest, [match_to_route(M) || M <- Matches]).
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
lookup_routes(Topic) -> lookup_routes(Topic) ->
Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'}, Pat = make_extroute_rec_pat(emqx_topic_index:make_key(Topic, '$1')),
[match_to_route(R#extroute.entry) || Records <- ets:match(?EXTROUTE_TAB, Pat), R <- Records]. [match_to_route(R#extroute.entry) || Records <- ets:match(?EXTROUTE_TAB, Pat), R <- Records].
-spec topics() -> [emqx_types:topic()].
topics() -> topics() ->
Pat = #extroute{entry = '$1', _ = '_'}, Pat = make_extroute_rec_pat('$1'),
[emqx_topic_index:get_topic(K) || [K] <- ets:match(?EXTROUTE_TAB, Pat)]. [emqx_topic_index:get_topic(K) || [K] <- ets:match(?EXTROUTE_TAB, Pat)].
match_to_route(M) -> match_to_route(M) ->
?ROUTE_ID(Cluster, _) = emqx_topic_index:get_id(M), ?ROUTE_ID(Cluster, _) = emqx_topic_index:get_id(M),
#route{topic = emqx_topic_index:get_topic(M), dest = Cluster}. #route{topic = emqx_topic_index:get_topic(M), dest = Cluster}.
%% Make Dialyzer happy
make_extroute_rec_pat(Entry) ->
erlang:make_tuple(
record_info(size, extroute),
'_',
[{1, extroute}, {#extroute.entry, Entry}]
).
%% %%
-record(state, { -record(state, {
@ -143,12 +154,12 @@ match_to_route(M) ->
actor :: actor(), actor :: actor(),
incarnation :: incarnation(), incarnation :: incarnation(),
lane :: lane() | undefined, lane :: lane() | undefined,
extra :: map() extra = #{} :: map()
}). }).
-type state() :: #state{}. -type state() :: #state{}.
-type env() :: #{timestamp := _Milliseconds}. -type env() :: #{timestamp => _Milliseconds}.
-spec actor_init(cluster(), actor(), incarnation(), env()) -> {ok, state()}. -spec actor_init(cluster(), actor(), incarnation(), env()) -> {ok, state()}.
actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) -> actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) ->
@ -170,10 +181,8 @@ is_present_incarnation(_State) ->
-spec list_actors(cluster()) -> [#{actor := actor(), incarnation := incarnation()}]. -spec list_actors(cluster()) -> [#{actor := actor(), incarnation := incarnation()}].
list_actors(Cluster) -> list_actors(Cluster) ->
Matches = ets:match( Pat = make_actor_rec_pat([{#actor.id, {Cluster, '$1'}}, {#actor.incarnation, '$2'}]),
emqx_external_router_actor, Matches = ets:match(emqx_external_router_actor, Pat),
#actor{id = {Cluster, '$1'}, incarnation = '$2', _ = '_'}
),
[#{actor => Actor, incarnation => Incr} || [Actor, Incr] <- Matches]. [#{actor => Actor, incarnation => Incr} || [Actor, Incr] <- Matches].
mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
@ -291,7 +300,8 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
-spec actor_gc(env()) -> _NumCleaned :: non_neg_integer(). -spec actor_gc(env()) -> _NumCleaned :: non_neg_integer().
actor_gc(#{timestamp := Now}) -> actor_gc(#{timestamp := Now}) ->
MS = [{#actor{until = '$1', _ = '_'}, [{'<', '$1', Now}], ['$_']}], Pat = make_actor_rec_pat([{#actor.until, '$1'}]),
MS = [{Pat, [{'<', '$1', Now}], ['$_']}],
Dead = mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS), Dead = mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS),
try_clean_incarnation(Dead). try_clean_incarnation(Dead).
@ -316,9 +326,18 @@ mnesia_assign_lane(Cluster) ->
Lane. Lane.
select_cluster_lanes(Cluster) -> select_cluster_lanes(Cluster) ->
MS = [{#actor{id = {Cluster, '_'}, lane = '$1', _ = '_'}, [], ['$1']}], Pat = make_actor_rec_pat([{#actor.id, {Cluster, '_'}}, {#actor.lane, '$1'}]),
MS = [{Pat, [], ['$1']}],
mnesia:select(?EXTROUTE_ACTOR_TAB, MS, write). mnesia:select(?EXTROUTE_ACTOR_TAB, MS, write).
%% Make Dialyzer happy
make_actor_rec_pat(PosValues) ->
erlang:make_tuple(
record_info(size, actor),
'_',
[{1, actor} | PosValues]
).
mnesia_actor_heartbeat(ActorID, Incarnation, TS) -> mnesia_actor_heartbeat(ActorID, Incarnation, TS) ->
case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of
[#actor{incarnation = Incarnation} = Rec] -> [#actor{incarnation = Incarnation} = Rec] ->

View File

@ -37,7 +37,7 @@ run() ->
%% %%
-record(st, { -record(st, {
gc_timer :: reference() gc_timer :: undefined | reference()
}). }).
init(_) -> init(_) ->

View File

@ -89,16 +89,12 @@ ensure_msg_fwd_resource(ClusterName) when is_binary(ClusterName) ->
undefined -> undefined ->
{error, link_config_not_found} {error, link_config_not_found}
end; end;
ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) -> ensure_msg_fwd_resource(#{upstream := Name, resource_opts := ResOpts} = ClusterConf) ->
ResConf = #{ ResOpts1 = ResOpts#{
query_mode => async, query_mode => async,
start_after_created => true, start_after_created => true
start_timeout => 5000,
health_check_interval => 5000,
%% TODO: configure res_buf_worker pool separately?
worker_pool_size => PoolSize
}, },
emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResConf). emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResOpts1).
-spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}. -spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}.
remove_msg_fwd_resource(ClusterName) -> remove_msg_fwd_resource(ClusterName) ->
@ -344,7 +340,7 @@ publish_heartbeat(ClientPid, Actor, Incarnation) ->
?F_ACTOR => Actor, ?F_ACTOR => Actor,
?F_INCARNATION => Incarnation ?F_INCARNATION => Incarnation
}, },
emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_0, undefined). emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_0, {fun(_) -> ok end, []}).
decode_route_op(Payload) -> decode_route_op(Payload) ->
decode_route_op1(?DECODE(Payload)). decode_route_op1(?DECODE(Payload)).

View File

@ -196,20 +196,20 @@ start_link_syncer(Actor, Incarnation, SyncerRef, ClientName) ->
max_batch_size => ?MAX_BATCH_SIZE, max_batch_size => ?MAX_BATCH_SIZE,
min_sync_interval => ?MIN_SYNC_INTERVAL, min_sync_interval => ?MIN_SYNC_INTERVAL,
error_delay => ?ERROR_DELAY, error_delay => ?ERROR_DELAY,
initial_state => closed, initial_state => suspended,
batch_handler => {?MODULE, process_syncer_batch, [ClientName, Actor, Incarnation]} batch_handler => {?MODULE, process_syncer_batch, [ClientName, Actor, Incarnation]}
%% TODO: enable_replies => false %% TODO: enable_replies => false
}). }).
close_syncer(TargetCluster, ?PS_ACTOR) -> suspend_syncer(TargetCluster, ?PS_ACTOR) ->
emqx_router_syncer:close(?PS_SYNCER_REF(TargetCluster)); emqx_router_syncer:suspend(?PS_SYNCER_REF(TargetCluster));
close_syncer(TargetCluster, _Actor) -> suspend_syncer(TargetCluster, _Actor) ->
emqx_router_syncer:close(?SYNCER_REF(TargetCluster)). emqx_router_syncer:suspend(?SYNCER_REF(TargetCluster)).
open_syncer(TargetCluster, ?PS_ACTOR) -> activate_syncer(TargetCluster, ?PS_ACTOR) ->
emqx_router_syncer:open(?PS_SYNCER_REF(TargetCluster)); emqx_router_syncer:activate(?PS_SYNCER_REF(TargetCluster));
open_syncer(TargetCluster, _Actor) -> activate_syncer(TargetCluster, _Actor) ->
emqx_router_syncer:open(?SYNCER_REF(TargetCluster)). emqx_router_syncer:activate(?SYNCER_REF(TargetCluster)).
process_syncer_batch(Batch, ClientName, Actor, Incarnation) -> process_syncer_batch(Batch, ClientName, Actor, Incarnation) ->
Updates = maps:fold( Updates = maps:fold(
@ -296,12 +296,12 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
target :: binary(), target :: binary(),
actor :: binary(), actor :: binary(),
incarnation :: non_neg_integer(), incarnation :: non_neg_integer(),
client :: {pid(), reference()} | undefined, client :: undefined | pid(),
bootstrapped :: boolean(), bootstrapped :: boolean(),
reconnect_timer :: reference(), reconnect_timer :: undefined | reference(),
heartbeat_timer :: reference(), heartbeat_timer :: undefined | reference(),
actor_init_req_id :: binary(), actor_init_req_id :: undefined | binary(),
actor_init_timer :: reference(), actor_init_timer :: undefined | reference(),
remote_actor_info :: undefined | map(), remote_actor_info :: undefined | map(),
status :: connecting | connected | disconnected, status :: connecting | connected | disconnected,
error :: undefined | term(), error :: undefined | term(),
@ -336,7 +336,11 @@ handle_info(
{publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}}, {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}},
St = #st{actor_init_req_id = ReqId} St = #st{actor_init_req_id = ReqId}
) -> ) ->
{actor_init_ack, #{result := Res, need_bootstrap := NeedBootstrap} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp( {actor_init_ack,
#{
result := Res,
need_bootstrap := NeedBootstrap
} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(
Payload Payload
), ),
St1 = St#st{ St1 = St#st{
@ -451,7 +455,7 @@ handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) ->
actor => St#st.actor actor => St#st.actor
}), }),
%% TODO: syncer may be already down due to one_for_all strategy %% TODO: syncer may be already down due to one_for_all strategy
ok = close_syncer(TargetCluster, Actor), ok = suspend_syncer(TargetCluster, Actor),
_ = maybe_alarm(Reason, St), _ = maybe_alarm(Reason, St),
NSt = cancel_heartbeat(St), NSt = cancel_heartbeat(St),
process_connect(NSt#st{client = undefined, error = Reason, status = connecting}). process_connect(NSt#st{client = undefined, error = Reason, status = connecting}).
@ -519,7 +523,7 @@ run_bootstrap(Bootstrap, St) ->
process_bootstrapped( process_bootstrapped(
St = #st{target = TargetCluster, actor = Actor} St = #st{target = TargetCluster, actor = Actor}
) -> ) ->
ok = open_syncer(TargetCluster, Actor), ok = activate_syncer(TargetCluster, Actor),
St#st{bootstrapped = true}. St#st{bootstrapped = true}.
process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) ->
@ -529,7 +533,7 @@ ensure_bootstrap_heartbeat(St = #st{heartbeat_timer = TRef}) ->
case erlang:read_timer(TRef) of case erlang:read_timer(TRef) of
false -> false ->
ok = emqx_utils:cancel_timer(TRef), ok = emqx_utils:cancel_timer(TRef),
process_heartbeat(St); process_heartbeat(St#st{heartbeat_timer = undefined});
_TimeLeft -> _TimeLeft ->
St St
end. end.

View File

@ -21,45 +21,87 @@
desc/1 desc/1
]). ]).
-import(emqx_schema, [mk_duration/2]).
-define(MQTT_HOST_OPTS, #{default_port => 1883}). -define(MQTT_HOST_OPTS, #{default_port => 1883}).
namespace() -> "cluster_linking". namespace() -> "cluster".
roots() -> []. roots() -> [].
injected_fields() -> injected_fields() ->
#{cluster => fields("cluster_linking")}. #{cluster => [{links, links_schema(#{})}]}.
links_schema(Meta) -> links_schema(Meta) ->
?HOCON(?ARRAY(?R_REF("link")), Meta#{default => [], validator => fun links_validator/1}). ?HOCON(?ARRAY(?R_REF("link")), Meta#{
default => [], validator => fun links_validator/1, desc => ?DESC("links")
}).
fields("cluster_linking") ->
[{links, links_schema(#{})}];
fields("link") -> fields("link") ->
[ [
{enable, ?HOCON(boolean(), #{default => true})}, {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},
{upstream, ?HOCON(binary(), #{required => true})}, {upstream, ?HOCON(binary(), #{required => true, desc => ?DESC(upstream)})},
{server, {server,
emqx_schema:servers_sc(#{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)},
{clientid, ?HOCON(binary(), #{desc => ?DESC("clientid")})}, {clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})},
{username, ?HOCON(binary(), #{desc => ?DESC("username")})}, {username, ?HOCON(binary(), #{desc => ?DESC(username)})},
{password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}, {password, emqx_schema_secret:mk(#{desc => ?DESC(password)})},
{ssl, #{ {ssl, #{
type => ?R_REF(emqx_schema, "ssl_client_opts"), type => ?R_REF(emqx_schema, "ssl_client_opts"),
default => #{<<"enable">> => false}, default => #{<<"enable">> => false},
desc => ?DESC("ssl") desc => ?DESC(ssl)
}}, }},
{topics, {topics,
?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})}, ?HOCON(?ARRAY(binary()), #{
{pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})}, desc => ?DESC(topics), required => true, validator => fun topics_validator/1
})},
{pool_size, ?HOCON(pos_integer(), #{default => 8, desc => ?DESC(pool_size)})},
{retry_interval,
mk_duration(
"MQTT Message retry interval. Delay for the link to retry sending the QoS1/QoS2 "
"messages in case of ACK not received.",
#{default => <<"15s">>}
)},
{max_inflight,
?HOCON(
non_neg_integer(),
#{
default => 32,
desc => ?DESC("max_inflight")
}
)},
{resource_opts,
?HOCON(
?R_REF(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)},
%% Must not be configured manually. The value is incremented by cluster link config handler %% Must not be configured manually. The value is incremented by cluster link config handler
%% and is used as a globally synchronized sequence to ensure persistent routes actors have %% and is used as a globally synchronized sequence to ensure persistent routes actors have
%% the same next incarnation after each config change. %% the same next incarnation after each config change.
{ps_actor_incarnation, ?HOCON(integer(), #{default => 0, importance => ?IMPORTANCE_HIDDEN})} {ps_actor_incarnation, ?HOCON(integer(), #{default => 0, importance => ?IMPORTANCE_HIDDEN})}
]. ];
fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_res_opt(Field)].
desc("links") ->
?DESC("links");
desc("link") ->
?DESC("link");
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) -> desc(_) ->
"todo". undefined.
is_hidden_res_opt(Field) ->
lists:member(
Field,
[start_after_created, query_mode, enable_batch, batch_size, batch_time]
).
%% TODO: check that no link name equals local cluster name, %% TODO: check that no link name equals local cluster name,
%% but this may be tricky since the link config is injected into cluster config (emqx_conf_schema). %% but this may be tricky since the link config is injected into cluster config (emqx_conf_schema).

View File

@ -82,7 +82,8 @@
connection_rejected_due_to_license_limit_reached, connection_rejected_due_to_license_limit_reached,
dropped_msg_due_to_mqueue_is_full, dropped_msg_due_to_mqueue_is_full,
socket_receive_paused_by_rate_limit, socket_receive_paused_by_rate_limit,
data_bridge_buffer_overflow data_bridge_buffer_overflow,
external_broker_crashed
]). ]).
%% Callback to upgrade config after loaded from config file but before validation. %% Callback to upgrade config after loaded from config file but before validation.

View File

@ -565,10 +565,10 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
{error, {unsupported_os, OS}}. {error, {unsupported_os, OS}}.
format(Term) -> format(Term) ->
iolist_to_binary(io_lib:format("~0p", [Term])). unicode:characters_to_binary(io_lib:format("~0p", [Term])).
format(Fmt, Args) -> format(Fmt, Args) ->
iolist_to_binary(io_lib:format(Fmt, Args)). unicode:characters_to_binary(io_lib:format(Fmt, Args)).
%% @doc Helper function for log formatters. %% @doc Helper function for log formatters.
-spec format_mfal(map(), map()) -> undefined | binary(). -spec format_mfal(map(), map()) -> undefined | binary().

View File

@ -0,0 +1,53 @@
emqx_cluster_link_schema {
links.desc:
"""The list of the linked EMQX clusters."""
links.label: "Cluster Links"
link.desc:
"""Cluster link configuration"""
link.label: "Cluster Link"
enable.desc:
"""Enable or disable a cluster link. The link is enabled by default, disabling it allows stopping the link without removing its configuration. The link must be enabled on both sides to be operational. Disabling the link should also be done on both clusters in order to free up all associated resources."""
enable.label: "Enable"
upstream.desc:
"""Upstream cluster name. Must be exactly equal to the value of `cluster.name` configured at the remote cluster. Must not be equal to the local cluster.name. All configured cluster link upstream names must be unique."""
upstream.label: "Upstream Name"
server.desc:
"""MQTT host and port of the remote EMQX broker."""
server.label: "MQTT Server"
username.desc:
"""Optional MQTT username for connecting to the remote EMQX cluster."""
username.label: "Username"
password.desc:
"""Optional MQTT username for connecting to the remote EMQX cluster."""
password.label: "Password"
clientid.desc:
"""Optional Base MQTT client ID for connecting to the remote EMQX cluster. If omitted, local `cluster.name` is used. EMQX maintains several connections between linked clusters, so distinct suffixes are automatically appended to the base client ID."""
clientid.label: "Base Client ID"
ssl.desc: """SSL configuration for connecting to the remote EMQX cluster."""
ssl.label: "SSL Options"
topics.desc: """MQTT topics to be forwarded by the linked remote EMQX broker to the local broker. Messages are only forwarded if the local EMQX broker has matching subscriber(s).
Wildcards are supported. Setting empty topics list on one side of the link can be used to establish unidirectional links: the side with the empty topics won't receive remote messages, but it can forward relevant messages to its linked counterpart (according to the topics configured on that side of the link)."""
topics.label: "Topics"
pool_size.desc:
"""Size of the pool of MQTT clients that will publish messages to the linked EMQX broker."""
pool_size.label:
"""Connection Pool Size"""
max_inflight.desc:
"""Max inflight (sent, but un-acked) messages of the MQTT protocol"""
max_inflight.label:
"""Max Inflight Message"""
}