diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml
index 836eaf079..7e664c1c7 100644
--- a/.github/workflows/build_slim_packages.yaml
+++ b/.github/workflows/build_slim_packages.yaml
@@ -111,8 +111,14 @@ jobs:
timeout-minutes: 5
run: |
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx start
- Start-Sleep -s 5
- echo "EMQX started"
+ Start-Sleep -s 10
+ $pingOutput = ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx ping
+ if ($pingOutput = 'pong') {
+ echo "EMQX started OK"
+ } else {
+ echo "Failed to ping EMQX $pingOutput"
+ Exit 1
+ }
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx stop
echo "EMQX stopped"
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx install
diff --git a/apps/emqx/include/emqx_channel.hrl b/apps/emqx/include/emqx_channel.hrl
index d4362633a..be2448a20 100644
--- a/apps/emqx/include/emqx_channel.hrl
+++ b/apps/emqx/include/emqx_channel.hrl
@@ -40,3 +40,5 @@
session,
will_msg
]).
+
+-define(EXPIRE_INTERVAL_INFINITE, 4294967295000).
diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config
index 97a0c0f31..8ba36a87b 100644
--- a/apps/emqx/rebar.config
+++ b/apps/emqx/rebar.config
@@ -27,7 +27,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
- {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}},
+ {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.6"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl
index d583b1691..5637bb171 100644
--- a/apps/emqx/src/emqx_channel.erl
+++ b/apps/emqx/src/emqx_channel.erl
@@ -2088,7 +2088,7 @@ maybe_resume_session(#channel{
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
case maps:get(expiry_interval, ConnInfo) of
- ?UINT_MAX ->
+ ?EXPIRE_INTERVAL_INFINITE ->
{ok, Channel};
I when I > 0 ->
{ok, ensure_timer(expire_timer, I, Channel)};
diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl
index 9a3b4e39b..ebcf9c434 100644
--- a/apps/emqx/src/emqx_cm.erl
+++ b/apps/emqx/src/emqx_cm.erl
@@ -773,6 +773,7 @@ mark_channel_connected(ChanPid) ->
mark_channel_disconnected(ChanPid) ->
?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}),
ets:delete(?CHAN_LIVE_TAB, ChanPid),
+ ?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}),
ok.
get_connected_client_count() ->
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 914f87d1f..051b1d4c1 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1556,7 +1556,8 @@ fields("broker") ->
boolean(),
#{
default => true,
- desc => ?DESC(broker_route_batch_clean)
+ desc => "This config is stale since 4.3",
+ importance => ?IMPORTANCE_HIDDEN
}
)},
{"perf",
diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl
index 68f783283..3e9e00c81 100644
--- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl
+++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl
@@ -60,14 +60,12 @@
-export_type([sess_msg_key/0]).
-include("emqx.hrl").
+-include("emqx_channel.hrl").
-include("emqx_persistent_session.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-compile({inline, [is_store_enabled/0]}).
-%% 16#FFFFFFFF * 1000
--define(MAX_EXPIRY_INTERVAL, 4294967295000).
-
%% NOTE: Order is significant because of traversal order of the table.
-define(MARKER, 3).
-define(DELIVERED, 2).
@@ -424,7 +422,7 @@ pending(SessionID, MarkerIds) ->
%% @private [MQTT-3.1.2-23]
persistent_session_status(#session_store{expiry_interval = 0}) ->
not_persistent;
-persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) ->
+persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) ->
persistent;
persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
case E + TS > erlang:system_time(millisecond) of
diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl
index b0ba4f0e2..8bf965cc3 100644
--- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl
+++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl
@@ -165,6 +165,7 @@ init_per_testcase(_TestCase, Config) ->
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}}
end
),
+ snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
DataDir = ?config(data_dir, Config),
@@ -187,7 +188,6 @@ init_per_testcase(_TestCase, Config) ->
ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
- snabbkaffe:start_trace(),
[
{cache_pid, CachePid}
| Config
@@ -231,12 +231,19 @@ end_per_testcase(_TestCase, Config) ->
%% In some tests, we don't start the full supervision tree, so we need
%% this dummy process.
spawn_dummy_heir() ->
- spawn_link(fun() ->
- true = register(emqx_kernel_sup, self()),
- receive
- stop -> ok
- end
- end).
+ {_, {ok, _}} =
+ ?wait_async_action(
+ spawn_link(fun() ->
+ true = register(emqx_kernel_sup, self()),
+ ?tp(heir_name_registered, #{}),
+ receive
+ stop -> ok
+ end
+ end),
+ #{?snk_kind := heir_name_registered},
+ 1_000
+ ),
+ ok.
does_module_exist(Mod) ->
case erlang:module_loaded(Mod) of
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 3aade0369..1fbd6902e 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -211,7 +211,7 @@ send_message(BridgeId, Message) ->
query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
- Timeout when is_integer(Timeout) ->
+ Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_timeout is configured
#{timeout => Timeout};
_ ->
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 49ddd19bd..30a888118 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -159,9 +159,10 @@ init_node(Type) ->
ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, fun load_suite_config/1),
case Type of
primary ->
+ ok = emqx_dashboard_desc_cache:init(),
ok = emqx_config:put(
[dashboard, listeners],
- #{http => #{enable => true, bind => 18083}, proxy_header => false}
+ #{http => #{enable => true, bind => 18083, proxy_header => false}}
),
ok = emqx_dashboard:start_listeners(),
ready = emqx_dashboard_listener:regenerate_minirest_dispatch(),
diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl
index 79220321e..8f093ef5c 100644
--- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl
+++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl
@@ -506,7 +506,17 @@ t_write_failure(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
- {ok, _} = create_bridge(Config),
+ {ok, _} = create_bridge(
+ Config,
+ #{
+ <<"resource_opts">> =>
+ #{
+ <<"auto_restart_interval">> => <<"100ms">>,
+ <<"resume_interval">> => <<"100ms">>,
+ <<"health_check_interval">> => <<"100ms">>
+ }
+ }
+ ),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{
topic => atom_to_binary(?FUNCTION_NAME),
@@ -523,7 +533,9 @@ t_write_failure(Config) ->
async ->
send_message(Config, SentData)
end,
- #{?snk_kind := buffer_worker_flush_nack},
+ #{?snk_kind := Evt} when
+ Evt =:= buffer_worker_flush_nack orelse
+ Evt =:= buffer_worker_retry_inflight_failed,
10_000
)
end),
diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl
index 49ca57c42..814051733 100644
--- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl
+++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl
@@ -623,6 +623,30 @@ t_publish_success(Config) ->
),
ok.
+t_publish_success_infinity_timeout(Config) ->
+ ServiceAccountJSON = ?config(service_account_json, Config),
+ Topic = <<"t/topic">>,
+ {ok, _} = create_bridge(Config, #{
+ <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>}
+ }),
+ {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+ Payload = <<"payload">>,
+ Message = emqx_message:make(Topic, Payload),
+ emqx:publish(Message),
+ DecodedMessages = assert_http_request(ServiceAccountJSON),
+ ?assertMatch(
+ [
+ #{
+ <<"topic">> := Topic,
+ <<"payload">> := Payload,
+ <<"metadata">> := #{<<"rule_id">> := RuleId}
+ }
+ ],
+ DecodedMessages
+ ),
+ ok.
+
t_publish_success_local_topic(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
diff --git a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
index 5e6bf9ac5..5d693547a 100644
--- a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
+++ b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
@@ -5,7 +5,7 @@
-ifndef(EMQX_BRIDGE_IOTDB_HRL).
-define(EMQX_BRIDGE_IOTDB_HRL, true).
--define(VSN_1_0_X, 'v1.0.x').
+-define(VSN_1_X, 'v1.x').
-define(VSN_0_13_X, 'v0.13.x').
-endif.
diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
index 9c5108307..cebf60cb1 100644
--- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
+++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"},
- {vsn, "0.1.0"},
+ {vsn, "0.1.1"},
{modules, [
emqx_bridge_iotdb,
emqx_bridge_iotdb_impl
diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
index 90e8d18a4..aa2c32589 100644
--- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
+++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
@@ -109,10 +109,10 @@ basic_config() ->
)},
{iotdb_version,
mk(
- hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]),
+ hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
#{
desc => ?DESC("config_iotdb_version"),
- default => ?VSN_1_0_X
+ default => ?VSN_1_X
}
)}
] ++ resource_creation_opts() ++
@@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) ->
is_aligned => false,
device_id => <<"my_device">>,
base_url => <<"http://iotdb.local:18080/">>,
- iotdb_version => ?VSN_1_0_X,
+ iotdb_version => ?VSN_1_X,
connect_timeout => <<"15s">>,
pool_type => <<"random">>,
pool_size => 8,
diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
index 2f8794560..8331e715f 100644
--- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
+++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
@@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
%% Internal Functions
%%--------------------------------------------------------------------
-preproc_data(DataList) ->
+make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
+ emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
+make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
+ lists:map(fun make_parsed_payload/1, PayloadUnparsed);
+make_parsed_payload(
+ #{
+ measurement := Measurement,
+ data_type := DataType,
+ value := Value
+ } = Data
+) ->
+ Data#{
+ <<"measurement">> => Measurement,
+ <<"data_type">> => DataType,
+ <<"value">> => Value
+ }.
+
+preproc_data(
+ #{
+ <<"measurement">> := Measurement,
+ <<"data_type">> := DataType,
+ <<"value">> := Value
+ } = Data
+) ->
+ #{
+ timestamp => emqx_plugin_libs_rule:preproc_tmpl(
+ maps:get(<<"timestamp">>, Data, <<"now">>)
+ ),
+ measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
+ data_type => DataType,
+ value => emqx_plugin_libs_rule:preproc_tmpl(Value)
+ }.
+
+preproc_data_list(DataList) ->
lists:map(
- fun(
- #{
- measurement := Measurement,
- data_type := DataType,
- value := Value
- } = Data
- ) ->
- #{
- timestamp => emqx_plugin_libs_rule:preproc_tmpl(
- maps:get(<<"timestamp">>, Data, <<"now">>)
- ),
- measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
- data_type => DataType,
- value => emqx_plugin_libs_rule:preproc_tmpl(Value)
- }
- end,
+ fun preproc_data/1,
DataList
).
@@ -258,12 +276,13 @@ convert_float(Str) when is_binary(Str) ->
convert_float(undefined) ->
null.
-make_iotdb_insert_request(Message, State) ->
+make_iotdb_insert_request(MessageUnparsedPayload, State) ->
+ Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
IsAligned = maps:get(is_aligned, State, false),
DeviceId = device_id(Message, State),
- IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X),
+ IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
Payload = make_list(maps:get(payload, Message)),
- PreProcessedData = preproc_data(Payload),
+ PreProcessedData = preproc_data_list(Payload),
DataList = proc_data(PreProcessedData, Message),
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
@@ -330,15 +349,15 @@ insert_value(1, Data, [Value | Values]) ->
insert_value(Index, Data, [Value | Values]) ->
[[null | Value] | insert_value(Index - 1, Data, Values)].
-iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
+iotdb_field_key(is_aligned, ?VSN_1_X) ->
<<"is_aligned">>;
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
<<"isAligned">>;
-iotdb_field_key(device_id, ?VSN_1_0_X) ->
+iotdb_field_key(device_id, ?VSN_1_X) ->
<<"device">>;
iotdb_field_key(device_id, ?VSN_0_13_X) ->
<<"deviceId">>;
-iotdb_field_key(data_types, ?VSN_1_0_X) ->
+iotdb_field_key(data_types, ?VSN_1_X) ->
<<"data_types">>;
iotdb_field_key(data_types, ?VSN_0_13_X) ->
<<"dataTypes">>.
@@ -350,6 +369,8 @@ device_id(Message, State) ->
case maps:get(device_id, State, undefined) of
undefined ->
case maps:get(payload, Message) of
+ #{<<"device_id">> := DeviceId} ->
+ DeviceId;
#{device_id := DeviceId} ->
DeviceId;
_NotFound ->
diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
index 9f2011779..e4f17d76a 100644
--- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
+++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
@@ -258,13 +258,18 @@ query_resource(Config, Request) ->
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
query_resource_async(Config, Request) ->
+ query_resource_async(Config, Request, _Opts = #{}).
+
+query_resource_async(Config, Request, Opts) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+ Timeout = maps:get(timeout, Opts, 500),
Return = emqx_resource:query(ResourceID, Request, #{
- timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+ timeout => Timeout,
+ async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
@@ -498,9 +503,9 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
- <<"request_timeout">> => 500,
- <<"resume_interval">> => 100,
- <<"health_check_interval">> => 100
+ <<"auto_restart_interval">> => <<"100ms">>,
+ <<"resume_interval">> => <<"100ms">>,
+ <<"health_check_interval">> => <<"100ms">>
}
}
),
@@ -515,7 +520,7 @@ t_write_timeout(Config) ->
Res1 =
case QueryMode of
async ->
- query_resource_async(Config, {send_message, SentData});
+ query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
sync ->
query_resource(Config, {send_message, SentData})
end,
@@ -526,7 +531,17 @@ t_write_timeout(Config) ->
{_, Ref} when is_reference(Ref) ->
case receive_result(Ref, 15_000) of
{ok, Res} ->
- ?assertMatch({error, {unrecoverable_error, _}}, Res);
+ %% we may receive a successful result depending on
+ %% timing, if the request is retried after the
+ %% failure is healed.
+ case Res of
+ {error, {unrecoverable_error, _}} ->
+ ok;
+ {ok, _} ->
+ ok;
+ _ ->
+ ct:fail("unexpected result: ~p", [Res])
+ end;
timeout ->
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
ct:fail("no response received")
diff --git a/apps/emqx_bridge_pulsar/rebar.config b/apps/emqx_bridge_pulsar/rebar.config
index d5a63f320..c77007b93 100644
--- a/apps/emqx_bridge_pulsar/rebar.config
+++ b/apps/emqx_bridge_pulsar/rebar.config
@@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
+ {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
index 7d1b20d24..721937cd2 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
@@ -57,6 +57,14 @@ fields(config) ->
sensitive => true,
desc => ?DESC("authentication")
}
+ )},
+ {connect_timeout,
+ mk(
+ emqx_schema:duration_ms(),
+ #{
+ default => <<"5s">>,
+ desc => ?DESC("connect_timeout")
+ }
)}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(producer_opts) ->
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
index 59956e1b6..5906cc57a 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
@@ -48,6 +48,7 @@
memory_overload_protection := boolean()
},
compression := compression_mode(),
+ connect_timeout := emqx_schema:duration_ms(),
max_batch_bytes := emqx_schema:bytesize(),
message := message_template_raw(),
pulsar_topic := binary(),
@@ -80,8 +81,11 @@ on_start(InstanceId, Config) ->
} = Config,
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
+ ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
+ ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
ClientOpts = #{
+ connect_timeout => ConnectTimeout,
ssl_opts => SSLOpts,
conn_opts => conn_opts(Config)
},
@@ -96,26 +100,46 @@ on_start(InstanceId, Config) ->
}
);
{error, Reason} ->
+ RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
?SLOG(error, #{
msg => "failed_to_start_pulsar_client",
instance_id => InstanceId,
pulsar_hosts => Servers,
- reason => emqx_utils:redact(Reason, fun is_sensitive_key/1)
+ reason => RedactedReason
}),
- throw(failed_to_start_pulsar_client)
+ Message =
+ case get_error_message(RedactedReason) of
+ {ok, Msg} -> Msg;
+ error -> failed_to_start_pulsar_client
+ end,
+ throw(Message)
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
-spec on_stop(resource_id(), state()) -> ok.
-on_stop(_InstanceId, State) ->
- #{
- pulsar_client_id := ClientId,
- producers := Producers
- } = State,
- stop_producers(ClientId, Producers),
- stop_client(ClientId),
- ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
- ok.
+on_stop(InstanceId, _State) ->
+ case emqx_resource:get_allocated_resources(InstanceId) of
+ #{pulsar_client_id := ClientId, pulsar_producers := Producers} ->
+ stop_producers(ClientId, Producers),
+ stop_client(ClientId),
+ ?tp(pulsar_bridge_stopped, #{
+ instance_id => InstanceId,
+ pulsar_client_id => ClientId,
+ pulsar_producers => Producers
+ }),
+ ok;
+ #{pulsar_client_id := ClientId} ->
+ stop_client(ClientId),
+ ?tp(pulsar_bridge_stopped, #{
+ instance_id => InstanceId,
+ pulsar_client_id => ClientId,
+ pulsar_producers => undefined
+ }),
+ ok;
+ _ ->
+ ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
+ ok
+ end.
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(_InstanceId, State = #{}) ->
@@ -316,6 +340,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
{ok, Producers} ->
+ ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers),
+ ?tp(pulsar_producer_producers_allocated, #{}),
State = #{
pulsar_client_id => ClientId,
producers => Producers,
@@ -422,3 +448,19 @@ partition_strategy(Strategy) -> Strategy.
is_sensitive_key(auth_data) -> true;
is_sensitive_key(_) -> false.
+
+get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
+ Iter = maps:iterator(BrokerErrorMap),
+ do_get_error_message(Iter);
+get_error_message(_Error) ->
+ error.
+
+do_get_error_message(Iter) ->
+ case maps:next(Iter) of
+ {{_Broker, _Port}, #{message := Message}, _NIter} ->
+ {ok, Message};
+ {_K, _V, NIter} ->
+ do_get_error_message(NIter);
+ none ->
+ error
+ end.
diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
index 76d9f94e1..3605baaab 100644
--- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
+++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
@@ -43,7 +43,9 @@ only_once_tests() ->
t_send_when_down,
t_send_when_timeout,
t_failure_to_start_producer,
- t_producer_process_crash
+ t_producer_process_crash,
+ t_resource_manager_crash_after_producers_started,
+ t_resource_manager_crash_before_producers_started
].
init_per_suite(Config) ->
@@ -429,7 +431,19 @@ wait_until_producer_connected() ->
wait_until_connected(pulsar_producers_sup, pulsar_producer).
wait_until_connected(SupMod, Mod) ->
- Pids = [
+ Pids = get_pids(SupMod, Mod),
+ ?retry(
+ _Sleep = 300,
+ _Attempts0 = 20,
+ lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
+ ),
+ ok.
+
+get_pulsar_producers() ->
+ get_pids(pulsar_producers_sup, pulsar_producer).
+
+get_pids(SupMod, Mod) ->
+ [
P
|| {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod),
P <- element(2, process_info(SupPid, links)),
@@ -437,13 +451,7 @@ wait_until_connected(SupMod, Mod) ->
{Mod, init, _} -> true;
_ -> false
end
- ],
- ?retry(
- _Sleep = 300,
- _Attempts0 = 20,
- lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
- ),
- ok.
+ ].
create_rule_and_action_http(Config) ->
PulsarName = ?config(pulsar_name, Config),
@@ -528,6 +536,18 @@ start_cluster(Cluster) ->
end),
Nodes.
+kill_resource_managers() ->
+ ct:pal("gonna kill resource managers"),
+ lists:foreach(
+ fun({_, Pid, _, _}) ->
+ ct:pal("terminating resource manager ~p", [Pid]),
+ %% sys:terminate(Pid, stop),
+ exit(Pid, kill),
+ ok
+ end,
+ supervisor:which_children(emqx_resource_manager_sup)
+ ).
+
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@@ -921,7 +941,11 @@ t_producer_process_crash(Config) ->
ok
after 1_000 -> ct:fail("pid didn't die")
end,
- ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)),
+ ?retry(
+ _Sleep0 = 50,
+ _Attempts0 = 50,
+ ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
+ ),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
@@ -952,6 +976,69 @@ t_producer_process_crash(Config) ->
),
ok.
+t_resource_manager_crash_after_producers_started(Config) ->
+ ?check_trace(
+ begin
+ ?force_ordering(
+ #{?snk_kind := pulsar_producer_producers_allocated},
+ #{?snk_kind := will_kill_resource_manager}
+ ),
+ ?force_ordering(
+ #{?snk_kind := resource_manager_killed},
+ #{?snk_kind := pulsar_producer_bridge_started}
+ ),
+ spawn_link(fun() ->
+ ?tp(will_kill_resource_manager, #{}),
+ kill_resource_managers(),
+ ?tp(resource_manager_killed, #{}),
+ ok
+ end),
+ %% even if the resource manager is dead, we can still
+ %% clear the allocated resources.
+ {{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
+ ?wait_async_action(
+ create_bridge(Config),
+ #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when
+ Producers =/= undefined,
+ 10_000
+ ),
+ ok
+ end,
+ []
+ ),
+ ok.
+
+t_resource_manager_crash_before_producers_started(Config) ->
+ ?check_trace(
+ begin
+ ?force_ordering(
+ #{?snk_kind := pulsar_producer_capture_name},
+ #{?snk_kind := will_kill_resource_manager}
+ ),
+ ?force_ordering(
+ #{?snk_kind := resource_manager_killed},
+ #{?snk_kind := pulsar_producer_about_to_start_producers}
+ ),
+ spawn_link(fun() ->
+ ?tp(will_kill_resource_manager, #{}),
+ kill_resource_managers(),
+ ?tp(resource_manager_killed, #{}),
+ ok
+ end),
+ %% even if the resource manager is dead, we can still
+ %% clear the allocated resources.
+ {{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
+ ?wait_async_action(
+ create_bridge(Config),
+ #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
+ 10_000
+ ),
+ ok
+ end,
+ []
+ ),
+ ok.
+
t_cluster(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
index 36f47aaf6..2b572a98c 100644
--- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
+++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"},
- {vsn, "0.1.0"},
+ {vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]},
{env, []},
diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
index 6f833d659..3e809d99c 100644
--- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
+++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
@@ -72,7 +72,7 @@ fields(config) ->
desc => ?DESC("username")
}
)},
- {password, fun emqx_connector_schema_lib:password/1},
+ {password, fun emqx_connector_schema_lib:password_required/1},
{pool_size,
hoconsc:mk(
typerefl:pos_integer(),
diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index 9eb159ea2..58edc7236 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -523,6 +523,7 @@ fields("node") ->
desc => ?DESC(node_crash_dump_file),
default => crash_dump_file_default(),
importance => ?IMPORTANCE_HIDDEN,
+ converter => fun ensure_unicode_path/2,
'readOnly' => true
}
)},
@@ -770,6 +771,7 @@ fields("rpc") ->
file(),
#{
mapping => "gen_rpc.certfile",
+ converter => fun ensure_unicode_path/2,
desc => ?DESC(rpc_certfile)
}
)},
@@ -778,6 +780,7 @@ fields("rpc") ->
file(),
#{
mapping => "gen_rpc.keyfile",
+ converter => fun ensure_unicode_path/2,
desc => ?DESC(rpc_keyfile)
}
)},
@@ -786,6 +789,7 @@ fields("rpc") ->
file(),
#{
mapping => "gen_rpc.cacertfile",
+ converter => fun ensure_unicode_path/2,
desc => ?DESC(rpc_cacertfile)
}
)},
@@ -912,10 +916,11 @@ fields("log_file_handler") ->
#{
desc => ?DESC("log_file_handler_file"),
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
- converter => fun emqx_schema:naive_env_interpolation/1,
- validator => fun validate_file_location/1,
aliases => [file],
- importance => ?IMPORTANCE_HIGH
+ importance => ?IMPORTANCE_HIGH,
+ converter => fun(Path, Opts) ->
+ emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
+ end
}
)},
{"rotation_count",
@@ -1333,11 +1338,6 @@ emqx_schema_high_prio_roots() ->
)},
lists:keyreplace("authorization", 1, Roots, Authz).
-validate_file_location(File) ->
- ValidFile = "^[/\\_a-zA-Z0-9\\.\\-]*$",
- Error = "Invalid file name: " ++ ValidFile,
- validator_string_re(File, ValidFile, Error).
-
validate_time_offset(Offset) ->
ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$",
Error =
@@ -1371,3 +1371,20 @@ ensure_file_handlers(Conf, _Opts) ->
convert_rotation(undefined, _Opts) -> undefined;
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
+
+ensure_unicode_path(undefined, _) ->
+ undefined;
+ensure_unicode_path(Path, #{make_serializable := true}) ->
+ %% format back to serializable string
+ unicode:characters_to_binary(Path, utf8);
+ensure_unicode_path(Path, Opts) when is_binary(Path) ->
+ case unicode:characters_to_list(Path, utf8) of
+ {R, _, _} when R =:= error orelse R =:= incomplete ->
+ throw({"bad_file_path_string", Path});
+ PathStr ->
+ ensure_unicode_path(PathStr, Opts)
+ end;
+ensure_unicode_path(Path, _) when is_list(Path) ->
+ Path;
+ensure_unicode_path(Path, _) ->
+ throw({"not_string", Path}).
diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl
index b59a5f819..32c66fb90 100644
--- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl
+++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl
@@ -438,3 +438,63 @@ ensure_acl_conf() ->
true -> ok;
false -> file:write_file(File, <<"">>)
end.
+
+log_path_test_() ->
+ Fh = fun(Path) ->
+ #{<<"log">> => #{<<"file_handlers">> => #{<<"name1">> => #{<<"file">> => Path}}}}
+ end,
+ Assert = fun(Name, Path, Conf) ->
+ ?assertMatch(#{log := #{file := #{Name := #{to := Path}}}}, Conf)
+ end,
+
+ [
+ {"default-values", fun() -> Assert(default, "log/emqx.log", check(#{})) end},
+ {"file path with space", fun() -> Assert(name1, "a /b", check(Fh(<<"a /b">>))) end},
+ {"windows", fun() -> Assert(name1, "c:\\a\\ b\\", check(Fh(<<"c:\\a\\ b\\">>))) end},
+ {"unicoded", fun() -> Assert(name1, "路 径", check(Fh(<<"路 径"/utf8>>))) end},
+ {"bad utf8", fun() ->
+ ?assertThrow(
+ {emqx_conf_schema, [
+ #{
+ kind := validation_error,
+ mismatches :=
+ #{
+ "handler_name" :=
+ #{
+ kind := validation_error,
+ path := "log.file.name1.to",
+ reason := {"bad_file_path_string", _}
+ }
+ }
+ }
+ ]},
+ check(Fh(<<239, 32, 132, 47, 117, 116, 102, 56>>))
+ )
+ end},
+ {"not string", fun() ->
+ ?assertThrow(
+ {emqx_conf_schema, [
+ #{
+ kind := validation_error,
+ mismatches :=
+ #{
+ "handler_name" :=
+ #{
+ kind := validation_error,
+ path := "log.file.name1.to",
+ reason := {"not_string", _}
+ }
+ }
+ }
+ ]},
+ check(Fh(#{<<"foo">> => <<"bar">>}))
+ )
+ end}
+ ].
+
+check(Config) ->
+ Schema = emqx_conf_schema,
+ {_, Conf} = hocon_tconf:map(Schema, Config, [log], #{
+ atom_key => false, required => false, format => map
+ }),
+ emqx_utils_maps:unsafe_atom_key_map(Conf).
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 6ecfd0b59..9426e16f2 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -406,7 +406,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
ok ->
- {connected, State};
+ connected;
+ {error, still_connecting} ->
+ connecting;
{error, Reason} ->
{disconnected, State, Reason}
end.
@@ -428,7 +430,8 @@ do_get_status(PoolName, Timeout) ->
end
end,
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
- % we crash in case of non-empty lists since we don't know what to do in that case
+ [] ->
+ {error, still_connecting};
[_ | _] = Results ->
case [E || {error, _} = E <- Results] of
[] ->
diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
index f64208311..a277fe8c8 100644
--- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
@@ -30,6 +30,7 @@
database/1,
username/1,
password/1,
+ password_required/1,
auto_reconnect/1
]).
@@ -104,6 +105,14 @@ password(sensitive) -> true;
password(converter) -> fun emqx_schema:password_converter/2;
password(_) -> undefined.
+password_required(type) -> binary();
+password_required(desc) -> ?DESC("password");
+password_required(required) -> true;
+password_required(format) -> <<"password">>;
+password_required(sensitive) -> true;
+password_required(converter) -> fun emqx_schema:password_converter/2;
+password_required(_) -> undefined.
+
auto_reconnect(type) -> boolean();
auto_reconnect(desc) -> ?DESC("auto_reconnect");
auto_reconnect(default) -> true;
diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl
index aec811e5d..364853eec 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard.erl
@@ -192,7 +192,9 @@ ranch_opts(Options) ->
RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
proto_opts(#{proxy_header := ProxyHeader}) ->
- #{proxy_header => ProxyHeader}.
+ #{proxy_header => ProxyHeader};
+proto_opts(_Opts) ->
+ #{}.
filter_false(_K, false, S) -> S;
filter_false(K, V, S) -> [{K, V} | S].
diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
index 239d9052e..7e6cf5b95 100644
--- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
+++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
@@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"},
- {vsn, "5.0.0"},
+ {vsn, "5.0.1"},
{registered, [
emqx_eviction_agent_sup,
emqx_eviction_agent,
diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl
index a6097f03d..1369ee969 100644
--- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl
+++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl
@@ -218,10 +218,10 @@ cancel_expiry_timer(_) ->
set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
case maps:get(expiry_interval, ConnInfo) of
- ?UINT_MAX ->
+ ?EXPIRE_INTERVAL_INFINITE ->
{ok, Channel};
I when I > 0 ->
- Timer = erlang:send_after(timer:seconds(I), self(), expire_session),
+ Timer = erlang:send_after(I, self(), expire_session),
{ok, Channel#{expiry_timer => Timer}};
_ ->
{error, should_be_expired}
diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
index a68a1f292..883407a94 100644
--- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
+++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
@@ -177,7 +177,7 @@ t_explicit_session_takeover(Config) ->
?assert(false, "Connection not evicted")
end
end,
- #{?snk_kind := emqx_cm_connected_client_count_dec, chan_pid := ChanPid},
+ #{?snk_kind := emqx_cm_connected_client_count_dec_done, chan_pid := ChanPid},
2000
),
@@ -383,7 +383,7 @@ t_ws_conn(_Config) ->
?assertWaitEvent(
ok = emqx_eviction_agent:evict_connections(1),
- #{?snk_kind := emqx_cm_connected_client_count_dec},
+ #{?snk_kind := emqx_cm_connected_client_count_dec_done},
1000
),
@@ -418,7 +418,7 @@ t_quic_conn(_Config) ->
?assertWaitEvent(
ok = emqx_eviction_agent:evict_connections(1),
- #{?snk_kind := emqx_cm_connected_client_count_dec},
+ #{?snk_kind := emqx_cm_connected_client_count_dec_done},
1000
),
diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
index 3b7ef6672..4ace80893 100644
--- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
+++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
@@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_channel.hrl").
-define(CLIENT_ID, <<"client_with_session">>).
@@ -101,7 +102,7 @@ t_start_infinite_expire(_Config) ->
conninfo => #{
clientid => ?CLIENT_ID,
receive_maximum => 32,
- expiry_interval => ?UINT_MAX
+ expiry_interval => ?EXPIRE_INTERVAL_INFINITE
}
},
?assertMatch(
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
index 6738d6fef..702bc35ce 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
@@ -422,7 +422,7 @@ decode_cursor(Cursor) ->
true = is_list(Name),
{Node, #{transfer => {ClientId, FileId}, name => Name}}
catch
- error:{_, invalid_json} ->
+ error:{Loc, JsonError} when is_integer(Loc), is_atom(JsonError) ->
error({badarg, cursor});
error:{badmatch, _} ->
error({badarg, cursor});
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
index abb774f82..40944c0e8 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
@@ -167,7 +167,7 @@ parse_filepath(PathBin) ->
throw({invalid, PathBin})
end,
PathComponents = filename:split(PathBin),
- case lists:any(fun is_special_component/1, PathComponents) of
+ case PathComponents == [] orelse lists:any(fun is_special_component/1, PathComponents) of
false ->
filename:join(PathComponents);
true ->
diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl
index 7d64f9716..e582db01f 100644
--- a/apps/emqx_ft/test/emqx_ft_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl
@@ -47,6 +47,7 @@ groups() ->
t_invalid_topic_format,
t_meta_conflict,
t_nasty_clientids_fileids,
+ t_nasty_filenames,
t_no_meta,
t_no_segment,
t_simple_transfer
@@ -205,10 +206,6 @@ t_invalid_filename(Config) ->
encode_meta(meta(lists:duplicate(1000, $A), <<>>)),
1
)
- ),
- ?assertRCName(
- success,
- emqtt:publish(C, mk_init_topic(<<"f5">>), encode_meta(meta("146%", <<>>)), 1)
).
t_simple_transfer(Config) ->
@@ -265,6 +262,22 @@ t_nasty_clientids_fileids(_Config) ->
Transfers
).
+t_nasty_filenames(_Config) ->
+ Filenames = [
+ {<<"nasty1">>, "146%"},
+ {<<"nasty2">>, "🌚"},
+ {<<"nasty3">>, "中文.txt"}
+ ],
+ ok = lists:foreach(
+ fun({ClientId, Filename}) ->
+ FileId = unicode:characters_to_binary(Filename),
+ ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId),
+ [Export] = list_files(ClientId),
+ ?assertEqual({ok, FileId}, read_export(Export))
+ end,
+ Filenames
+ ).
+
t_meta_conflict(Config) ->
C = ?config(client, Config),
diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
index f69e13a6d..18a8e9841 100644
--- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
@@ -140,10 +140,7 @@ t_download_transfer(Config) ->
request(
get,
uri(["file_transfer", "file"]) ++
- query(#{
- fileref => FileId,
- node => <<"nonode@nohost">>
- })
+ query(#{fileref => FileId, node => <<"nonode@nohost">>})
)
),
@@ -152,10 +149,25 @@ t_download_transfer(Config) ->
request(
get,
uri(["file_transfer", "file"]) ++
- query(#{
- fileref => <<"unknown_file">>,
- node => node()
- })
+ query(#{fileref => <<"unknown_file">>, node => node()})
+ )
+ ),
+
+ ?assertMatch(
+ {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
+ request_json(
+ get,
+ uri(["file_transfer", "file"]) ++
+ query(#{fileref => <<>>, node => node()})
+ )
+ ),
+
+ ?assertMatch(
+ {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
+ request_json(
+ get,
+ uri(["file_transfer", "file"]) ++
+ query(#{fileref => <<"/etc/passwd">>, node => node()})
)
),
@@ -204,6 +216,16 @@ t_list_files_paging(Config) ->
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}))
),
+ ?assertMatch(
+ {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
+ request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}))
+ ),
+
+ ?assertMatch(
+ {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
+ request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}))
+ ),
+
?assertMatch(
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
request_json(
diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl
index 6b0e1f622..d451261ff 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl
@@ -139,9 +139,9 @@ lookup(#{topic := Topic}) ->
%%%==============================================================================================
%% internal
generate_topic(Params = #{<<"topic">> := Topic}) ->
- Params#{<<"topic">> => uri_string:percent_decode(Topic)};
+ Params#{<<"topic">> => Topic};
generate_topic(Params = #{topic := Topic}) ->
- Params#{topic => uri_string:percent_decode(Topic)};
+ Params#{topic => Topic};
generate_topic(Params) ->
Params.
diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
index 659ae0d44..e617c6dcb 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
@@ -92,4 +92,35 @@ t_nodes_api(Config) ->
#{<<"topic">> := Topic, <<"node">> := Node2}
] = emqx_utils_json:decode(RouteResponse, [return_maps]),
- ?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])).
+ ?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])),
+
+ ok = emqtt:stop(Client).
+
+t_percent_topics(_Config) ->
+ Node = atom_to_binary(node(), utf8),
+ Topic = <<"test_%%1">>,
+ {ok, Client} = emqtt:start_link(#{
+ username => <<"routes_username">>, clientid => <<"routes_cid">>
+ }),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, _} = emqtt:subscribe(Client, Topic),
+
+ %% exact match with percent encoded topic
+ Path = emqx_mgmt_api_test_util:api_path(["topics"]),
+ QS = uri_string:compose_query([
+ {"topic", Topic},
+ {"node", atom_to_list(node())}
+ ]),
+ Headers = emqx_mgmt_api_test_util:auth_header_(),
+ {ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
+ MatchData = emqx_utils_json:decode(MatchResponse, [return_maps]),
+ ?assertMatch(
+ #{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
+ maps:get(<<"meta">>, MatchData)
+ ),
+ ?assertMatch(
+ [#{<<"topic">> := Topic, <<"node">> := Node}],
+ maps:get(<<"data">>, MatchData)
+ ),
+
+ ok = emqtt:stop(Client).
diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl
index 7f3ac580d..ce3ee73a9 100644
--- a/apps/emqx_resource/include/emqx_resource.hrl
+++ b/apps/emqx_resource/include/emqx_resource.hrl
@@ -121,3 +121,5 @@
-define(TEST_ID_PREFIX, "_probe_:").
-define(RES_METRICS, resource_metrics).
+
+-define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl
index 80f270b13..10f1de6c4 100644
--- a/apps/emqx_resource/src/emqx_resource.erl
+++ b/apps/emqx_resource/src/emqx_resource.erl
@@ -79,7 +79,13 @@
query/2,
query/3,
%% query the instance without batching and queuing messages.
- simple_sync_query/2
+ simple_sync_query/2,
+ %% functions used by connectors to register resources that must be
+ %% freed when stopping or even when a resource manager crashes.
+ allocate_resource/3,
+ has_allocated_resources/1,
+ get_allocated_resources/1,
+ forget_allocated_resources/1
]).
%% Direct calls to the callback module
@@ -372,6 +378,9 @@ is_buffer_supported(Module) ->
{ok, resource_state()} | {error, Reason :: term()}.
call_start(ResId, Mod, Config) ->
try
+ %% If the previous manager process crashed without cleaning up
+ %% allocated resources, clean them up.
+ clean_allocated_resources(ResId, Mod),
Mod:on_start(ResId, Config)
catch
throw:Error ->
@@ -390,7 +399,16 @@ call_health_check(ResId, Mod, ResourceState) ->
-spec call_stop(resource_id(), module(), resource_state()) -> term().
call_stop(ResId, Mod, ResourceState) ->
- ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)).
+ ?SAFE_CALL(begin
+ Res = Mod:on_stop(ResId, ResourceState),
+ case Res of
+ ok ->
+ emqx_resource:forget_allocated_resources(ResId);
+ _ ->
+ ok
+ end,
+ Res
+ end).
-spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}.
@@ -486,7 +504,37 @@ apply_reply_fun({F, A}, Result) when is_function(F) ->
apply_reply_fun(From, Result) ->
gen_server:reply(From, Result).
+-spec allocate_resource(resource_id(), any(), term()) -> ok.
+allocate_resource(InstanceId, Key, Value) ->
+ true = ets:insert(?RESOURCE_ALLOCATION_TAB, {InstanceId, Key, Value}),
+ ok.
+
+-spec has_allocated_resources(resource_id()) -> boolean().
+has_allocated_resources(InstanceId) ->
+ ets:member(?RESOURCE_ALLOCATION_TAB, InstanceId).
+
+-spec get_allocated_resources(resource_id()) -> map().
+get_allocated_resources(InstanceId) ->
+ Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId),
+ maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]).
+
+-spec forget_allocated_resources(resource_id()) -> ok.
+forget_allocated_resources(InstanceId) ->
+ true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId),
+ ok.
+
%% =================================================================================
filter_instances(Filter) ->
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
+
+clean_allocated_resources(ResourceId, ResourceMod) ->
+ case emqx_resource:has_allocated_resources(ResourceId) of
+ true ->
+ %% The resource entries in the ETS table are erased inside
+ %% `call_stop' if the call is successful.
+ ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined),
+ ok;
+ false ->
+ ok
+ end.
diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl
index 97ac355f4..7a54bfa97 100644
--- a/apps/emqx_resource/src/emqx_resource_manager.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager.erl
@@ -500,8 +500,10 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
%% We don't care the return value of the Mod:on_stop/2.
%% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned.
- case ResState /= undefined of
+ HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),
+ case ResState =/= undefined orelse HasAllocatedResources of
true ->
+ %% we clear the allocated resources after stop is successful
emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState);
false ->
ok
diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl
index 73f1988c6..9e86e6363 100644
--- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl
@@ -17,6 +17,8 @@
-behaviour(supervisor).
+-include("emqx_resource.hrl").
+
-export([ensure_child/5, delete_child/1]).
-export([start_link/0]).
@@ -36,6 +38,12 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
+ %% Maps resource_id() to one or more allocated resources.
+ emqx_utils_ets:new(?RESOURCE_ALLOCATION_TAB, [
+ bag,
+ public,
+ {read_concurrency, true}
+ ]),
ChildSpecs = [
#{
id => emqx_resource_manager,
diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl
index 913b29c86..ea2240efd 100644
--- a/apps/emqx_resource/src/emqx_resource_pool.erl
+++ b/apps/emqx_resource/src/emqx_resource_pool.erl
@@ -25,7 +25,12 @@
-include_lib("emqx/include/logger.hrl").
+-ifndef(TEST).
-define(HEALTH_CHECK_TIMEOUT, 15000).
+-else.
+%% make tests faster
+-define(HEALTH_CHECK_TIMEOUT, 1000).
+-endif.
start(Name, Mod, Options) ->
case ecpool:start_sup_pool(Name, Mod, Options) of
diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl
index 47017f718..73e2f78e7 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl
@@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
'$handle_undefined_function'(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}});
'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
- emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs);
+ %% encode outputs iolists, but when the rule actions process those
+ %% it might wrongly encode them as JSON lists, so we force them to
+ %% binaries here.
+ IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs),
+ iolist_to_binary(IOList);
'$handle_undefined_function'(schema_encode, Args) ->
error({args_count_error, {schema_encode, Args}});
'$handle_undefined_function'(sprintf, [Format | Args]) ->
diff --git a/build b/build
index ec9e6c816..cceaf3860 100755
--- a/build
+++ b/build
@@ -91,19 +91,28 @@ log() {
echo "===< $msg"
}
+prepare_erl_libs() {
+ local libs_dir="$1"
+ local erl_libs="${ERL_LIBS:-}"
+ local sep
+ if [ "${SYSTEM}" = 'windows' ]; then
+ sep=';'
+ else
+ sep=':'
+ fi
+ for app in "${libs_dir}"/*; do
+ if [ -d "${app}/ebin" ]; then
+ if [ -n "$erl_libs" ]; then
+ erl_libs="${erl_libs}${sep}${app}"
+ else
+ erl_libs="${app}"
+ fi
+ fi
+ done
+ export ERL_LIBS="$erl_libs"
+}
+
make_docs() {
- local libs_dir1 libs_dir2 libs_dir3 docdir
- libs_dir1="$("$FIND" "_build/$PROFILE/lib/" -maxdepth 2 -name ebin -type d)"
- if [ -d "_build/default/lib/" ]; then
- libs_dir2="$("$FIND" "_build/default/lib/" -maxdepth 2 -name ebin -type d)"
- else
- libs_dir2=''
- fi
- if [ -d "_build/$PROFILE/checkouts" ]; then
- libs_dir3="$("$FIND" "_build/$PROFILE/checkouts/" -maxdepth 2 -name ebin -type d 2>/dev/null || true)"
- else
- libs_dir3=''
- fi
case "$(is_enterprise "$PROFILE")" in
'yes')
SCHEMA_MODULE='emqx_enterprise_schema'
@@ -112,10 +121,12 @@ make_docs() {
SCHEMA_MODULE='emqx_conf_schema'
;;
esac
- docdir="_build/docgen/$PROFILE"
+ prepare_erl_libs "_build/$PROFILE/checkouts"
+ prepare_erl_libs "_build/$PROFILE/lib"
+ local docdir="_build/docgen/$PROFILE"
mkdir -p "$docdir"
# shellcheck disable=SC2086
- erl -noshell -pa $libs_dir1 $libs_dir2 $libs_dir3 -eval \
+ erl -noshell -eval \
"ok = emqx_conf:dump_schema('$docdir', $SCHEMA_MODULE), \
halt(0)."
}
diff --git a/changes/ce/fix-10801.en.md b/changes/ce/fix-10801.en.md
new file mode 100644
index 000000000..4c36bd528
--- /dev/null
+++ b/changes/ce/fix-10801.en.md
@@ -0,0 +1 @@
+Avoid duplicated percent decode the topic name in API `/topics/{topic}` and `/topics`.
diff --git a/changes/ce/fix-10809.en.md b/changes/ce/fix-10809.en.md
new file mode 100644
index 000000000..e7e15e5ca
--- /dev/null
+++ b/changes/ce/fix-10809.en.md
@@ -0,0 +1,2 @@
+Address `** ERROR ** Mnesia post_commit hook failed: error:badarg` error messages happening during node shutdown or restart.
+Mria pull request: https://github.com/emqx/mria/pull/142
diff --git a/changes/ee/feat-10778.en.md b/changes/ee/feat-10778.en.md
new file mode 100644
index 000000000..3084d2959
--- /dev/null
+++ b/changes/ee/feat-10778.en.md
@@ -0,0 +1 @@
+Refactored Pulsar Producer bridge to avoid leaking resources during crashes.
diff --git a/changes/ee/fix-10807.en.md b/changes/ee/fix-10807.en.md
new file mode 100644
index 000000000..8cd5da0c8
--- /dev/null
+++ b/changes/ee/fix-10807.en.md
@@ -0,0 +1 @@
+Removed license check debug logs.
diff --git a/dev b/dev
index 8cf07cfaf..5087cc30f 100755
--- a/dev
+++ b/dev
@@ -58,6 +58,7 @@ fi
export HOCON_ENV_OVERRIDE_PREFIX='EMQX_'
export EMQX_LOG__FILE__DEFAULT__ENABLE='false'
export EMQX_LOG__CONSOLE__ENABLE='true'
+SYSTEM="$(./scripts/get-distro.sh)"
EMQX_NODE_NAME="${EMQX_NODE_NAME:-emqx@127.0.0.1}"
PROFILE="${PROFILE:-emqx}"
FORCE_COMPILE=0
@@ -157,14 +158,24 @@ fi
prepare_erl_libs() {
local profile="$1"
local libs_dir="_build/${profile}/lib"
- local erl_libs=''
+ local erl_libs="${ERL_LIBS:-}"
+ local sep
+ if [ "${SYSTEM}" = 'windows' ]; then
+ sep=';'
+ else
+ sep=':'
+ fi
if [ $FORCE_COMPILE -eq 1 ] || [ ! -d "$libs_dir" ]; then
make "compile-${PROFILE}"
else
echo "Running from code in $libs_dir"
fi
for app in "${libs_dir}"/*; do
- erl_libs="${erl_libs}:${app}"
+ if [ -n "$erl_libs" ]; then
+ erl_libs="${erl_libs}${sep}${app}"
+ else
+ erl_libs="${app}"
+ fi
done
export ERL_LIBS="$erl_libs"
}
diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl
index d17c159c3..99c4fa155 100644
--- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl
+++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl
@@ -82,8 +82,12 @@ make_trace_fn_action() ->
#{function => Fn, args => #{}}.
create_rule_http(RuleParams) ->
+ create_rule_http(RuleParams, _Overrides = #{}).
+
+create_rule_http(RuleParams, Overrides) ->
RepublishTopic = <<"republish/schema_registry">>,
emqx:subscribe(RepublishTopic),
+ PayloadTemplate = maps:get(payload_template, Overrides, <<>>),
DefaultParams = #{
enable => true,
actions => [
@@ -93,7 +97,7 @@ create_rule_http(RuleParams) ->
<<"args">> =>
#{
<<"topic">> => RepublishTopic,
- <<"payload">> => <<>>,
+ <<"payload">> => PayloadTemplate,
<<"qos">> => 0,
<<"retain">> => false,
<<"user_properties">> => <<>>
@@ -177,10 +181,12 @@ test_params_for(avro, encode1) ->
"from t\n"
>>,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+ PayloadTemplate = <<"${.encoded}">>,
ExtraArgs = [],
#{
sql => SQL,
payload => Payload,
+ payload_template => PayloadTemplate,
extra_args => ExtraArgs
};
test_params_for(avro, decode1) ->
@@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) ->
"from t\n"
>>,
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+ PayloadTemplate = <<"${.encoded}">>,
ExtraArgs = [<<"Person">>],
#{
sql => SQL,
payload => Payload,
+ payload_template => PayloadTemplate,
extra_args => ExtraArgs
};
test_params_for(protobuf, union1) ->
@@ -487,17 +495,18 @@ t_encode(Config) ->
#{
sql := SQL,
payload := Payload,
+ payload_template := PayloadTemplate,
extra_args := ExtraArgs
} = test_params_for(SerdeType, encode1),
- {ok, _} = create_rule_http(#{sql => SQL}),
+ {ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}),
PayloadBin = emqx_utils_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Published = receive_published(?LINE),
?assertMatch(
- #{payload := #{<<"encoded">> := _}},
+ #{payload := P} when is_binary(P),
Published
),
- #{payload := #{<<"encoded">> := Encoded}} = Published,
+ #{payload := Encoded} = Published,
{ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
ok.
diff --git a/lib-ee/emqx_license/src/emqx_license.app.src b/lib-ee/emqx_license/src/emqx_license.app.src
index fcdcbc05b..354385faf 100644
--- a/lib-ee/emqx_license/src/emqx_license.app.src
+++ b/lib-ee/emqx_license/src/emqx_license.app.src
@@ -1,6 +1,6 @@
{application, emqx_license, [
{description, "EMQX License"},
- {vsn, "5.0.9"},
+ {vsn, "5.0.10"},
{modules, []},
{registered, [emqx_license_sup]},
{applications, [kernel, stdlib, emqx_ctl]},
diff --git a/lib-ee/emqx_license/src/emqx_license.erl b/lib-ee/emqx_license/src/emqx_license.erl
index ef285b937..3e29dcf25 100644
--- a/lib-ee/emqx_license/src/emqx_license.erl
+++ b/lib-ee/emqx_license/src/emqx_license.erl
@@ -1,6 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
+
-module(emqx_license).
-include("emqx_license.hrl").
diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl
index 2ebb96004..da777ff84 100644
--- a/lib-ee/emqx_license/src/emqx_license_checker.erl
+++ b/lib-ee/emqx_license/src/emqx_license_checker.erl
@@ -117,7 +117,7 @@ handle_cast(_Msg, State) ->
handle_info(check_license, #{license := License} = State) ->
#{} = check_license(License),
NewState = ensure_check_license_timer(State),
- ?tp(debug, emqx_license_checked, #{}),
+ ?tp(emqx_license_checked, #{}),
{noreply, NewState};
handle_info(check_expiry_alarm, #{license := License} = State) ->
ok = expiry_early_alarm(License),
diff --git a/lib-ee/emqx_license/src/emqx_license_installer.erl b/lib-ee/emqx_license/src/emqx_license_installer.erl
deleted file mode 100644
index 58ee6ebcc..000000000
--- a/lib-ee/emqx_license/src/emqx_license_installer.erl
+++ /dev/null
@@ -1,86 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
--module(emqx_license_installer).
-
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
--behaviour(gen_server).
-
--export([
- start_link/1,
- start_link/4
-]).
-
-%% gen_server callbacks
--export([
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2
-]).
-
--define(NAME, emqx).
--define(INTERVAL, 5000).
-
-%%------------------------------------------------------------------------------
-%% API
-%%------------------------------------------------------------------------------
-
-start_link(Callback) ->
- start_link(?NAME, ?MODULE, ?INTERVAL, Callback).
-
-start_link(Name, ServerName, Interval, Callback) ->
- gen_server:start_link({local, ServerName}, ?MODULE, [Name, Interval, Callback], []).
-
-%%------------------------------------------------------------------------------
-%% gen_server callbacks
-%%------------------------------------------------------------------------------
-
-init([Name, Interval, Callback]) ->
- Pid = whereis(Name),
- State = #{
- interval => Interval,
- name => Name,
- pid => Pid,
- callback => Callback
- },
- {ok, ensure_timer(State)}.
-
-handle_call(_Req, _From, State) ->
- {reply, unknown, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({timeout, Timer, check_pid}, #{timer := Timer} = State) ->
- NewState = check_pid(State),
- {noreply, ensure_timer(NewState)};
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-%%------------------------------------------------------------------------------
-%% Private functions
-%%------------------------------------------------------------------------------
-
-ensure_timer(#{interval := Interval} = State) ->
- _ =
- case State of
- #{timer := Timer} -> erlang:cancel_timer(Timer);
- _ -> ok
- end,
- State#{timer => erlang:start_timer(Interval, self(), check_pid)}.
-
-check_pid(#{name := Name, pid := OldPid, callback := Callback} = State) ->
- case whereis(Name) of
- undefined ->
- ?tp(debug, emqx_license_installer_noproc, #{old_pid => OldPid}),
- State;
- OldPid ->
- ?tp(debug, emqx_license_installer_nochange, #{old_pid => OldPid}),
- State;
- NewPid ->
- _ = Callback(),
- ?tp(debug, emqx_license_installer_called, #{old_pid => OldPid}),
- State#{pid => NewPid}
- end.
diff --git a/lib-ee/emqx_license/src/emqx_license_resources.erl b/lib-ee/emqx_license/src/emqx_license_resources.erl
index 2cc62b8a3..9a63e5e06 100644
--- a/lib-ee/emqx_license/src/emqx_license_resources.erl
+++ b/lib-ee/emqx_license/src/emqx_license_resources.erl
@@ -76,7 +76,7 @@ handle_cast(_Msg, State) ->
handle_info(update_resources, State) ->
true = update_resources(),
connection_quota_early_alarm(),
- ?tp(debug, emqx_license_resources_updated, #{}),
+ ?tp(emqx_license_resources_updated, #{}),
{noreply, ensure_timer(State)}.
terminate(_Reason, _State) ->
diff --git a/lib-ee/emqx_license/src/emqx_license_sup.erl b/lib-ee/emqx_license/src/emqx_license_sup.erl
index 6b8f73953..304fac313 100644
--- a/lib-ee/emqx_license/src/emqx_license_sup.erl
+++ b/lib-ee/emqx_license/src/emqx_license_sup.erl
@@ -41,15 +41,6 @@ init([]) ->
shutdown => 5000,
type => worker,
modules => [emqx_license_resources]
- },
-
- #{
- id => license_installer,
- start => {emqx_license_installer, start_link, [fun emqx_license:load/0]},
- restart => permanent,
- shutdown => 5000,
- type => worker,
- modules => [emqx_license_installer]
}
]
}}.
diff --git a/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl b/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl
deleted file mode 100644
index 5d5e27489..000000000
--- a/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl
+++ /dev/null
@@ -1,89 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_license_installer_SUITE).
-
--compile(nowarn_export_all).
--compile(export_all).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
-all() ->
- emqx_common_test_helpers:all(?MODULE).
-
-init_per_suite(Config) ->
- _ = application:load(emqx_conf),
- emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1),
- Config.
-
-end_per_suite(_) ->
- emqx_common_test_helpers:stop_apps([emqx_license]),
- ok.
-
-init_per_testcase(_Case, Config) ->
- {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
- Config.
-
-end_per_testcase(_Case, _Config) ->
- ok.
-
-set_special_configs(emqx_license) ->
- Config = #{key => emqx_license_test_lib:default_license()},
- emqx_config:put([license], Config);
-set_special_configs(_) ->
- ok.
-
-%%------------------------------------------------------------------------------
-%% Tests
-%%------------------------------------------------------------------------------
-
-t_update(_Config) ->
- ?check_trace(
- begin
- ?wait_async_action(
- begin
- Pid0 = spawn_link(fun() ->
- receive
- exit -> ok
- end
- end),
- register(installer_test, Pid0),
-
- {ok, _} = emqx_license_installer:start_link(
- installer_test,
- ?MODULE,
- 10,
- fun() -> ok end
- ),
-
- {ok, _} = ?block_until(
- #{?snk_kind := emqx_license_installer_nochange},
- 100
- ),
-
- Pid0 ! exit,
-
- {ok, _} = ?block_until(
- #{?snk_kind := emqx_license_installer_noproc},
- 100
- ),
-
- Pid1 = spawn_link(fun() -> timer:sleep(100) end),
- register(installer_test, Pid1)
- end,
- #{?snk_kind := emqx_license_installer_called},
- 1000
- )
- end,
- fun(Trace) ->
- ?assertMatch([_ | _], ?of_kind(emqx_license_installer_called, Trace))
- end
- ).
-
-t_unknown_calls(_Config) ->
- ok = gen_server:cast(emqx_license_installer, some_cast),
- some_msg = erlang:send(emqx_license_installer, some_msg),
- ?assertEqual(unknown, gen_server:call(emqx_license_installer, some_request)).
diff --git a/mix.exs b/mix.exs
index df58ae73a..bb73853a7 100644
--- a/mix.exs
+++ b/mix.exs
@@ -49,16 +49,16 @@ defmodule EMQXUmbrella.MixProject do
{:redbug, "2.0.8"},
{:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
- {:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true},
+ {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true},
{:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true},
- {:ekka, github: "emqx/ekka", tag: "0.15.1", override: true},
+ {:ekka, github: "emqx/ekka", tag: "0.15.2", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
- {:minirest, github: "emqx/minirest", tag: "1.3.9", override: true},
+ {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true},
{:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true},
{:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
diff --git a/rebar.config b/rebar.config
index 4b4838f61..faa7d88e1 100644
--- a/rebar.config
+++ b/rebar.config
@@ -56,16 +56,16 @@
, {gpb, "4.19.7"}
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
- , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}}
+ , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}}
, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}}
- , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}}
+ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
- , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.9"}}}
+ , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}}
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon
index 92294bb75..d1f5c8b13 100644
--- a/rel/i18n/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/emqx_bridge_pulsar.hocon
@@ -67,6 +67,11 @@ emqx_bridge_pulsar {
label = "Enable or Disable"
}
+ connect_timeout {
+ desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
+ label = "Connect Timeout"
+ }
+
desc_name {
desc = "Bridge name, used as a human-readable description of the bridge."
label = "Bridge Name"
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index ad63b4ba9..54d866014 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -213,9 +213,6 @@ pending connections can grow to."""
fields_tcp_opts_backlog.label:
"""TCP backlog length"""
-broker_route_batch_clean.desc:
-"""Enable batch clean for deleted routes."""
-
fields_mqtt_quic_listener_initial_window_packets.desc:
"""The size (in packets) of the initial congestion window for a connection. Default: 10"""
diff --git a/rel/i18n/zh/emqx_bridge_pulsar.hocon b/rel/i18n/zh/emqx_bridge_pulsar.hocon
index 23643060b..4e2fd5c9f 100644
--- a/rel/i18n/zh/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/zh/emqx_bridge_pulsar.hocon
@@ -20,6 +20,11 @@ emqx_bridge_pulsar {
label = "启用或停用"
}
+ connect_timeout {
+ desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
+ label = "连接超时时间"
+ }
+
servers {
desc = "以逗号分隔的 scheme://host[:port]
格式的 Pulsar URL 列表,"
"支持的 scheme 有 pulsar://
(默认)"
diff --git a/rel/i18n/zh/emqx_schema.hocon b/rel/i18n/zh/emqx_schema.hocon
index 835372868..4c8e1f81e 100644
--- a/rel/i18n/zh/emqx_schema.hocon
+++ b/rel/i18n/zh/emqx_schema.hocon
@@ -208,9 +208,6 @@ fields_tcp_opts_backlog.desc:
fields_tcp_opts_backlog.label:
"""TCP 连接队列长度"""
-broker_route_batch_clean.desc:
-"""是否开启批量清除路由。"""
-
fields_mqtt_quic_listener_initial_window_packets.desc:
"""一个连接的初始拥堵窗口的大小(以包为单位)。默认值:10"""
@@ -846,10 +843,10 @@ sysmon_vm_long_schedule.desc:
sysmon_vm_long_schedule.label:
"""启用长调度监控"""
-mqtt_keepalive_backoff.desc:
+mqtt_keepalive_multiplier.desc:
"""EMQX 判定客户端保活超时使用的阈值系数。计算公式为:Keep Alive * Backoff * 2"""
-mqtt_keepalive_backoff.label:
+mqtt_keepalive_multiplier.label:
"""保活超时阈值系数"""
force_gc_bytes.desc: