diff --git a/apps/emqx/include/emqx_channel.hrl b/apps/emqx/include/emqx_channel.hrl
index d4362633a..be2448a20 100644
--- a/apps/emqx/include/emqx_channel.hrl
+++ b/apps/emqx/include/emqx_channel.hrl
@@ -40,3 +40,5 @@
session,
will_msg
]).
+
+-define(EXPIRE_INTERVAL_INFINITE, 4294967295000).
diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl
index d583b1691..5637bb171 100644
--- a/apps/emqx/src/emqx_channel.erl
+++ b/apps/emqx/src/emqx_channel.erl
@@ -2088,7 +2088,7 @@ maybe_resume_session(#channel{
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
case maps:get(expiry_interval, ConnInfo) of
- ?UINT_MAX ->
+ ?EXPIRE_INTERVAL_INFINITE ->
{ok, Channel};
I when I > 0 ->
{ok, ensure_timer(expire_timer, I, Channel)};
diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl
index 9a3b4e39b..ebcf9c434 100644
--- a/apps/emqx/src/emqx_cm.erl
+++ b/apps/emqx/src/emqx_cm.erl
@@ -773,6 +773,7 @@ mark_channel_connected(ChanPid) ->
mark_channel_disconnected(ChanPid) ->
?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}),
ets:delete(?CHAN_LIVE_TAB, ChanPid),
+ ?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}),
ok.
get_connected_client_count() ->
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 914f87d1f..051b1d4c1 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1556,7 +1556,8 @@ fields("broker") ->
boolean(),
#{
default => true,
- desc => ?DESC(broker_route_batch_clean)
+ desc => "This config is stale since 4.3",
+ importance => ?IMPORTANCE_HIDDEN
}
)},
{"perf",
diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl
index 68f783283..3e9e00c81 100644
--- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl
+++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl
@@ -60,14 +60,12 @@
-export_type([sess_msg_key/0]).
-include("emqx.hrl").
+-include("emqx_channel.hrl").
-include("emqx_persistent_session.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-compile({inline, [is_store_enabled/0]}).
-%% 16#FFFFFFFF * 1000
--define(MAX_EXPIRY_INTERVAL, 4294967295000).
-
%% NOTE: Order is significant because of traversal order of the table.
-define(MARKER, 3).
-define(DELIVERED, 2).
@@ -424,7 +422,7 @@ pending(SessionID, MarkerIds) ->
%% @private [MQTT-3.1.2-23]
persistent_session_status(#session_store{expiry_interval = 0}) ->
not_persistent;
-persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) ->
+persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) ->
persistent;
persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
case E + TS > erlang:system_time(millisecond) of
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 3aade0369..1fbd6902e 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -211,7 +211,7 @@ send_message(BridgeId, Message) ->
query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
- Timeout when is_integer(Timeout) ->
+ Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_timeout is configured
#{timeout => Timeout};
_ ->
diff --git a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
index 5e6bf9ac5..5d693547a 100644
--- a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
+++ b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
@@ -5,7 +5,7 @@
-ifndef(EMQX_BRIDGE_IOTDB_HRL).
-define(EMQX_BRIDGE_IOTDB_HRL, true).
--define(VSN_1_0_X, 'v1.0.x').
+-define(VSN_1_X, 'v1.x').
-define(VSN_0_13_X, 'v0.13.x').
-endif.
diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
index 9c5108307..cebf60cb1 100644
--- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
+++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"},
- {vsn, "0.1.0"},
+ {vsn, "0.1.1"},
{modules, [
emqx_bridge_iotdb,
emqx_bridge_iotdb_impl
diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
index 90e8d18a4..aa2c32589 100644
--- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
+++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
@@ -109,10 +109,10 @@ basic_config() ->
)},
{iotdb_version,
mk(
- hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]),
+ hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
#{
desc => ?DESC("config_iotdb_version"),
- default => ?VSN_1_0_X
+ default => ?VSN_1_X
}
)}
] ++ resource_creation_opts() ++
@@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) ->
is_aligned => false,
device_id => <<"my_device">>,
base_url => <<"http://iotdb.local:18080/">>,
- iotdb_version => ?VSN_1_0_X,
+ iotdb_version => ?VSN_1_X,
connect_timeout => <<"15s">>,
pool_type => <<"random">>,
pool_size => 8,
diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
index 2f8794560..8331e715f 100644
--- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
+++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
@@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
%% Internal Functions
%%--------------------------------------------------------------------
-preproc_data(DataList) ->
+make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
+ emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
+make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
+ lists:map(fun make_parsed_payload/1, PayloadUnparsed);
+make_parsed_payload(
+ #{
+ measurement := Measurement,
+ data_type := DataType,
+ value := Value
+ } = Data
+) ->
+ Data#{
+ <<"measurement">> => Measurement,
+ <<"data_type">> => DataType,
+ <<"value">> => Value
+ }.
+
+preproc_data(
+ #{
+ <<"measurement">> := Measurement,
+ <<"data_type">> := DataType,
+ <<"value">> := Value
+ } = Data
+) ->
+ #{
+ timestamp => emqx_plugin_libs_rule:preproc_tmpl(
+ maps:get(<<"timestamp">>, Data, <<"now">>)
+ ),
+ measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
+ data_type => DataType,
+ value => emqx_plugin_libs_rule:preproc_tmpl(Value)
+ }.
+
+preproc_data_list(DataList) ->
lists:map(
- fun(
- #{
- measurement := Measurement,
- data_type := DataType,
- value := Value
- } = Data
- ) ->
- #{
- timestamp => emqx_plugin_libs_rule:preproc_tmpl(
- maps:get(<<"timestamp">>, Data, <<"now">>)
- ),
- measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
- data_type => DataType,
- value => emqx_plugin_libs_rule:preproc_tmpl(Value)
- }
- end,
+ fun preproc_data/1,
DataList
).
@@ -258,12 +276,13 @@ convert_float(Str) when is_binary(Str) ->
convert_float(undefined) ->
null.
-make_iotdb_insert_request(Message, State) ->
+make_iotdb_insert_request(MessageUnparsedPayload, State) ->
+ Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
IsAligned = maps:get(is_aligned, State, false),
DeviceId = device_id(Message, State),
- IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X),
+ IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
Payload = make_list(maps:get(payload, Message)),
- PreProcessedData = preproc_data(Payload),
+ PreProcessedData = preproc_data_list(Payload),
DataList = proc_data(PreProcessedData, Message),
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
@@ -330,15 +349,15 @@ insert_value(1, Data, [Value | Values]) ->
insert_value(Index, Data, [Value | Values]) ->
[[null | Value] | insert_value(Index - 1, Data, Values)].
-iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
+iotdb_field_key(is_aligned, ?VSN_1_X) ->
<<"is_aligned">>;
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
<<"isAligned">>;
-iotdb_field_key(device_id, ?VSN_1_0_X) ->
+iotdb_field_key(device_id, ?VSN_1_X) ->
<<"device">>;
iotdb_field_key(device_id, ?VSN_0_13_X) ->
<<"deviceId">>;
-iotdb_field_key(data_types, ?VSN_1_0_X) ->
+iotdb_field_key(data_types, ?VSN_1_X) ->
<<"data_types">>;
iotdb_field_key(data_types, ?VSN_0_13_X) ->
<<"dataTypes">>.
@@ -350,6 +369,8 @@ device_id(Message, State) ->
case maps:get(device_id, State, undefined) of
undefined ->
case maps:get(payload, Message) of
+ #{<<"device_id">> := DeviceId} ->
+ DeviceId;
#{device_id := DeviceId} ->
DeviceId;
_NotFound ->
diff --git a/apps/emqx_bridge_pulsar/rebar.config b/apps/emqx_bridge_pulsar/rebar.config
index d5a63f320..c77007b93 100644
--- a/apps/emqx_bridge_pulsar/rebar.config
+++ b/apps/emqx_bridge_pulsar/rebar.config
@@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
+ {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
index 7d1b20d24..721937cd2 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
@@ -57,6 +57,14 @@ fields(config) ->
sensitive => true,
desc => ?DESC("authentication")
}
+ )},
+ {connect_timeout,
+ mk(
+ emqx_schema:duration_ms(),
+ #{
+ default => <<"5s">>,
+ desc => ?DESC("connect_timeout")
+ }
)}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(producer_opts) ->
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
index 59956e1b6..5ed706511 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
@@ -48,6 +48,7 @@
memory_overload_protection := boolean()
},
compression := compression_mode(),
+ connect_timeout := emqx_schema:duration_ms(),
max_batch_bytes := emqx_schema:bytesize(),
message := message_template_raw(),
pulsar_topic := binary(),
@@ -81,7 +82,9 @@ on_start(InstanceId, Config) ->
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
+ ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
ClientOpts = #{
+ connect_timeout => ConnectTimeout,
ssl_opts => SSLOpts,
conn_opts => conn_opts(Config)
},
@@ -96,13 +99,19 @@ on_start(InstanceId, Config) ->
}
);
{error, Reason} ->
+ RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
?SLOG(error, #{
msg => "failed_to_start_pulsar_client",
instance_id => InstanceId,
pulsar_hosts => Servers,
- reason => emqx_utils:redact(Reason, fun is_sensitive_key/1)
+ reason => RedactedReason
}),
- throw(failed_to_start_pulsar_client)
+ Message =
+ case get_error_message(RedactedReason) of
+ {ok, Msg} -> Msg;
+ error -> failed_to_start_pulsar_client
+ end,
+ throw(Message)
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
@@ -422,3 +431,19 @@ partition_strategy(Strategy) -> Strategy.
is_sensitive_key(auth_data) -> true;
is_sensitive_key(_) -> false.
+
+get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
+ Iter = maps:iterator(BrokerErrorMap),
+ do_get_error_message(Iter);
+get_error_message(_Error) ->
+ error.
+
+do_get_error_message(Iter) ->
+ case maps:next(Iter) of
+ {{_Broker, _Port}, #{message := Message}, _NIter} ->
+ {ok, Message};
+ {_K, _V, NIter} ->
+ do_get_error_message(NIter);
+ none ->
+ error
+ end.
diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
index 36f47aaf6..2b572a98c 100644
--- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
+++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"},
- {vsn, "0.1.0"},
+ {vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]},
{env, []},
diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
index 6f833d659..3e809d99c 100644
--- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
+++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
@@ -72,7 +72,7 @@ fields(config) ->
desc => ?DESC("username")
}
)},
- {password, fun emqx_connector_schema_lib:password/1},
+ {password, fun emqx_connector_schema_lib:password_required/1},
{pool_size,
hoconsc:mk(
typerefl:pos_integer(),
diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index 6adb456ca..b6e511398 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -508,6 +508,7 @@ fields("node") ->
desc => ?DESC(node_crash_dump_file),
default => crash_dump_file_default(),
importance => ?IMPORTANCE_HIDDEN,
+ converter => fun ensure_unicode_path/2,
'readOnly' => true
}
)},
@@ -755,6 +756,7 @@ fields("rpc") ->
file(),
#{
mapping => "gen_rpc.certfile",
+ converter => fun ensure_unicode_path/2,
desc => ?DESC(rpc_certfile)
}
)},
@@ -763,6 +765,7 @@ fields("rpc") ->
file(),
#{
mapping => "gen_rpc.keyfile",
+ converter => fun ensure_unicode_path/2,
desc => ?DESC(rpc_keyfile)
}
)},
@@ -771,6 +774,7 @@ fields("rpc") ->
file(),
#{
mapping => "gen_rpc.cacertfile",
+ converter => fun ensure_unicode_path/2,
desc => ?DESC(rpc_cacertfile)
}
)},
@@ -897,10 +901,11 @@ fields("log_file_handler") ->
#{
desc => ?DESC("log_file_handler_file"),
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
- converter => fun emqx_schema:naive_env_interpolation/1,
- validator => fun validate_file_location/1,
aliases => [file],
- importance => ?IMPORTANCE_HIGH
+ importance => ?IMPORTANCE_HIGH,
+ converter => fun(Path, Opts) ->
+ emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
+ end
}
)},
{"rotation_count",
@@ -1318,11 +1323,6 @@ emqx_schema_high_prio_roots() ->
)},
lists:keyreplace("authorization", 1, Roots, Authz).
-validate_file_location(File) ->
- ValidFile = "^[/\\_a-zA-Z0-9\\.\\-]*$",
- Error = "Invalid file name: " ++ ValidFile,
- validator_string_re(File, ValidFile, Error).
-
validate_time_offset(Offset) ->
ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$",
Error =
@@ -1356,3 +1356,20 @@ ensure_file_handlers(Conf, _Opts) ->
convert_rotation(undefined, _Opts) -> undefined;
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
+
+ensure_unicode_path(undefined, _) ->
+ undefined;
+ensure_unicode_path(Path, #{make_serializable := true}) ->
+ %% format back to serializable string
+ unicode:characters_to_binary(Path, utf8);
+ensure_unicode_path(Path, Opts) when is_binary(Path) ->
+ case unicode:characters_to_list(Path, utf8) of
+ {R, _, _} when R =:= error orelse R =:= incomplete ->
+ throw({"bad_file_path_string", Path});
+ PathStr ->
+ ensure_unicode_path(PathStr, Opts)
+ end;
+ensure_unicode_path(Path, _) when is_list(Path) ->
+ Path;
+ensure_unicode_path(Path, _) ->
+ throw({"not_string", Path}).
diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl
index b59a5f819..32c66fb90 100644
--- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl
+++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl
@@ -438,3 +438,63 @@ ensure_acl_conf() ->
true -> ok;
false -> file:write_file(File, <<"">>)
end.
+
+log_path_test_() ->
+ Fh = fun(Path) ->
+ #{<<"log">> => #{<<"file_handlers">> => #{<<"name1">> => #{<<"file">> => Path}}}}
+ end,
+ Assert = fun(Name, Path, Conf) ->
+ ?assertMatch(#{log := #{file := #{Name := #{to := Path}}}}, Conf)
+ end,
+
+ [
+ {"default-values", fun() -> Assert(default, "log/emqx.log", check(#{})) end},
+ {"file path with space", fun() -> Assert(name1, "a /b", check(Fh(<<"a /b">>))) end},
+ {"windows", fun() -> Assert(name1, "c:\\a\\ b\\", check(Fh(<<"c:\\a\\ b\\">>))) end},
+ {"unicoded", fun() -> Assert(name1, "路 径", check(Fh(<<"路 径"/utf8>>))) end},
+ {"bad utf8", fun() ->
+ ?assertThrow(
+ {emqx_conf_schema, [
+ #{
+ kind := validation_error,
+ mismatches :=
+ #{
+ "handler_name" :=
+ #{
+ kind := validation_error,
+ path := "log.file.name1.to",
+ reason := {"bad_file_path_string", _}
+ }
+ }
+ }
+ ]},
+ check(Fh(<<239, 32, 132, 47, 117, 116, 102, 56>>))
+ )
+ end},
+ {"not string", fun() ->
+ ?assertThrow(
+ {emqx_conf_schema, [
+ #{
+ kind := validation_error,
+ mismatches :=
+ #{
+ "handler_name" :=
+ #{
+ kind := validation_error,
+ path := "log.file.name1.to",
+ reason := {"not_string", _}
+ }
+ }
+ }
+ ]},
+ check(Fh(#{<<"foo">> => <<"bar">>}))
+ )
+ end}
+ ].
+
+check(Config) ->
+ Schema = emqx_conf_schema,
+ {_, Conf} = hocon_tconf:map(Schema, Config, [log], #{
+ atom_key => false, required => false, format => map
+ }),
+ emqx_utils_maps:unsafe_atom_key_map(Conf).
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 6ecfd0b59..9426e16f2 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -406,7 +406,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
ok ->
- {connected, State};
+ connected;
+ {error, still_connecting} ->
+ connecting;
{error, Reason} ->
{disconnected, State, Reason}
end.
@@ -428,7 +430,8 @@ do_get_status(PoolName, Timeout) ->
end
end,
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
- % we crash in case of non-empty lists since we don't know what to do in that case
+ [] ->
+ {error, still_connecting};
[_ | _] = Results ->
case [E || {error, _} = E <- Results] of
[] ->
diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
index f64208311..a277fe8c8 100644
--- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
@@ -30,6 +30,7 @@
database/1,
username/1,
password/1,
+ password_required/1,
auto_reconnect/1
]).
@@ -104,6 +105,14 @@ password(sensitive) -> true;
password(converter) -> fun emqx_schema:password_converter/2;
password(_) -> undefined.
+password_required(type) -> binary();
+password_required(desc) -> ?DESC("password");
+password_required(required) -> true;
+password_required(format) -> <<"password">>;
+password_required(sensitive) -> true;
+password_required(converter) -> fun emqx_schema:password_converter/2;
+password_required(_) -> undefined.
+
auto_reconnect(type) -> boolean();
auto_reconnect(desc) -> ?DESC("auto_reconnect");
auto_reconnect(default) -> true;
diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl
index aec811e5d..364853eec 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard.erl
@@ -192,7 +192,9 @@ ranch_opts(Options) ->
RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
proto_opts(#{proxy_header := ProxyHeader}) ->
- #{proxy_header => ProxyHeader}.
+ #{proxy_header => ProxyHeader};
+proto_opts(_Opts) ->
+ #{}.
filter_false(_K, false, S) -> S;
filter_false(K, V, S) -> [{K, V} | S].
diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
index 239d9052e..7e6cf5b95 100644
--- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
+++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
@@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"},
- {vsn, "5.0.0"},
+ {vsn, "5.0.1"},
{registered, [
emqx_eviction_agent_sup,
emqx_eviction_agent,
diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl
index a6097f03d..1369ee969 100644
--- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl
+++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl
@@ -218,10 +218,10 @@ cancel_expiry_timer(_) ->
set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
case maps:get(expiry_interval, ConnInfo) of
- ?UINT_MAX ->
+ ?EXPIRE_INTERVAL_INFINITE ->
{ok, Channel};
I when I > 0 ->
- Timer = erlang:send_after(timer:seconds(I), self(), expire_session),
+ Timer = erlang:send_after(I, self(), expire_session),
{ok, Channel#{expiry_timer => Timer}};
_ ->
{error, should_be_expired}
diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
index a68a1f292..883407a94 100644
--- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
+++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
@@ -177,7 +177,7 @@ t_explicit_session_takeover(Config) ->
?assert(false, "Connection not evicted")
end
end,
- #{?snk_kind := emqx_cm_connected_client_count_dec, chan_pid := ChanPid},
+ #{?snk_kind := emqx_cm_connected_client_count_dec_done, chan_pid := ChanPid},
2000
),
@@ -383,7 +383,7 @@ t_ws_conn(_Config) ->
?assertWaitEvent(
ok = emqx_eviction_agent:evict_connections(1),
- #{?snk_kind := emqx_cm_connected_client_count_dec},
+ #{?snk_kind := emqx_cm_connected_client_count_dec_done},
1000
),
@@ -418,7 +418,7 @@ t_quic_conn(_Config) ->
?assertWaitEvent(
ok = emqx_eviction_agent:evict_connections(1),
- #{?snk_kind := emqx_cm_connected_client_count_dec},
+ #{?snk_kind := emqx_cm_connected_client_count_dec_done},
1000
),
diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
index 3b7ef6672..4ace80893 100644
--- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
+++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
@@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_channel.hrl").
-define(CLIENT_ID, <<"client_with_session">>).
@@ -101,7 +102,7 @@ t_start_infinite_expire(_Config) ->
conninfo => #{
clientid => ?CLIENT_ID,
receive_maximum => 32,
- expiry_interval => ?UINT_MAX
+ expiry_interval => ?EXPIRE_INTERVAL_INFINITE
}
},
?assertMatch(
diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl
index 6b0e1f622..d451261ff 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl
@@ -139,9 +139,9 @@ lookup(#{topic := Topic}) ->
%%%==============================================================================================
%% internal
generate_topic(Params = #{<<"topic">> := Topic}) ->
- Params#{<<"topic">> => uri_string:percent_decode(Topic)};
+ Params#{<<"topic">> => Topic};
generate_topic(Params = #{topic := Topic}) ->
- Params#{topic => uri_string:percent_decode(Topic)};
+ Params#{topic => Topic};
generate_topic(Params) ->
Params.
diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
index 659ae0d44..e617c6dcb 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
@@ -92,4 +92,35 @@ t_nodes_api(Config) ->
#{<<"topic">> := Topic, <<"node">> := Node2}
] = emqx_utils_json:decode(RouteResponse, [return_maps]),
- ?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])).
+ ?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])),
+
+ ok = emqtt:stop(Client).
+
+t_percent_topics(_Config) ->
+ Node = atom_to_binary(node(), utf8),
+ Topic = <<"test_%%1">>,
+ {ok, Client} = emqtt:start_link(#{
+ username => <<"routes_username">>, clientid => <<"routes_cid">>
+ }),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, _} = emqtt:subscribe(Client, Topic),
+
+ %% exact match with percent encoded topic
+ Path = emqx_mgmt_api_test_util:api_path(["topics"]),
+ QS = uri_string:compose_query([
+ {"topic", Topic},
+ {"node", atom_to_list(node())}
+ ]),
+ Headers = emqx_mgmt_api_test_util:auth_header_(),
+ {ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
+ MatchData = emqx_utils_json:decode(MatchResponse, [return_maps]),
+ ?assertMatch(
+ #{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
+ maps:get(<<"meta">>, MatchData)
+ ),
+ ?assertMatch(
+ [#{<<"topic">> := Topic, <<"node">> := Node}],
+ maps:get(<<"data">>, MatchData)
+ ),
+
+ ok = emqtt:stop(Client).
diff --git a/changes/ce/fix-10801.en.md b/changes/ce/fix-10801.en.md
new file mode 100644
index 000000000..4c36bd528
--- /dev/null
+++ b/changes/ce/fix-10801.en.md
@@ -0,0 +1 @@
+Avoid duplicated percent decode the topic name in API `/topics/{topic}` and `/topics`.
diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon
index 92294bb75..d1f5c8b13 100644
--- a/rel/i18n/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/emqx_bridge_pulsar.hocon
@@ -67,6 +67,11 @@ emqx_bridge_pulsar {
label = "Enable or Disable"
}
+ connect_timeout {
+ desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
+ label = "Connect Timeout"
+ }
+
desc_name {
desc = "Bridge name, used as a human-readable description of the bridge."
label = "Bridge Name"
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index ad63b4ba9..54d866014 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -213,9 +213,6 @@ pending connections can grow to."""
fields_tcp_opts_backlog.label:
"""TCP backlog length"""
-broker_route_batch_clean.desc:
-"""Enable batch clean for deleted routes."""
-
fields_mqtt_quic_listener_initial_window_packets.desc:
"""The size (in packets) of the initial congestion window for a connection. Default: 10"""
diff --git a/rel/i18n/zh/emqx_bridge_pulsar.hocon b/rel/i18n/zh/emqx_bridge_pulsar.hocon
index 23643060b..4e2fd5c9f 100644
--- a/rel/i18n/zh/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/zh/emqx_bridge_pulsar.hocon
@@ -20,6 +20,11 @@ emqx_bridge_pulsar {
label = "启用或停用"
}
+ connect_timeout {
+ desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
+ label = "连接超时时间"
+ }
+
servers {
desc = "以逗号分隔的 scheme://host[:port]
格式的 Pulsar URL 列表,"
"支持的 scheme 有 pulsar://
(默认)"
diff --git a/rel/i18n/zh/emqx_schema.hocon b/rel/i18n/zh/emqx_schema.hocon
index 835372868..0e329eac9 100644
--- a/rel/i18n/zh/emqx_schema.hocon
+++ b/rel/i18n/zh/emqx_schema.hocon
@@ -208,9 +208,6 @@ fields_tcp_opts_backlog.desc:
fields_tcp_opts_backlog.label:
"""TCP 连接队列长度"""
-broker_route_batch_clean.desc:
-"""是否开启批量清除路由。"""
-
fields_mqtt_quic_listener_initial_window_packets.desc:
"""一个连接的初始拥堵窗口的大小(以包为单位)。默认值:10"""