From ff16521d4fd3ca355199c1c79c405835c34921bb Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 7 Jun 2024 14:00:43 +0300 Subject: [PATCH] fix(clusterlink): add schema descriptions, fix dialyzer warnings, add resource_opts --- apps/emqx/src/emqx_external_broker.erl | 6 +- apps/emqx/src/emqx_router_syncer.erl | 30 ++++---- .../src/emqx_cluster_link.erl | 2 +- .../src/emqx_cluster_link_app.erl | 17 ++--- .../src/emqx_cluster_link_config.erl | 10 +-- .../src/emqx_cluster_link_extrouter.erl | 39 +++++++--- .../src/emqx_cluster_link_extrouter_gc.erl | 2 +- .../src/emqx_cluster_link_mqtt.erl | 14 ++-- .../src/emqx_cluster_link_router_syncer.erl | 40 +++++----- .../src/emqx_cluster_link_schema.erl | 74 +++++++++++++++---- apps/emqx_conf/src/emqx_conf_schema.erl | 3 +- apps/emqx_utils/src/emqx_utils.erl | 4 +- rel/i18n/emqx_cluster_link_schema.hocon | 53 +++++++++++++ 13 files changed, 203 insertions(+), 91 deletions(-) create mode 100644 rel/i18n/emqx_cluster_link_schema.hocon diff --git a/apps/emqx/src/emqx_external_broker.erl b/apps/emqx/src/emqx_external_broker.erl index bf6448490..ebcd48994 100644 --- a/apps/emqx/src/emqx_external_broker.erl +++ b/apps/emqx/src/emqx_external_broker.erl @@ -16,7 +16,7 @@ -module(emqx_external_broker). --callback forward(emqx_router:external_dest(), emqx_types:delivery()) -> +-callback forward(dest(), emqx_types:delivery()) -> emqx_types:deliver_result(). -callback should_route_to_external_dests(emqx_types:message()) -> boolean(). @@ -64,8 +64,8 @@ Provider:IfRegistered catch Err:Reason:St -> - ?SLOG(error, #{ - msg => "external_broker_crashed", + ?SLOG_THROTTLE(error, #{ + msg => external_broker_crashed, provider => Provider, callback => ?FUNCTION_NAME, stacktrace => St, diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index 4756d0a37..09bdd6129 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -29,8 +29,8 @@ -export([push/5]). -export([wait/1]). --export([close/1]). --export([open/1]). +-export([suspend/1]). +-export([activate/1]). -export([stats/0]). @@ -49,7 +49,7 @@ min_sync_interval => non_neg_integer(), error_delay => non_neg_integer(), error_retry_interval => non_neg_integer(), - initial_state => open | closed, + initial_state => activated | suspended, batch_handler => {module(), _Function :: atom(), _Args :: list()} }. @@ -166,11 +166,13 @@ mk_push_context(_) -> %% -close(Ref) -> - gen_server:call(Ref, close, infinity). +%% Suspended syncer receives and accumulates route ops but doesn't apply them +%% until it is activated. +suspend(Ref) -> + gen_server:call(Ref, suspend, infinity). -open(Ref) -> - gen_server:call(Ref, open, infinity). +activate(Ref) -> + gen_server:call(Ref, activate, infinity). %% @@ -191,7 +193,7 @@ stats() -> mk_state(Options) -> #{ - state => maps:get(initial_state, Options, open), + state => maps:get(initial_state, Options, active), stash => stash_new(), retry_timer => undefined, max_batch_size => maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE), @@ -209,13 +211,13 @@ init({Pool, Id, State}) -> init(State) -> {ok, State}. -handle_call(close, _From, State) -> - NState = State#{state := closed}, +handle_call(suspend, _From, State) -> + NState = State#{state := suspended}, {reply, ok, NState}; -handle_call(open, _From, State = #{state := closed}) -> - NState = run_batch_loop([], State#{state := open}), +handle_call(activate, _From, State = #{state := suspended}) -> + NState = run_batch_loop([], State#{state := active}), {reply, ok, NState}; -handle_call(open, _From, State) -> +handle_call(activate, _From, State) -> {reply, ok, State}; handle_call(stats, _From, State = #{stash := Stash}) -> {reply, stash_stats(Stash), State}; @@ -239,7 +241,7 @@ terminate(_Reason, _State) -> %% -run_batch_loop(Incoming, State = #{stash := Stash0, state := closed}) -> +run_batch_loop(Incoming, State = #{stash := Stash0, state := suspended}) -> Stash1 = stash_add(Incoming, Stash0), Stash2 = stash_drain(Stash1), State#{stash := Stash2}; diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index fd5280262..cdfe22f3d 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -210,7 +210,7 @@ update_routes(ClusterName, Actor, RouteOps) -> ActorSt = get_actor_state(ClusterName, Actor), lists:foreach( fun(RouteOp) -> - emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorSt) + _ = emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorSt) end, RouteOps ). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index 750387ca9..ddf3028a2 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -14,15 +14,14 @@ start(_StartType, _StartArgs) -> ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()), emqx_cluster_link_config:add_handler(), LinksConf = emqx_cluster_link_config:enabled_links(), - _ = - case LinksConf of - [_ | _] -> - ok = emqx_cluster_link:register_external_broker(), - ok = emqx_cluster_link:put_hook(), - ok = start_msg_fwd_resources(LinksConf); - _ -> - ok - end, + case LinksConf of + [_ | _] -> + ok = emqx_cluster_link:register_external_broker(), + ok = emqx_cluster_link:put_hook(), + ok = start_msg_fwd_resources(LinksConf); + _ -> + ok + end, emqx_cluster_link_sup:start_link(LinksConf). prep_stop(State) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 67dc267e6..28344cd7e 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -100,7 +100,8 @@ actor_heartbeat_interval() -> mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) -> ClientId = maps:get(clientid, LinkConf, cluster()), #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS), - Opts = #{ + Opts = maps:with([username, retry_interval, max_inflight], LinkConf), + Opts1 = Opts#{ host => Host, port => Port, clientid => ClientId, @@ -108,12 +109,7 @@ mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = Link ssl => EnableSsl, ssl_opts => maps:to_list(maps:remove(enable, Ssl)) }, - with_password(with_user(Opts, LinkConf), LinkConf). - -with_user(Opts, #{username := U} = _LinkConf) -> - Opts#{username => U}; -with_user(Opts, _LinkConf) -> - Opts. + with_password(Opts1, LinkConf). with_password(Opts, #{password := P} = _LinkConf) -> Opts#{password => emqx_secret:unwrap(P)}; diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index a97aa7ece..79d96e207 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -119,23 +119,34 @@ create_tables() -> %% +-spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) -> Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]), %% `unique` opt is not enough, since we keep the original Topic as a part of RouteID lists:ukeysort(#route.dest, [match_to_route(M) || M <- Matches]). +-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> - Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'}, + Pat = make_extroute_rec_pat(emqx_topic_index:make_key(Topic, '$1')), [match_to_route(R#extroute.entry) || Records <- ets:match(?EXTROUTE_TAB, Pat), R <- Records]. +-spec topics() -> [emqx_types:topic()]. topics() -> - Pat = #extroute{entry = '$1', _ = '_'}, + Pat = make_extroute_rec_pat('$1'), [emqx_topic_index:get_topic(K) || [K] <- ets:match(?EXTROUTE_TAB, Pat)]. match_to_route(M) -> ?ROUTE_ID(Cluster, _) = emqx_topic_index:get_id(M), #route{topic = emqx_topic_index:get_topic(M), dest = Cluster}. +%% Make Dialyzer happy +make_extroute_rec_pat(Entry) -> + erlang:make_tuple( + record_info(size, extroute), + '_', + [{1, extroute}, {#extroute.entry, Entry}] + ). + %% -record(state, { @@ -143,12 +154,12 @@ match_to_route(M) -> actor :: actor(), incarnation :: incarnation(), lane :: lane() | undefined, - extra :: map() + extra = #{} :: map() }). -type state() :: #state{}. --type env() :: #{timestamp := _Milliseconds}. +-type env() :: #{timestamp => _Milliseconds}. -spec actor_init(cluster(), actor(), incarnation(), env()) -> {ok, state()}. actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) -> @@ -170,10 +181,8 @@ is_present_incarnation(_State) -> -spec list_actors(cluster()) -> [#{actor := actor(), incarnation := incarnation()}]. list_actors(Cluster) -> - Matches = ets:match( - emqx_external_router_actor, - #actor{id = {Cluster, '$1'}, incarnation = '$2', _ = '_'} - ), + Pat = make_actor_rec_pat([{#actor.id, {Cluster, '$1'}}, {#actor.incarnation, '$2'}]), + Matches = ets:match(emqx_external_router_actor, Pat), [#{actor => Actor, incarnation => Incr} || [Actor, Incr] <- Matches]. mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> @@ -291,7 +300,8 @@ apply_operation(Entry, MCounter, OpName, Lane) -> -spec actor_gc(env()) -> _NumCleaned :: non_neg_integer(). actor_gc(#{timestamp := Now}) -> - MS = [{#actor{until = '$1', _ = '_'}, [{'<', '$1', Now}], ['$_']}], + Pat = make_actor_rec_pat([{#actor.until, '$1'}]), + MS = [{Pat, [{'<', '$1', Now}], ['$_']}], Dead = mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS), try_clean_incarnation(Dead). @@ -316,9 +326,18 @@ mnesia_assign_lane(Cluster) -> Lane. select_cluster_lanes(Cluster) -> - MS = [{#actor{id = {Cluster, '_'}, lane = '$1', _ = '_'}, [], ['$1']}], + Pat = make_actor_rec_pat([{#actor.id, {Cluster, '_'}}, {#actor.lane, '$1'}]), + MS = [{Pat, [], ['$1']}], mnesia:select(?EXTROUTE_ACTOR_TAB, MS, write). +%% Make Dialyzer happy +make_actor_rec_pat(PosValues) -> + erlang:make_tuple( + record_info(size, actor), + '_', + [{1, actor} | PosValues] + ). + mnesia_actor_heartbeat(ActorID, Incarnation, TS) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of [#actor{incarnation = Incarnation} = Rec] -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl index 89258b506..695273808 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl @@ -37,7 +37,7 @@ run() -> %% -record(st, { - gc_timer :: reference() + gc_timer :: undefined | reference() }). init(_) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index e4b398397..7a8bf1dff 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -89,16 +89,12 @@ ensure_msg_fwd_resource(ClusterName) when is_binary(ClusterName) -> undefined -> {error, link_config_not_found} end; -ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) -> - ResConf = #{ +ensure_msg_fwd_resource(#{upstream := Name, resource_opts := ResOpts} = ClusterConf) -> + ResOpts1 = ResOpts#{ query_mode => async, - start_after_created => true, - start_timeout => 5000, - health_check_interval => 5000, - %% TODO: configure res_buf_worker pool separately? - worker_pool_size => PoolSize + start_after_created => true }, - emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResConf). + emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResOpts1). -spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}. remove_msg_fwd_resource(ClusterName) -> @@ -344,7 +340,7 @@ publish_heartbeat(ClientPid, Actor, Incarnation) -> ?F_ACTOR => Actor, ?F_INCARNATION => Incarnation }, - emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_0, undefined). + emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_0, {fun(_) -> ok end, []}). decode_route_op(Payload) -> decode_route_op1(?DECODE(Payload)). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index bccb3e349..fdcbd91c7 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -196,20 +196,20 @@ start_link_syncer(Actor, Incarnation, SyncerRef, ClientName) -> max_batch_size => ?MAX_BATCH_SIZE, min_sync_interval => ?MIN_SYNC_INTERVAL, error_delay => ?ERROR_DELAY, - initial_state => closed, + initial_state => suspended, batch_handler => {?MODULE, process_syncer_batch, [ClientName, Actor, Incarnation]} %% TODO: enable_replies => false }). -close_syncer(TargetCluster, ?PS_ACTOR) -> - emqx_router_syncer:close(?PS_SYNCER_REF(TargetCluster)); -close_syncer(TargetCluster, _Actor) -> - emqx_router_syncer:close(?SYNCER_REF(TargetCluster)). +suspend_syncer(TargetCluster, ?PS_ACTOR) -> + emqx_router_syncer:suspend(?PS_SYNCER_REF(TargetCluster)); +suspend_syncer(TargetCluster, _Actor) -> + emqx_router_syncer:suspend(?SYNCER_REF(TargetCluster)). -open_syncer(TargetCluster, ?PS_ACTOR) -> - emqx_router_syncer:open(?PS_SYNCER_REF(TargetCluster)); -open_syncer(TargetCluster, _Actor) -> - emqx_router_syncer:open(?SYNCER_REF(TargetCluster)). +activate_syncer(TargetCluster, ?PS_ACTOR) -> + emqx_router_syncer:activate(?PS_SYNCER_REF(TargetCluster)); +activate_syncer(TargetCluster, _Actor) -> + emqx_router_syncer:activate(?SYNCER_REF(TargetCluster)). process_syncer_batch(Batch, ClientName, Actor, Incarnation) -> Updates = maps:fold( @@ -296,12 +296,12 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> target :: binary(), actor :: binary(), incarnation :: non_neg_integer(), - client :: {pid(), reference()} | undefined, + client :: undefined | pid(), bootstrapped :: boolean(), - reconnect_timer :: reference(), - heartbeat_timer :: reference(), - actor_init_req_id :: binary(), - actor_init_timer :: reference(), + reconnect_timer :: undefined | reference(), + heartbeat_timer :: undefined | reference(), + actor_init_req_id :: undefined | binary(), + actor_init_timer :: undefined | reference(), remote_actor_info :: undefined | map(), status :: connecting | connected | disconnected, error :: undefined | term(), @@ -336,7 +336,11 @@ handle_info( {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}}, St = #st{actor_init_req_id = ReqId} ) -> - {actor_init_ack, #{result := Res, need_bootstrap := NeedBootstrap} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp( + {actor_init_ack, + #{ + result := Res, + need_bootstrap := NeedBootstrap + } = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp( Payload ), St1 = St#st{ @@ -451,7 +455,7 @@ handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) -> actor => St#st.actor }), %% TODO: syncer may be already down due to one_for_all strategy - ok = close_syncer(TargetCluster, Actor), + ok = suspend_syncer(TargetCluster, Actor), _ = maybe_alarm(Reason, St), NSt = cancel_heartbeat(St), process_connect(NSt#st{client = undefined, error = Reason, status = connecting}). @@ -519,7 +523,7 @@ run_bootstrap(Bootstrap, St) -> process_bootstrapped( St = #st{target = TargetCluster, actor = Actor} ) -> - ok = open_syncer(TargetCluster, Actor), + ok = activate_syncer(TargetCluster, Actor), St#st{bootstrapped = true}. process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> @@ -529,7 +533,7 @@ ensure_bootstrap_heartbeat(St = #st{heartbeat_timer = TRef}) -> case erlang:read_timer(TRef) of false -> ok = emqx_utils:cancel_timer(TRef), - process_heartbeat(St); + process_heartbeat(St#st{heartbeat_timer = undefined}); _TimeLeft -> St end. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index 03c8902df..b6d0fbcda 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -21,45 +21,87 @@ desc/1 ]). +-import(emqx_schema, [mk_duration/2]). + -define(MQTT_HOST_OPTS, #{default_port => 1883}). -namespace() -> "cluster_linking". +namespace() -> "cluster". roots() -> []. injected_fields() -> - #{cluster => fields("cluster_linking")}. + #{cluster => [{links, links_schema(#{})}]}. links_schema(Meta) -> - ?HOCON(?ARRAY(?R_REF("link")), Meta#{default => [], validator => fun links_validator/1}). + ?HOCON(?ARRAY(?R_REF("link")), Meta#{ + default => [], validator => fun links_validator/1, desc => ?DESC("links") + }). -fields("cluster_linking") -> - [{links, links_schema(#{})}]; fields("link") -> [ - {enable, ?HOCON(boolean(), #{default => true})}, - {upstream, ?HOCON(binary(), #{required => true})}, + {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, + {upstream, ?HOCON(binary(), #{required => true, desc => ?DESC(upstream)})}, {server, - emqx_schema:servers_sc(#{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, - {clientid, ?HOCON(binary(), #{desc => ?DESC("clientid")})}, - {username, ?HOCON(binary(), #{desc => ?DESC("username")})}, - {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}, + emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)}, + {clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})}, + {username, ?HOCON(binary(), #{desc => ?DESC(username)})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC(password)})}, {ssl, #{ type => ?R_REF(emqx_schema, "ssl_client_opts"), default => #{<<"enable">> => false}, - desc => ?DESC("ssl") + desc => ?DESC(ssl) }}, {topics, - ?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})}, - {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})}, + ?HOCON(?ARRAY(binary()), #{ + desc => ?DESC(topics), required => true, validator => fun topics_validator/1 + })}, + {pool_size, ?HOCON(pos_integer(), #{default => 8, desc => ?DESC(pool_size)})}, + {retry_interval, + mk_duration( + "MQTT Message retry interval. Delay for the link to retry sending the QoS1/QoS2 " + "messages in case of ACK not received.", + #{default => <<"15s">>} + )}, + {max_inflight, + ?HOCON( + non_neg_integer(), + #{ + default => 32, + desc => ?DESC("max_inflight") + } + )}, + {resource_opts, + ?HOCON( + ?R_REF(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )}, %% Must not be configured manually. The value is incremented by cluster link config handler %% and is used as a globally synchronized sequence to ensure persistent routes actors have %% the same next incarnation after each config change. {ps_actor_incarnation, ?HOCON(integer(), #{default => 0, importance => ?IMPORTANCE_HIDDEN})} - ]. + ]; +fields("creation_opts") -> + Opts = emqx_resource_schema:fields("creation_opts"), + [O || {Field, _} = O <- Opts, not is_hidden_res_opt(Field)]. +desc("links") -> + ?DESC("links"); +desc("link") -> + ?DESC("link"); +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); desc(_) -> - "todo". + undefined. + +is_hidden_res_opt(Field) -> + lists:member( + Field, + [start_after_created, query_mode, enable_batch, batch_size, batch_time] + ). %% TODO: check that no link name equals local cluster name, %% but this may be tricky since the link config is injected into cluster config (emqx_conf_schema). diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index b4c59d291..64d341bce 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -82,7 +82,8 @@ connection_rejected_due_to_license_limit_reached, dropped_msg_due_to_mqueue_is_full, socket_receive_paused_by_rate_limit, - data_bridge_buffer_overflow + data_bridge_buffer_overflow, + external_broker_crashed ]). %% Callback to upgrade config after loaded from config file but before validation. diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 8f41a4919..a6efcb443 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -565,10 +565,10 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> {error, {unsupported_os, OS}}. format(Term) -> - iolist_to_binary(io_lib:format("~0p", [Term])). + unicode:characters_to_binary(io_lib:format("~0p", [Term])). format(Fmt, Args) -> - iolist_to_binary(io_lib:format(Fmt, Args)). + unicode:characters_to_binary(io_lib:format(Fmt, Args)). %% @doc Helper function for log formatters. -spec format_mfal(map(), map()) -> undefined | binary(). diff --git a/rel/i18n/emqx_cluster_link_schema.hocon b/rel/i18n/emqx_cluster_link_schema.hocon new file mode 100644 index 000000000..77e4987f7 --- /dev/null +++ b/rel/i18n/emqx_cluster_link_schema.hocon @@ -0,0 +1,53 @@ +emqx_cluster_link_schema { + +links.desc: +"""The list of the linked EMQX clusters.""" +links.label: "Cluster Links" + +link.desc: +"""Cluster link configuration""" +link.label: "Cluster Link" + +enable.desc: +"""Enable or disable a cluster link. The link is enabled by default, disabling it allows stopping the link without removing its configuration. The link must be enabled on both sides to be operational. Disabling the link should also be done on both clusters in order to free up all associated resources.""" +enable.label: "Enable" + +upstream.desc: +"""Upstream cluster name. Must be exactly equal to the value of `cluster.name` configured at the remote cluster. Must not be equal to the local cluster.name. All configured cluster link upstream names must be unique.""" +upstream.label: "Upstream Name" + +server.desc: +"""MQTT host and port of the remote EMQX broker.""" +server.label: "MQTT Server" + +username.desc: +"""Optional MQTT username for connecting to the remote EMQX cluster.""" +username.label: "Username" + +password.desc: +"""Optional MQTT username for connecting to the remote EMQX cluster.""" +password.label: "Password" + +clientid.desc: +"""Optional Base MQTT client ID for connecting to the remote EMQX cluster. If omitted, local `cluster.name` is used. EMQX maintains several connections between linked clusters, so distinct suffixes are automatically appended to the base client ID.""" +clientid.label: "Base Client ID" + +ssl.desc: """SSL configuration for connecting to the remote EMQX cluster.""" +ssl.label: "SSL Options" + +topics.desc: """MQTT topics to be forwarded by the linked remote EMQX broker to the local broker. Messages are only forwarded if the local EMQX broker has matching subscriber(s). +Wildcards are supported. Setting empty topics list on one side of the link can be used to establish unidirectional links: the side with the empty topics won't receive remote messages, but it can forward relevant messages to its linked counterpart (according to the topics configured on that side of the link).""" +topics.label: "Topics" + +pool_size.desc: +"""Size of the pool of MQTT clients that will publish messages to the linked EMQX broker.""" + +pool_size.label: +"""Connection Pool Size""" + +max_inflight.desc: +"""Max inflight (sent, but un-acked) messages of the MQTT protocol""" + +max_inflight.label: +"""Max Inflight Message""" +}