diff --git a/apps/emqx_bridge_pulsar/rebar.config b/apps/emqx_bridge_pulsar/rebar.config
index d5a63f320..c77007b93 100644
--- a/apps/emqx_bridge_pulsar/rebar.config
+++ b/apps/emqx_bridge_pulsar/rebar.config
@@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
+ {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
index 7d1b20d24..721937cd2 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
@@ -57,6 +57,14 @@ fields(config) ->
sensitive => true,
desc => ?DESC("authentication")
}
+ )},
+ {connect_timeout,
+ mk(
+ emqx_schema:duration_ms(),
+ #{
+ default => <<"5s">>,
+ desc => ?DESC("connect_timeout")
+ }
)}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(producer_opts) ->
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
index 59956e1b6..5ed706511 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
@@ -48,6 +48,7 @@
memory_overload_protection := boolean()
},
compression := compression_mode(),
+ connect_timeout := emqx_schema:duration_ms(),
max_batch_bytes := emqx_schema:bytesize(),
message := message_template_raw(),
pulsar_topic := binary(),
@@ -81,7 +82,9 @@ on_start(InstanceId, Config) ->
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
+ ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
ClientOpts = #{
+ connect_timeout => ConnectTimeout,
ssl_opts => SSLOpts,
conn_opts => conn_opts(Config)
},
@@ -96,13 +99,19 @@ on_start(InstanceId, Config) ->
}
);
{error, Reason} ->
+ RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
?SLOG(error, #{
msg => "failed_to_start_pulsar_client",
instance_id => InstanceId,
pulsar_hosts => Servers,
- reason => emqx_utils:redact(Reason, fun is_sensitive_key/1)
+ reason => RedactedReason
}),
- throw(failed_to_start_pulsar_client)
+ Message =
+ case get_error_message(RedactedReason) of
+ {ok, Msg} -> Msg;
+ error -> failed_to_start_pulsar_client
+ end,
+ throw(Message)
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
@@ -422,3 +431,19 @@ partition_strategy(Strategy) -> Strategy.
is_sensitive_key(auth_data) -> true;
is_sensitive_key(_) -> false.
+
+get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
+ Iter = maps:iterator(BrokerErrorMap),
+ do_get_error_message(Iter);
+get_error_message(_Error) ->
+ error.
+
+do_get_error_message(Iter) ->
+ case maps:next(Iter) of
+ {{_Broker, _Port}, #{message := Message}, _NIter} ->
+ {ok, Message};
+ {_K, _V, NIter} ->
+ do_get_error_message(NIter);
+ none ->
+ error
+ end.
diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl
index aec811e5d..364853eec 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard.erl
@@ -192,7 +192,9 @@ ranch_opts(Options) ->
RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
proto_opts(#{proxy_header := ProxyHeader}) ->
- #{proxy_header => ProxyHeader}.
+ #{proxy_header => ProxyHeader};
+proto_opts(_Opts) ->
+ #{}.
filter_false(_K, false, S) -> S;
filter_false(K, V, S) -> [{K, V} | S].
diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon
index 92294bb75..d1f5c8b13 100644
--- a/rel/i18n/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/emqx_bridge_pulsar.hocon
@@ -67,6 +67,11 @@ emqx_bridge_pulsar {
label = "Enable or Disable"
}
+ connect_timeout {
+ desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
+ label = "Connect Timeout"
+ }
+
desc_name {
desc = "Bridge name, used as a human-readable description of the bridge."
label = "Bridge Name"
diff --git a/rel/i18n/zh/emqx_bridge_pulsar.hocon b/rel/i18n/zh/emqx_bridge_pulsar.hocon
index 23643060b..4e2fd5c9f 100644
--- a/rel/i18n/zh/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/zh/emqx_bridge_pulsar.hocon
@@ -20,6 +20,11 @@ emqx_bridge_pulsar {
label = "启用或停用"
}
+ connect_timeout {
+ desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
+ label = "连接超时时间"
+ }
+
servers {
desc = "以逗号分隔的 scheme://host[:port]
格式的 Pulsar URL 列表,"
"支持的 scheme 有 pulsar://
(默认)"