From 2aad6671ac46ba1e4ae2ce7e668f9264ba3648fb Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 9 Jun 2023 14:41:21 +0300 Subject: [PATCH 01/34] fix(emqx_mgmt_data_backup): avoid writing temporary Mnesia backup file As we use `keep_tables` option, we don't need to modify a backup schema before importing the backup file. With this option, `mnesia:restore/2` ignores backup schema and keeps current table schema unchanged. --- .../src/emqx_mgmt_data_backup.erl | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 5e59bd057..bdb9cf666 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -382,15 +382,17 @@ import_mnesia_tab(BackupDir, TabName, Opts) -> end. restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> - BackupNameToImport = MnesiaBackupFileName ++ "_for_import", - Prepared = + Validated = catch mnesia:traverse_backup( - MnesiaBackupFileName, BackupNameToImport, fun backup_converter/2, 0 + MnesiaBackupFileName, mnesia_backup, dummy, read_only, fun validate_mnesia_backup/2, 0 ), try - case Prepared of + case Validated of {ok, _} -> - Restored = mnesia:restore(BackupNameToImport, [{default_op, keep_tables}]), + %% As we use keep_tables option, we don't need to modify 'copies' (nodes) + %% in a backup file before restoring it, as `mnsia:restore/2` will ignore + %% backed-up schema and keep the current table schema unchanged + Restored = mnesia:restore(MnesiaBackupFileName, [{default_op, keep_tables}]), case Restored of {atomic, [TabName]} -> ok; @@ -416,30 +418,23 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> end after %% Cleanup files as soon as they are not needed any more for more efficient disk usage - _ = file:delete(BackupNameToImport), _ = file:delete(MnesiaBackupFileName) end. -backup_converter({schema, Tab, CreateList}, Acc) -> - check_rec_attributes(Tab, CreateList), - {[{schema, Tab, lists:map(fun convert_copies/1, CreateList)}], Acc}; -backup_converter(Other, Acc) -> - {[Other], Acc}. - -check_rec_attributes(Tab, CreateList) -> +%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. +%% Looks like there is no clean way to abort traversal without triggering any error reporting, +%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... +validate_mnesia_backup({schema, Tab, CreateList} = Schema, Acc) -> ImportAttributes = proplists:get_value(attributes, CreateList), Attributes = mnesia:table_info(Tab, attributes), case ImportAttributes =/= Attributes of true -> throw({error, different_table_schema}); false -> - ok - end. - -convert_copies({K, [_ | _]}) when K == ram_copies; K == disc_copies; K == disc_only_copies -> - {K, [node()]}; -convert_copies(Other) -> - Other. + {[Schema], Acc} + end; +validate_mnesia_backup(Other, Acc) -> + {[Other], Acc}. extract_backup(BackupFileName) -> BackupDir = root_backup_dir(), From 947d99218b34aebcf362670e04da8848787a31a6 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 12 Jun 2023 16:32:24 +0800 Subject: [PATCH 02/34] chore: upgrade emqtt to avoid sensitive data leakage in the debug log --- apps/emqx/rebar.config | 2 +- apps/emqx_retainer/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 9dedf3644..840f2cf0b 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -44,7 +44,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]} diff --git a/apps/emqx_retainer/rebar.config b/apps/emqx_retainer/rebar.config index a178e10a1..ab4b8ed37 100644 --- a/apps/emqx_retainer/rebar.config +++ b/apps/emqx_retainer/rebar.config @@ -30,7 +30,7 @@ {profiles, [ {test, [ {deps, [ - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} ]} ]} ]}. diff --git a/mix.exs b/mix.exs index 455f2e6a9..e53f0c224 100644 --- a/mix.exs +++ b/mix.exs @@ -64,7 +64,7 @@ defmodule EMQXUmbrella.MixProject do {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer {:emqtt, - github: "emqx/emqtt", tag: "1.8.5", override: true, system_env: maybe_no_quic_env()}, + github: "emqx/emqtt", tag: "1.8.6", override: true, system_env: maybe_no_quic_env()}, {:rulesql, github: "emqx/rulesql", tag: "0.1.6"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, diff --git a/rebar.config b/rebar.config index d0e9570f8..f6830f83b 100644 --- a/rebar.config +++ b/rebar.config @@ -69,7 +69,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.6"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} From eff98ceed805a4e64e2024d7c8fb6a7d024e5631 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 12 Jun 2023 16:42:46 +0800 Subject: [PATCH 03/34] chore: update changes --- changes/ce/perf-11020.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/perf-11020.en.md diff --git a/changes/ce/perf-11020.en.md b/changes/ce/perf-11020.en.md new file mode 100644 index 000000000..2df22aebe --- /dev/null +++ b/changes/ce/perf-11020.en.md @@ -0,0 +1 @@ +Upgraded emqtt dependency to avoid sensitive data leakage in the debug log. From cf31b650764d7d779423fda0bc6f70126e4932e0 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 12 Jun 2023 17:20:52 +0200 Subject: [PATCH 04/34] fix: rule engine different behavior for div and mod Previously, the div operation could only be used as an infix operation while mod could only be used as a function call. After this commit, one can use both div and mod using function call syntax and infix syntax. Fixes: https://emqx.atlassian.net/browse/EMQX-10216 --- apps/emqx_rule_engine/include/rule_engine.hrl | 3 +- .../test/emqx_rule_engine_SUITE.erl | 31 +++++++++++++++++++ mix.exs | 2 +- rebar.config | 2 +- 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index e3fef7e62..b2a6a549e 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -70,7 +70,8 @@ Op =:= '-' orelse Op =:= '*' orelse Op =:= '/' orelse - Op =:= 'div') + Op =:= 'div' orelse + Op =:= 'mod') ). %% Compare operators diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 2ec32173f..c9feda601 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -66,6 +66,7 @@ groups() -> t_sqlselect_with_3rd_party_impl2, t_sqlselect_with_3rd_party_funcs_unknown, t_sqlselect_001, + t_sqlselect_002, t_sqlselect_inject_props, t_sqlselect_01, t_sqlselect_02, @@ -1089,6 +1090,36 @@ t_sqlselect_001(_Config) -> ) ). +t_sqlselect_002(_Config) -> + %% Verify that the div and mod can be used both as infix operations and as + %% function calls + Sql = + "" + "select 2 mod 2 as mod1,\n" + " mod(3, 2) as mod2,\n" + " 4 div 2 as div1,\n" + " div(7, 2) as div2\n" + " from \"t/#\" " + "", + ?assertMatch( + {ok, #{ + <<"mod1">> := 0, + <<"mod2">> := 1, + <<"div1">> := 2, + <<"div2">> := 3 + }}, + emqx_rule_sqltester:test( + #{ + sql => Sql, + context => + #{ + payload => #{<<"what">> => 4}, + topic => <<"t/a">> + } + } + ) + ). + t_sqlselect_inject_props(_Config) -> SQL = "SELECT json_decode(payload) as p, payload, " diff --git a/mix.exs b/mix.exs index e53f0c224..3fc807c78 100644 --- a/mix.exs +++ b/mix.exs @@ -65,7 +65,7 @@ defmodule EMQXUmbrella.MixProject do # maybe forbid to fetch quicer {:emqtt, github: "emqx/emqtt", tag: "1.8.6", override: true, system_env: maybe_no_quic_env()}, - {:rulesql, github: "emqx/rulesql", tag: "0.1.6"}, + {:rulesql, github: "emqx/rulesql", tag: "0.1.7"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, {:telemetry, "1.1.0"}, diff --git a/rebar.config b/rebar.config index f6830f83b..713cbce40 100644 --- a/rebar.config +++ b/rebar.config @@ -70,7 +70,7 @@ , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} - , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.6"}}} + , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.7"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} From 256adeb580d219cf899b6ab4b8bc2a43f6d6ece6 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 12 Jun 2023 17:33:54 +0200 Subject: [PATCH 05/34] docs: add changelog entry --- changes/ce/fix-11026.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-11026.en.md diff --git a/changes/ce/fix-11026.en.md b/changes/ce/fix-11026.en.md new file mode 100644 index 000000000..d07157b5f --- /dev/null +++ b/changes/ce/fix-11026.en.md @@ -0,0 +1 @@ +Addressed an inconsistency in the usage of 'div' and 'mod' operations within the rule engine. Previously, the 'div' operation was only usable as an infix operation and 'mod' could only be applied through a function call. With this change, both 'div' and 'mod' can be used via function call syntax and infix syntax. From 536f7ba423d3ceadad44eaf7ca344c9c8bcb1865 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 13 Jun 2023 16:06:28 +0800 Subject: [PATCH 06/34] chore: bump ecql for obfuscate sensitive data to avoid leakage --- apps/emqx_bridge_cassandra/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_cassandra/rebar.config b/apps/emqx_bridge_cassandra/rebar.config index b8bfc7dd6..f1cc275d9 100644 --- a/apps/emqx_bridge_cassandra/rebar.config +++ b/apps/emqx_bridge_cassandra/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.1"}}} +{deps, [ {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.2"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} From 415424654b9202fc5ff10a55d7a1c4cd5c3722b4 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 13 Jun 2023 16:13:40 +0800 Subject: [PATCH 07/34] chore: update changes --- changes/ee/perf-11035.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/perf-11035.en.md diff --git a/changes/ee/perf-11035.en.md b/changes/ee/perf-11035.en.md new file mode 100644 index 000000000..4422b01bb --- /dev/null +++ b/changes/ee/perf-11035.en.md @@ -0,0 +1 @@ +Bump Cassandra driver to avoid sensitive data leakages. From b2a5065641df07fedf4ef97b7f3ea7714d8aa9a8 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 13 Jun 2023 15:56:45 +0200 Subject: [PATCH 08/34] fix(emqx_connector): report errors in on_start handler --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 25 +++++- .../test/emqx_bridge_webhook_SUITE.erl | 78 +++++++++++++------ .../src/emqx_connector_http.erl | 29 ++++++- .../test/emqx_connector_http_tests.erl | 2 + 4 files changed, 106 insertions(+), 28 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 025451988..d5fddaea8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -124,10 +124,13 @@ create_bridge_api(Config) -> create_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + create_bridge_api(BridgeType, BridgeName, BridgeConfig). + +create_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -164,6 +167,24 @@ update_bridge_api(Config, Overrides) -> ct:pal("bridge update result: ~p", [Res]), Res. +op_bridge_api(Op, BridgeType, BridgeName) -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of + {ok, {Status, Headers, Body}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + {error, {Status, Headers, Body}} -> + {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge op result: ~p", [Res]), + Res. + probe_bridge_api(Config) -> probe_bridge_api(Config, _Overrides = #{}). diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 4fc76fc9e..a1ff465c9 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -28,6 +28,9 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(BRIDGE_TYPE, <<"webhook">>). +-define(BRIDGE_NAME, atom_to_binary(?MODULE)). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -36,15 +39,13 @@ groups() -> init_per_suite(_Config) -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), []. end_per_suite(_Config) -> - ok = emqx_config:put([bridges], #{}), - ok = emqx_config:put_raw([bridges], #{}), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_bridge), @@ -53,10 +54,22 @@ end_per_suite(_Config) -> suite() -> [{timetrap, {seconds, 60}}]. +init_per_testcase(t_bad_bridge_config, Config) -> + Config; +init_per_testcase(t_send_async_connection_timeout, Config) -> + ResponseDelayMS = 500, + Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), + [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; init_per_testcase(_TestCase, Config) -> - Config. + Server = start_http_server(#{response_delay_ms => 0}), + [{http_server, Server} | Config]. -end_per_testcase(_TestCase, _Config) -> +end_per_testcase(_TestCase, Config) -> + case ?config(http_server, Config) of + undefined -> ok; + Server -> stop_http_server(Server) + end, + emqx_bridge_testlib:delete_all_bridges(), emqx_common_test_helpers:call_janitor(), ok. @@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) -> %% (Orginally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> - ct:pal("Start server\n"), process_flag(trap_exit, true), Parent = self(), + ct:pal("Starting server for ~p", [Parent]), {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), + ct:pal("Started server on port ~p", [Port]), timer:sleep(100), #{port => Port, sock => Sock, acceptor => Acceptor}. @@ -160,8 +174,8 @@ parse_http_request_assertive(ReqStr0) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% bridge_async_config(#{port := Port} = Config) -> - Type = maps:get(type, Config, <<"webhook">>), - Name = maps:get(name, Config, atom_to_binary(?MODULE)), + Type = maps:get(type, Config, ?BRIDGE_TYPE), + Name = maps:get(name, Config, ?BRIDGE_NAME), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, 1), @@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) -> RetConfig. make_bridge(Config) -> - Type = <<"webhook">>, - Name = atom_to_binary(?MODULE), + Type = ?BRIDGE_TYPE, + Name = ?BRIDGE_NAME, BridgeConfig = bridge_async_config(Config#{ name => Name, type => Type @@ -236,16 +250,15 @@ make_bridge(Config) -> %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% When the connection time out all the queued requests where dropped in -t_send_async_connection_timeout(_Config) -> - ResponseDelayMS = 90, - #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), - % Port = 9000, +t_send_async_connection_timeout(Config) -> + ResponseDelayMS = ?config(response_delay_ms, Config), + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "async", - connect_timeout => ResponseDelayMS * 2, - request_timeout => 10000, + connect_timeout => 10_000, + request_timeout => ResponseDelayMS * 2, resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, @@ -257,11 +270,10 @@ t_send_async_connection_timeout(_Config) -> ct:pal("Sent messages\n"), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), receive_request_notifications(MessageIDs, ResponseDelayMS), - stop_http_server(Server), ok. -t_async_free_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_free_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, @@ -285,8 +297,8 @@ t_async_free_retries(_Config) -> do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), ok. -t_async_common_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_common_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, @@ -323,6 +335,28 @@ t_async_common_retries(_Config) -> do_t_async_retries(Context, {error, something_else}, FnFail), ok. +t_bad_bridge_config(_Config) -> + BridgeConfig = bridge_async_config(#{port => 12345}), + ?assertMatch( + {ok, + {{_, 201, _}, _Headers, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">> + }}}, + emqx_bridge_testlib:create_bridge_api( + ?BRIDGE_TYPE, + ?BRIDGE_NAME, + BridgeConfig + ) + ), + %% try `/start` bridge + ?assertMatch( + {error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}}, + emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + ok. + +%% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 149704f76..8e836aaee 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -219,10 +219,31 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(InstId, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; - {error, Reason} -> {error, Reason} + case start_pool(InstId, PoolOpts) of + ok -> + case do_get_status(InstId, ConnectTimeout) of + ok -> + {ok, State}; + Error -> + ok = ehttpc_sup:stop_pool(InstId), + Error + end; + Error -> + Error + end. + +start_pool(PoolName, PoolOpts) -> + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ?SLOG(warning, #{ + msg => "emqx_connector_on_start_already_started", + pool_name => PoolName + }), + ok; + Error -> + Error end. on_stop(InstId, _State) -> diff --git a/apps/emqx_connector/test/emqx_connector_http_tests.erl b/apps/emqx_connector/test/emqx_connector_http_tests.erl index 2dc2119f7..c5f6dfe78 100644 --- a/apps/emqx_connector/test/emqx_connector_http_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_http_tests.erl @@ -24,6 +24,8 @@ wrap_auth_headers_test_() -> fun() -> meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), + meck:expect(ehttpc, workers, 1, [{self, self()}]), + meck:expect(ehttpc, health_check, 2, ok), meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(emqx_resource, allocate_resource, 3, ok), [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource] From 0d6d441f4ccd6181374711032ca01a119627a979 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 14 Jun 2023 09:56:50 +0200 Subject: [PATCH 09/34] test(emqx_connector): start/stop test for webhook bridge --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 111 +++++++++++++----- .../test/emqx_bridge_webhook_SUITE.erl | 11 ++ .../src/emqx_connector_http.erl | 7 +- 3 files changed, 99 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index d5fddaea8..62ba70b33 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -105,19 +105,19 @@ parse_and_check(Config, ConfigString, Name) -> resource_id(Config) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - emqx_bridge_resource:resource_id(BridgeType, Name). + BridgeName = ?config(bridge_name, Config), + emqx_bridge_resource:resource_id(BridgeType, BridgeName). create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). create_bridge(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), ct:pal("creating bridge with config: ~p", [BridgeConfig]), - emqx_bridge:create(BridgeType, Name, BridgeConfig). + emqx_bridge:create(BridgeType, BridgeName, BridgeConfig). create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -175,6 +175,8 @@ op_bridge_api(Op, BridgeType, BridgeName) -> ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), Res = case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of + {ok, {Status = {_, 204, _}, Headers, Body}} -> + {ok, {Status, Headers, Body}}; {ok, {Status, Headers, Body}} -> {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; {error, {Status, Headers, Body}} -> @@ -188,11 +190,15 @@ op_bridge_api(Op, BridgeType, BridgeName) -> probe_bridge_api(Config) -> probe_bridge_api(Config, _Overrides = #{}). -probe_bridge_api(Config, _Overrides) -> +probe_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - BridgeConfig = ?config(bridge_config, Config), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + BridgeName = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + probe_bridge_api(BridgeType, BridgeName, BridgeConfig). + +probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -310,10 +316,34 @@ t_create_via_http(Config) -> t_start_stop(Config, StopTracePoint) -> BridgeType = ?config(bridge_type, Config), BridgeName = ?config(bridge_name, Config), - ResourceId = resource_id(Config), + BridgeConfig = ?config(bridge_config, Config), + t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). + +t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin - ?assertMatch({ok, _}, create_bridge(Config)), + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes0 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -322,24 +352,48 @@ t_start_stop(Config, StopTracePoint) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - %% Check that the bridge probe API doesn't leak atoms. - ProbeRes0 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} - ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), - AtomsBefore = erlang:system_info(atom_count), - %% Probe again; shouldn't have created more atoms. - ProbeRes1 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + %% `start` bridge to trigger `already_started` + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), - AtomsAfter = erlang:system_info(atom_count), - ?assertEqual(AtomsBefore, AtomsAfter), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), - %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) + ), + + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Disable the bridge, which will also stop it. ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( @@ -352,8 +406,11 @@ t_start_stop(Config, StopTracePoint) -> ok end, fun(Trace) -> - %% one for each probe, one for real - ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)), + %% one for each probe, two for real + ?assertMatch( + [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], + ?of_kind(StopTracePoint, Trace) + ), ok end ), diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index a1ff465c9..93eab438e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -356,6 +356,17 @@ t_bad_bridge_config(_Config) -> ), ok. +t_start_stop(Config) -> + #{port := Port} = ?config(http_server, Config), + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + port => Port + }), + emqx_bridge_testlib:t_start_stop( + ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped + ). + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 8e836aaee..ce8a1a1a5 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -16,11 +16,10 @@ -module(emqx_connector_http). --include("emqx_connector.hrl"). - -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -251,7 +250,9 @@ on_stop(InstId, _State) -> msg => "stopping_http_connector", connector => InstId }), - ehttpc_sup:stop_pool(InstId). + Res = ehttpc_sup:stop_pool(InstId), + ?tp(emqx_connector_http_stopped, #{instance_id => InstId}), + Res. on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of From 748e54d507373a7973259b32faa1d35d7d1cb6fb Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 14 Jun 2023 10:13:29 +0200 Subject: [PATCH 10/34] chore: add changelog --- changes/ce/fix-11037.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-11037.en.md diff --git a/changes/ce/fix-11037.en.md b/changes/ce/fix-11037.en.md new file mode 100644 index 000000000..39b2dc4a6 --- /dev/null +++ b/changes/ce/fix-11037.en.md @@ -0,0 +1 @@ +When starting an HTTP connector EMQX now returns a descriptive error in case the system is unable to connect to the remote target system. From 52354bf58a0865ac5c0299dbfe4a9494a9a2e894 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 14 Jun 2023 22:10:48 +0300 Subject: [PATCH 11/34] fix(ft-fs): use `emqx:running_nodes()` as default cluster view When iterating over complete exports cluster-wide. --- apps/emqx_ft/src/emqx_ft_assembler.erl | 2 +- apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index c96df224c..0d9e86a49 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -96,7 +96,7 @@ handle_event( complete -> {next_state, start_assembling, NSt, ?internal([])}; {incomplete, _} -> - Nodes = mria_mnesia:running_nodes() -- [node()], + Nodes = emqx:running_nodes() -- [node()], {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}; % TODO: recovery? {error, _} = Error -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 589949bda..e37ba25af 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -361,7 +361,7 @@ list(_Options, Query) -> end. list(QueryIn) -> - {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())), + {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(emqx:running_nodes())), list_nodes(NodeQuery, Nodes, #{items => []}). list_nodes(Query, Nodes = [Node | Rest], Acc) -> From 2ee1317ca0b6fa41afe412055b0e0976616256c2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 14 Jun 2023 16:55:12 -0300 Subject: [PATCH 12/34] ci(dev_script): fix patches vm.args path Fixes the path to the patches directory so it works with `quickrun`. Before fix: ``` iex(emqx@127.0.0.1)3> :init.get_arguments() [ # ... pa: ['"_build/dev-run/emqx/data/patches"'], # ... ] ``` After fix: ``` iex(emqx@127.0.0.1)1> :init.get_arguments() [ # ... pa: ['_build/dev-run/emqx/data/patches'], # ... ] ``` --- dev | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev b/dev index 6a20c4a2d..553cb8a9c 100755 --- a/dev +++ b/dev @@ -290,7 +290,7 @@ append_args_file() { +IOt 4 +SDio 8 -shutdown_time 30000 --pa '"$EMQX_DATA_DIR/patches"' +-pa '$EMQX_DATA_DIR/patches' -mnesia dump_log_write_threshold 5000 -mnesia dump_log_time_threshold 60000 -os_mon start_disksup false From acd6e5635b6b8d8ea396a922d2e972338c8a020d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 14 Jun 2023 22:14:28 +0300 Subject: [PATCH 13/34] fix(ct): adapt cluster setup helper to replicant nodes Before this change, trying to start cluster consisting of cores + replicants resulted in error joining replicant node to the cluster. --- apps/emqx/test/emqx_common_test_helpers.erl | 45 ++++++++++++------- .../test/emqx_bridge_api_SUITE.erl | 3 +- apps/emqx_conf/test/emqx_conf_app_SUITE.erl | 1 - .../test/emqx_mgmt_api_listeners_SUITE.erl | 1 - .../test/emqx_mgmt_api_nodes_SUITE.erl | 1 - .../test/emqx_mgmt_data_backup_SUITE.erl | 1 - 6 files changed, 30 insertions(+), 22 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 1cfc10f74..8f9d02401 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -652,10 +652,13 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) -> %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2 env_handler => fun((AppName :: atom()) -> term()), %% Application env preset before calling `emqx_common_test_helpers:start_apps/2` - env => {AppName :: atom(), Key :: atom(), Val :: term()}, + env => [{AppName :: atom(), Key :: atom(), Val :: term()}], %% Whether to execute `emqx_config:init_load(SchemaMod)` %% default: true load_schema => boolean(), + %% Which node in the cluster to join to. + %% default: first core node + join_to => node(), %% If we want to exercise the scenario where a node joins an %% existing cluster where there has already been some %% configuration changes (via cluster rpc), then we need to enable @@ -690,28 +693,38 @@ emqx_cluster(Specs0, CommonOpts) -> ]), %% Set the default node of the cluster: CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], - JoinTo0 = + JoinTo = case CoreNodes of [First | _] -> First; _ -> undefined end, - JoinTo = - case maps:find(join_to, CommonOpts) of - {ok, true} -> JoinTo0; - {ok, JT} -> JT; - error -> JoinTo0 - end, - [ - {Name, - merge_opts(Opts, #{ - base_port => base_port(Number), + NodeOpts = fun(Number) -> + #{ + base_port => base_port(Number), + env => [ + {mria, core_nodes, CoreNodes}, + {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} + ] + } + end, + RoleOpts = fun + (core) -> + #{ join_to => JoinTo, env => [ - {mria, core_nodes, CoreNodes}, - {mria, node_role, Role}, - {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} + {mria, node_role, core} ] - })} + }; + (replicant) -> + #{ + env => [ + {mria, node_role, replicant}, + {ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}} + ] + } + end, + [ + {Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)} || {{Role, Name, Opts}, Number} <- Specs ]. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 6f5c669a0..a98b53032 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -140,7 +140,7 @@ mk_cluster_specs(Config, Opts) -> {core, emqx_bridge_api_SUITE1, #{}}, {core, emqx_bridge_api_SUITE2, #{}} ], - CommonOpts = #{ + CommonOpts = Opts#{ env => [{emqx, boot_modules, [broker]}], apps => [], % NOTE @@ -157,7 +157,6 @@ mk_cluster_specs(Config, Opts) -> load_apps => ?SUITE_APPS ++ [emqx_dashboard], env_handler => fun load_suite_config/1, load_schema => false, - join_to => maps:get(join_to, Opts, true), priv_data_dir => ?config(priv_dir, Config) }, emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts). diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 2a9888451..34bf5c702 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -247,7 +247,6 @@ cluster(Specs, Config) -> {env, Env}, {apps, [emqx_conf]}, {load_schema, false}, - {join_to, true}, {priv_data_dir, PrivDataDir}, {env_handler, fun (emqx) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index b038863a8..76f50c748 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -284,7 +284,6 @@ cluster(Specs) -> {env, Env}, {apps, [emqx_conf]}, {load_schema, false}, - {join_to, true}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, []), diff --git a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl index efabe32c8..2fc51737a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl @@ -159,7 +159,6 @@ cluster(Specs) -> {env, Env}, {apps, [emqx_conf, emqx_management]}, {load_schema, false}, - {join_to, true}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, []), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index 9df6d2138..0325ab030 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -444,7 +444,6 @@ cluster(Config) -> env => [{mria, db_backend, rlog}], load_schema => true, start_autocluster => true, - join_to => true, listener_ports => [], conf => [{[dashboard, listeners, http, bind], 0}], env_handler => From 6dc281345cd5296f3fa89302f2484348295aff62 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 14 Jun 2023 22:32:52 +0300 Subject: [PATCH 14/34] test(ft-fs): run API tests with replicant nodes --- apps/emqx_ft/test/emqx_ft_api_SUITE.erl | 47 ++++++++++++++-------- apps/emqx_ft/test/emqx_ft_test_helpers.erl | 2 +- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 18a8e9841..d2d65750f 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -24,6 +24,8 @@ -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]). +-define(SUITE_APPS, [emqx_conf, emqx_ft]). + all() -> [ {group, single}, @@ -49,10 +51,9 @@ end_per_suite(_Config) -> init_per_group(Group = cluster, Config) -> Cluster = mk_cluster_specs(Config), ct:pal("Starting ~p", [Cluster]), - Nodes = [ - emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) - || {Name, Opts} <- Cluster - ], + Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster], + InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end), + [] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok], [{group, Group}, {cluster_nodes, Nodes} | Config]; init_per_group(Group, Config) -> [{group, Group} | Config]. @@ -65,22 +66,29 @@ end_per_group(cluster, Config) -> end_per_group(_Group, _Config) -> ok. -mk_cluster_specs(Config) -> +mk_cluster_specs(_Config) -> Specs = [ {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}}, - {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}} - ], - CommOpts = [ - {env, [{emqx, boot_modules, [broker, listeners]}]}, - {apps, [emqx_ft]}, - {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, - {env_handler, emqx_ft_test_helpers:env_handler(Config)} + {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}}, + {replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}} ], + CommOpts = #{ + env => [ + {mria, db_backend, rlog}, + {emqx, boot_modules, [broker, listeners]} + ], + apps => [], + load_apps => ?SUITE_APPS, + conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]] + }, emqx_common_test_helpers:emqx_cluster( Specs, CommOpts ). +init_node(Config) -> + ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)). + init_per_testcase(Case, Config) -> [{tc, Case} | Config]. end_per_testcase(t_ft_disabled, _Config) -> @@ -96,7 +104,7 @@ t_list_files(Config) -> ClientId = client_id(Config), FileId = <<"f1">>, - Node = lists:last(cluster(Config)), + Node = lists:last(test_nodes(Config)), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), {ok, 200, #{<<"files">> := Files}} = @@ -124,7 +132,7 @@ t_download_transfer(Config) -> ClientId = client_id(Config), FileId = <<"f1">>, - Node = lists:last(cluster(Config)), + Node = lists:last(test_nodes(Config)), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), ?assertMatch( @@ -184,7 +192,7 @@ t_download_transfer(Config) -> t_list_files_paging(Config) -> ClientId = client_id(Config), NFiles = 20, - Nodes = cluster(Config), + Nodes = test_nodes(Config), Uploads = [ {mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)} || N <- lists:seq(1, NFiles) @@ -280,8 +288,13 @@ t_ft_disabled(_Config) -> %% Helpers %%-------------------------------------------------------------------- -cluster(Config) -> - [node() | proplists:get_value(cluster_nodes, Config, [])]. +test_nodes(Config) -> + case proplists:get_value(cluster_nodes, Config, []) of + [] -> + [node()]; + Nodes -> + Nodes + end. client_id(Config) -> iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])). diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index a041dcd50..1b952bdd7 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -36,7 +36,7 @@ start_additional_node(Config, Name) -> ). stop_additional_node(Node) -> - ok = rpc:call(Node, ekka, leave, []), + _ = rpc:call(Node, ekka, leave, []), ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), ok = emqx_common_test_helpers:stop_slave(Node), ok. From 8c7c0877fd53de1ee2e571d575108689437b5fc5 Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Fri, 9 Jun 2023 15:29:34 -0300 Subject: [PATCH 15/34] fix(rewrite): avoid wildcards on destination topic on publish Fixes https://emqx.atlassian.net/browse/EMQX-9247 --- apps/emqx_modules/src/emqx_modules_schema.erl | 32 +++++++++++++ .../test/emqx_rewrite_api_SUITE.erl | 46 +++++++++++++++++++ changes/ce/fix-11004.en.md | 1 + 3 files changed, 79 insertions(+) create mode 100644 changes/ce/fix-11004.en.md diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 5eb8ca148..891e0a076 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -36,6 +36,7 @@ roots() -> array("rewrite", #{ desc => "List of topic rewrite rules.", importance => ?IMPORTANCE_HIDDEN, + validator => fun rewrite_validator/1, default => [] }), array("topic_metrics", #{ @@ -45,6 +46,37 @@ roots() -> }) ]. +rewrite_validator(Rules) -> + case + lists:foldl( + fun + (#{<<"action">> := subscribe}, Acc) -> + Acc; + (#{<<"dest_topic">> := DestTopic}, InvalidAcc) -> + try + true = emqx_topic:validate(name, DestTopic), + InvalidAcc + catch + _:_ -> + [DestTopic | InvalidAcc] + end + end, + [], + Rules + ) + of + [] -> + ok; + InvalidTopics -> + { + error, + #{ + msg => "cannot_use_wildcard_for_destination_topic", + invalid_topics => InvalidTopics + } + } + end. + fields("delayed") -> [ {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, diff --git a/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl index 528102d9e..6c65d351b 100644 --- a/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl @@ -95,6 +95,52 @@ t_mqtt_topic_rewrite_limit(_) -> ) ). +t_mqtt_topic_rewrite_wildcard(_) -> + BadRules = [ + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/test/#">> + }, + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/#/test">> + }, + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/test/+">> + }, + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/+/test">> + } + ], + + Rules = lists:flatten( + lists:map( + fun(Rule) -> + [Rule#{<<"action">> => <<"publish">>}, Rule#{<<"action">> => <<"all">>}] + end, + BadRules + ) + ), + lists:foreach( + fun(Rule) -> + ?assertMatch( + {ok, 500, _}, + request( + put, + uri(["mqtt", "topic_rewrite"]), + [Rule] + ) + ) + end, + Rules + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-11004.en.md b/changes/ce/fix-11004.en.md new file mode 100644 index 000000000..3c6b580d7 --- /dev/null +++ b/changes/ce/fix-11004.en.md @@ -0,0 +1 @@ +Do not allow wildcards for destination topic in rewrite rules. From 64bbe21209b82b631f7fe09bf21af7c628327ada Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 14 Jun 2023 18:52:55 +0300 Subject: [PATCH 16/34] fix(emqx_schema): use non negative integer type for 'depth' SSL option Closes: EMQX-10276 --- apps/emqx/src/emqx_schema.erl | 2 +- apps/emqx/test/emqx_schema_tests.erl | 8 ++++++++ changes/ce/fix-11051.en.md | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-11051.en.md diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 70dcaf840..67834839d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2044,7 +2044,7 @@ common_ssl_opts_schema(Defaults, Type) -> )}, {"depth", sc( - integer(), + non_neg_integer(), #{ default => Df("depth", 10), desc => ?DESC(common_ssl_opts_schema_depth) diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index a6e72cd27..446c9e586 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -106,6 +106,14 @@ ssl_opts_version_gap_test_() -> || S <- [Sc, RanchSc] ]. +ssl_opts_cert_depth_test() -> + Sc = emqx_schema:server_ssl_opts_schema(#{}, false), + Reason = #{expected_type => "non_neg_integer()"}, + ?assertThrow( + {_Sc, [#{kind := validation_error, reason := Reason}]}, + validate(Sc, #{<<"depth">> => -1}) + ). + bad_cipher_test() -> Sc = emqx_schema:server_ssl_opts_schema(#{}, false), Reason = {bad_ciphers, ["foo"]}, diff --git a/changes/ce/fix-11051.en.md b/changes/ce/fix-11051.en.md new file mode 100644 index 000000000..d782be226 --- /dev/null +++ b/changes/ce/fix-11051.en.md @@ -0,0 +1 @@ +Add validation to ensure that certificate 'depth' (listener SSL option) is a non negative integer. From 7dade3a52cbfdea62d2bab75a2cb85d80b93e417 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 15 Jun 2023 09:37:01 +0800 Subject: [PATCH 17/34] fix: tls_certfile_gc notice log don't print abspath --- apps/emqx/src/emqx_tls_certfile_gc.erl | 3 ++- dev | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_tls_certfile_gc.erl b/apps/emqx/src/emqx_tls_certfile_gc.erl index 78dfdbaca..a229db7a0 100644 --- a/apps/emqx/src/emqx_tls_certfile_gc.erl +++ b/apps/emqx/src/emqx_tls_certfile_gc.erl @@ -228,7 +228,8 @@ find_managed_files(Filter, Dir) -> Acc end; (AbsPath, {error, Reason}, Acc) -> - ?SLOG(notice, "filesystem_object_inaccessible", #{ + ?SLOG(notice, #{ + msg => "filesystem_object_inaccessible", abspath => AbsPath, reason => Reason }), diff --git a/dev b/dev index 6a20c4a2d..4411816e2 100755 --- a/dev +++ b/dev @@ -158,7 +158,7 @@ export EMQX_LOG_DIR="$BASE_DIR/log" CONFIGS_DIR="$EMQX_DATA_DIR/configs" # Use your cookie so your IDE can connect to it. COOKIE="${EMQX_NODE__COOKIE:-${EMQX_NODE_COOKIE:-$(cat ~/.erlang.cookie || echo 'emqxsecretcookie')}}" -mkdir -p "$EMQX_ETC_DIR" "$EMQX_DATA_DIR/patches" "$EMQX_LOG_DIR" "$CONFIGS_DIR" +mkdir -p "$EMQX_ETC_DIR" "$EMQX_DATA_DIR/patches" "$EMQX_DATA_DIR/certs" "$EMQX_LOG_DIR" "$CONFIGS_DIR" if [ $EKKA_EPMD -eq 1 ]; then EPMD_ARGS='-start_epmd false -epmd_module ekka_epmd' else From c160301dbe887f9a23c78873da6ee9a0b222a640 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 15 Jun 2023 17:05:51 +0800 Subject: [PATCH 18/34] fix: don't log enoent error --- apps/emqx/src/emqx_tls_certfile_gc.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/src/emqx_tls_certfile_gc.erl b/apps/emqx/src/emqx_tls_certfile_gc.erl index a229db7a0..9e2e98b7f 100644 --- a/apps/emqx/src/emqx_tls_certfile_gc.erl +++ b/apps/emqx/src/emqx_tls_certfile_gc.erl @@ -227,6 +227,8 @@ find_managed_files(Filter, Dir) -> false -> Acc end; + (AbsPath, {error, enoent}, Acc) when AbsPath == Dir -> + Acc; (AbsPath, {error, Reason}, Acc) -> ?SLOG(notice, #{ msg => "filesystem_object_inaccessible", From 178711f742558ed97ee9015372ac133a260217f3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 12 Jun 2023 13:05:55 +0800 Subject: [PATCH 19/34] fix: shared-sub with nl sub-option should cause protocol error --- apps/emqx/src/emqx_packet.erl | 7 +++++++ apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 15 +++++++++++++++ changes/ce/fix-11074.en.md | 1 + 3 files changed, 23 insertions(+) create mode 100644 changes/ce/fix-11074.en.md diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 32bd3df53..96eacc5a9 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -270,6 +270,9 @@ check(#mqtt_packet_subscribe{topic_filters = TopicFilters}) -> try validate_topic_filters(TopicFilters) catch + %% Known Specificed Reason Code + error:{error, RC} -> + {error, RC}; error:_Error -> {error, ?RC_TOPIC_FILTER_INVALID} end; @@ -413,6 +416,10 @@ run_checks([Check | More], Packet, Options) -> validate_topic_filters(TopicFilters) -> lists:foreach( fun + %% Protocol Error and Should Disconnect + %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] + ({<>, #{nl := 1}}) -> + error({error, ?RC_PROTOCOL_ERROR}); ({TopicFilter, _SubOpts}) -> emqx_topic:validate(TopicFilter); (TopicFilter) -> diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index fe608f600..a2a2e5244 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -985,3 +985,18 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) -> ?assertEqual(1, counters:get(CRef, 1)), process_flag(trap_exit, false). + +t_share_subscribe_no_local(Config) -> + ConnFun = ?config(conn_fun, Config), + process_flag(trap_exit, true), + ShareTopic = <<"$share/sharename/TopicA">>, + + {ok, Client} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client), + %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] (Disconnect) + case catch emqtt:subscribe(Client, #{}, [{ShareTopic, [{nl, true}, {qos, 1}]}]) of + {'EXIT', {Reason, _Stk}} -> + ?assertEqual({disconnected, ?RC_PROTOCOL_ERROR, #{}}, Reason) + end, + + process_flag(trap_exit, false). diff --git a/changes/ce/fix-11074.en.md b/changes/ce/fix-11074.en.md new file mode 100644 index 000000000..7870e5ade --- /dev/null +++ b/changes/ce/fix-11074.en.md @@ -0,0 +1 @@ +Fix Protocol spec MQTT-5.0 [MQTT-3.8.3-4]. From 50e7d5d2ecfa50b244c5b3ec865ffaa34a50cf82 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 16 Jun 2023 17:03:39 -0300 Subject: [PATCH 20/34] fix(nolink_apply): avoid sending late replies to caller Due to race conditions, it's possible that the caller to `pmap`/`nolink_apply` might receive a late reply. e.g. when a timeout occurred while resource manager was checking a resource's health: ``` 19:18:23.084 [error] [data: ..., event_data: {#Reference<0.3247872820.3887857670.131018>, {:normal, [false, true, true, true, true, true]}}, event_type: :info, msg: :ignore_all_other_events, state: :connected] ``` Using an alias and also checking for the race condition in the `after` block (like [`gen`](https://github.com/erlang/otp/blob/a76bf63197dbf41d9179413b26597afeeb46ff30/lib/stdlib/src/gen.erl#L270-L277) does), we avoid polluting the caller's mailbox with late replies. --- apps/emqx_utils/src/emqx_utils.erl | 24 ++++++++++++++--- apps/emqx_utils/test/emqx_utils_SUITE.erl | 32 +++++++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 6cf85fb5d..830845b60 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -20,6 +20,8 @@ %% [TODO] Cleanup so the instruction below is not necessary. -elvis([{elvis_style, god_modules, disable}]). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([ merge_opts/2, maybe_apply/2, @@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity). -spec nolink_apply(function(), timer:timeout()) -> term(). nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> Caller = self(), - ResRef = make_ref(), + ResRef = alias([reply]), Middleman = erlang:spawn( fun() -> process_flag(trap_exit, true), @@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> C:E:S -> {exception, {C, E, S}} end, - _ = erlang:send(Caller, {ResRef, Res}), + _ = erlang:send(ResRef, {ResRef, Res}), + ?tp(pmap_middleman_sent_response, #{}), exit(normal) end ), @@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> exit(normal); {'EXIT', Worker, Reason} -> %% worker exited with some reason other than 'normal' - _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}), exit(normal) end end @@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> {ResRef, {'EXIT', Reason}} -> exit(Reason) after Timeout -> + %% possible race condition: a message was received just as we enter the after + %% block. + ?tp(pmap_timeout, #{}), + unalias(ResRef), exit(Middleman, kill), - exit(timeout) + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after 0 -> + exit(timeout) + end end. safe_to_existing_atom(In) -> diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 6c6bcf8d3..12e99c917 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SOCKOPTS, [ binary, @@ -208,3 +209,34 @@ t_pmap_exception(_) -> [{2, 3}, {3, 4}, error] ) ). + +t_pmap_late_reply(_) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pmap_middleman_sent_response}, + #{?snk_kind := pmap_timeout} + ), + Timeout = 100, + Res = + catch emqx_utils:pmap( + fun(_) -> + process_flag(trap_exit, true), + timer:sleep(3 * Timeout), + done + end, + [1, 2, 3], + Timeout + ), + receive + {Ref, LateReply} when is_reference(Ref) -> + ct:fail("should not receive late reply: ~p", [LateReply]) + after (5 * Timeout) -> + ok + end, + ?assertMatch([done, done, done], Res), + ok + end, + [] + ), + ok. From 4d24fd13a64c84c6b42748ab7b667d4a0930c22b Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Sat, 17 Jun 2023 07:06:42 +0800 Subject: [PATCH 21/34] chore: update changes/ce/fix-11074.en.md Co-authored-by: Thales Macedo Garitezi --- changes/ce/fix-11074.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-11074.en.md b/changes/ce/fix-11074.en.md index 7870e5ade..fa557b3a1 100644 --- a/changes/ce/fix-11074.en.md +++ b/changes/ce/fix-11074.en.md @@ -1 +1 @@ -Fix Protocol spec MQTT-5.0 [MQTT-3.8.3-4]. +Fix to adhere to Protocol spec MQTT-5.0 [MQTT-3.8.3-4]. From 8b20a703449d67ed53d9ad2084c25144e0f36d05 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 19 Jun 2023 15:57:40 +0800 Subject: [PATCH 22/34] chore: hide plugins from conf load cli --- apps/emqx_conf/src/emqx_conf_cli.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 70e9c3a5e..7e5a5b891 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -170,7 +170,7 @@ drop_hidden_roots(Conf) -> lists:foldl(fun(K, Acc) -> maps:remove(K, Acc) end, Conf, hidden_roots()). hidden_roots() -> - [<<"trace">>, <<"stats">>, <<"broker">>, <<"persistent_session_store">>]. + [<<"trace">>, <<"stats">>, <<"broker">>, <<"persistent_session_store">>, <<"plugins">>]. get_config(Key) -> case emqx:get_raw_config([Key], undefined) of From 4071acfbd88a88b46dd9e4c5ca9cfaa644f9fcc6 Mon Sep 17 00:00:00 2001 From: YuShifan <894402575bt@gmail.com> Date: Mon, 19 Jun 2023 15:46:43 +0800 Subject: [PATCH 23/34] chore: upgrade dashboard to e1.1.0-beta.8 for ee and v1.3.0-1 for ce --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index f04f2fd77..22eff4c2f 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.3.0 -export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.7 +export EMQX_DASHBOARD_VERSION ?= v1.3.0-1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.8 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used # In make 4.4+, for backward-compatibility the value from the original environment is used. From a578d6fa1d35b17c85df61923f2b361d646259bb Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 19 Jun 2023 17:16:53 +0800 Subject: [PATCH 24/34] chore: release e5.1.0-alpha.8 --- Makefile | 2 +- apps/emqx/include/emqx_release.hrl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 22eff4c2f..533af9c05 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.3.0-1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.8 +export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.9 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used # In make 4.4+, for backward-compatibility the value from the original environment is used. diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index fe37a75ff..f23916960 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.1.0-alpha.5"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.1.0-alpha.7"). +-define(EMQX_RELEASE_EE, "5.1.0-alpha.8"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). From ae5846be2944487a3bf2315a66a0bfc6d9f30453 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 19 Jun 2023 13:25:06 +0200 Subject: [PATCH 25/34] fix(ft-api): make schema tags look nicer in the docs --- apps/emqx_ft/src/emqx_ft_api.erl | 5 +++-- apps/emqx_ft/src/emqx_ft_api.hrl | 10 ++++++++++ apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl | 3 ++- 3 files changed, 15 insertions(+), 3 deletions(-) create mode 100644 apps/emqx_ft/src/emqx_ft_api.hrl diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index 3fd279c76..7bc3a1d90 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -19,6 +19,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_ft_api.hrl"). %% Swagger specs from hocon schema -export([ @@ -61,7 +62,7 @@ schema("/file_transfer/files") -> #{ 'operationId' => '/file_transfer/files', get => #{ - tags => [<<"file_transfer">>], + tags => ?TAGS, summary => <<"List all uploaded files">>, description => ?DESC("file_list"), parameters => [ @@ -83,7 +84,7 @@ schema("/file_transfer/files/:clientid/:fileid") -> #{ 'operationId' => '/file_transfer/files/:clientid/:fileid', get => #{ - tags => [<<"file_transfer">>], + tags => ?TAGS, summary => <<"List files uploaded in a specific transfer">>, description => ?DESC("file_list_transfer"), parameters => [ diff --git a/apps/emqx_ft/src/emqx_ft_api.hrl b/apps/emqx_ft/src/emqx_ft_api.hrl new file mode 100644 index 000000000..ef38757f3 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_api.hrl @@ -0,0 +1,10 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(__EMQX_FT_API__). +-define(__EMQX_FT_API__, 42). + +-define(TAGS, [<<"File Transfer">>]). + +-endif. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl index 40944c0e8..8a475afd2 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl @@ -21,6 +21,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("emqx_ft_api.hrl"). %% Swagger specs from hocon schema -export([ @@ -60,7 +61,7 @@ schema("/file_transfer/file") -> #{ 'operationId' => '/file_transfer/file', get => #{ - tags => [<<"file_transfer">>], + tags => ?TAGS, summary => <<"Download a particular file">>, description => ?DESC("file_get"), parameters => [ From a38a79aff8f879c732b6d6f9e348bb97597705e5 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Mon, 19 Jun 2023 19:42:59 +0200 Subject: [PATCH 26/34] fix(rpm): add missing dependencies for Amazon Linux 2023 --- deploy/packages/rpm/emqx.spec | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deploy/packages/rpm/emqx.spec b/deploy/packages/rpm/emqx.spec index b2b58ac23..4839d1a7d 100644 --- a/deploy/packages/rpm/emqx.spec +++ b/deploy/packages/rpm/emqx.spec @@ -23,8 +23,12 @@ AutoReq: 0 %if "%{_arch} %{?rhel}" == "x86_64 7" Requires: openssl11 libatomic procps which findutils %else +%if "%{?dist}" == ".amzn2023" +Requires: libatomic procps which findutils ncurses util-linux shadow-utils +%else Requires: libatomic procps which findutils %endif +%endif %description EMQX, a distributed, massively scalable, highly extensible MQTT message broker. From 13746c2cdf94b7f1e1e4f10a3f1cd60aa13c173d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 19 Jun 2023 17:56:06 -0300 Subject: [PATCH 27/34] fix(resource): check status when (re)starting a resource Fixes https://emqx.atlassian.net/browse/EMQX-10290 --- .../src/emqx_resource_manager.erl | 3 +- .../test/emqx_connector_demo.erl | 3 ++ .../test/emqx_resource_SUITE.erl | 28 +++++++++++++++++-- changes/ce/fix-11094.en.md | 1 + 4 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-11094.en.md diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c1adb8ecd..2e4822a2f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) -> start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> - _ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), - ok; + wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)); {error, _Reason} = Error -> Error end. diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 96e22c6b6..b95d8c8bf 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid} on_get_status(_InstId, #{health_check_error := true}) -> ?tp(connector_demo_health_check_error, #{}), disconnected; +on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) -> + ?tp(connector_demo_health_check_error, #{}), + {disconnected, State, Message}; on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5883614aa..934a97829 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -40,6 +40,7 @@ groups() -> init_per_testcase(_, Config) -> ct:timetrap({seconds, 30}), emqx_connector_demo:set_callback_mode(always_sync), + snabbkaffe:start_trace(), Config. end_per_testcase(_, _Config) -> @@ -1145,10 +1146,33 @@ t_auto_retry(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} + #{health_check_interval => 100} ), ?assertEqual(ok, Res). +%% tests resources that have an asynchronous start: they are created +%% without problems, but later some issue is found when calling the +%% health check. +t_start_throw_error(_Config) -> + Message = "something went wrong", + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, health_check_error => {msg, Message}}, + #{health_check_interval => 100} + ), + #{?snk_kind := connector_demo_health_check_error}, + 1_000 + ) + ), + %% Now, if we try to "reconnect" (restart) it, we should get the error + ?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})), + ok. + t_health_check_disconnected(_) -> ?check_trace( begin @@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} + #{health_check_interval => 100} ), ?assertEqual( {ok, disconnected}, diff --git a/changes/ce/fix-11094.en.md b/changes/ce/fix-11094.en.md new file mode 100644 index 000000000..e73a8635f --- /dev/null +++ b/changes/ce/fix-11094.en.md @@ -0,0 +1 @@ +Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge. From 45963b6a95fc586275efe674686b55b56094fc3a Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 16 Jun 2023 20:14:16 +0800 Subject: [PATCH 28/34] fix: ip_port schema type crash --- apps/emqx/src/emqx_schema.erl | 17 +++++------------ .../src/emqx_dashboard_schema.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_schema.erl | 2 +- .../src/emqx_exproto_schema.erl | 2 +- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 67834839d..17d723a77 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1758,7 +1758,7 @@ base_listener(Bind) -> )}, {"bind", sc( - hoconsc:union([ip_port(), integer()]), + ip_port(), #{ default => Bind, required => true, @@ -2525,9 +2525,9 @@ to_ip_port(Str) -> case split_ip_port(Str) of {"", Port} -> %% this is a local address - {ok, list_to_integer(Port)}; + {ok, parse_port(Port)}; {MaybeIp, Port} -> - PortVal = list_to_integer(Port), + PortVal = parse_port(Port), case inet:parse_address(MaybeIp) of {ok, IpTuple} -> {ok, {IpTuple, PortVal}}; @@ -2543,18 +2543,11 @@ split_ip_port(Str0) -> case lists:split(string:rchr(Str, $:), Str) of %% no colon {[], Str} -> - try - %% if it's just a port number, then return as-is - _ = list_to_integer(Str), - {"", Str} - catch - _:_ -> - error - end; + {"", Str}; {IpPlusColon, PortString} -> IpStr0 = lists:droplast(IpPlusColon), case IpStr0 of - %% dropp head/tail brackets + %% drop head/tail brackets [$[ | S] -> case lists:last(S) of $] -> {lists:droplast(S), PortString}; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index 957cc6120..31ad4f831 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -195,7 +195,7 @@ enable(Bool) -> bind(Port) -> {"bind", ?HOCON( - ?UNION([non_neg_integer(), emqx_schema:ip_port()]), + emqx_schema:ip_port(), #{ default => 0, required => false, diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index d527e1e06..b43f4ba98 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -261,7 +261,7 @@ common_listener_opts() -> )}, {bind, sc( - hoconsc:union([ip_port(), integer()]), + ip_port(), #{desc => ?DESC(gateway_common_listener_bind)} )}, {max_connections, diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl index 7e1f6f49c..10583e41a 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl @@ -56,7 +56,7 @@ fields(exproto_grpc_server) -> [ {bind, sc( - hoconsc:union([ip_port(), integer()]), + ip_port(), #{ required => true, desc => ?DESC(exproto_grpc_server_bind) From a4be9b8281e3b6d8200eab4482c05c3d8081a945 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 19 Jun 2023 09:30:32 +0800 Subject: [PATCH 29/34] chore: bump hocon to 0.39.9 --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b0f759ead..6c76a55b7 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/mix.exs b/mix.exs index 89354ea92..c50976b4d 100644 --- a/mix.exs +++ b/mix.exs @@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.8", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.9", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, diff --git a/rebar.config b/rebar.config index 455ecbf80..bc392bb92 100644 --- a/rebar.config +++ b/rebar.config @@ -75,7 +75,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} From 07172e42f04fc4c8be236954d6ad5978ecf2d115 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 19 Jun 2023 10:30:39 +0800 Subject: [PATCH 30/34] test: integer CI check failed --- apps/emqx/test/emqx_crl_cache_SUITE.erl | 21 ++++++++++++------- apps/emqx/test/emqx_flapping_SUITE.erl | 16 +++++++------- apps/emqx/test/emqx_ocsp_cache_SUITE.erl | 8 +++---- apps/emqx_authn/src/emqx_authn_api.erl | 4 ++-- .../test/emqx_authn_enable_flag_SUITE.erl | 2 +- apps/emqx_authz/test/emqx_authz_SUITE.erl | 2 +- .../test/emqx_authz_api_settings_SUITE.erl | 4 ++-- .../test/emqx_bridge_webhook_SUITE.erl | 18 ++++++++-------- .../test/emqx_bridge_cassandra_SUITE.erl | 6 +++--- ...emqx_bridge_clickhouse_connector_SUITE.erl | 2 +- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 4 ++-- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 2 +- .../test/emqx_bridge_kafka_tests.erl | 2 +- .../test/emqx_bridge_opents_SUITE.erl | 6 +++--- .../test/emqx_bridge_tdengine_SUITE.erl | 6 +++--- .../emqx_conf/test/emqx_conf_schema_tests.erl | 12 +++++------ apps/emqx_conf/test/emqx_global_gc_SUITE.erl | 2 +- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 2 +- .../test/emqx_exhook_api_SUITE.erl | 16 +++++++------- apps/emqx_ft/test/emqx_ft_SUITE.erl | 2 +- .../test/emqx_lwm2m_SUITE.erl | 4 ++-- .../src/emqx_node_rebalance_api.erl | 10 ++++----- .../test/emqx_node_rebalance_api_SUITE.erl | 12 +++++------ .../test/emqx_retainer_SUITE.erl | 9 ++++---- apps/emqx_s3/test/emqx_s3_schema_SUITE.erl | 2 +- apps/emqx_s3/test/emqx_s3_test_helpers.erl | 8 +++---- .../emqx_slow_subs/src/emqx_slow_subs_api.erl | 2 +- .../test/emqx_slow_subs_api_SUITE.erl | 21 ++++++++----------- 28 files changed, 104 insertions(+), 101 deletions(-) diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index 31738e980..6c6337038 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -497,17 +497,24 @@ t_update_config(_Config) -> emqx_config_handler:start_link(), {ok, Pid} = emqx_crl_cache:start_link(), Conf = #{ - refresh_interval => timer:minutes(5), - http_timeout => timer:minutes(10), + refresh_interval => <<"5m">>, + http_timeout => <<"10m">>, capacity => 123 }, ?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)), State = sys:get_state(Pid), - ?assertEqual(Conf, #{ - refresh_interval => element(3, State), - http_timeout => element(4, State), - capacity => element(7, State) - }), + ?assertEqual( + #{ + refresh_interval => timer:minutes(5), + http_timeout => timer:minutes(10), + capacity => 123 + }, + #{ + refresh_interval => element(3, State), + http_timeout => element(4, State), + capacity => element(7, State) + } + ), emqx_config:erase(<<"crl_cache">>), emqx_config_handler:stop(), ok. diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 62c967078..942a262a6 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -33,9 +33,9 @@ init_per_suite(Config) -> <<"enable">> => true, <<"max_count">> => 3, % 0.1s - <<"window_time">> => 100, + <<"window_time">> => <<"100ms">>, %% 2s - <<"ban_time">> => "2s" + <<"ban_time">> => <<"2s">> } ), Config. @@ -119,16 +119,16 @@ t_conf_update(_) -> ?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)), Zones = #{ - <<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}}, - <<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}} + <<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"123s">>}}, + <<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"456s">>}} }, ?assertMatch({ok, _}, emqx:update_config([zones], Zones)), %% new_zone is already deleted ?assertError({config_not_found, _}, get_policy(new_zone)), %% update zone(zone_1) has default. - ?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)), + ?assertEqual(Global#{window_time := 123000}, emqx_flapping:get_policy(zone_1)), %% create zone(zone_2) has default - ?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)), + ?assertEqual(Global#{window_time := 456000}, emqx_flapping:get_policy(zone_2)), %% reset to default(empty) andalso get default from global ?assertMatch({ok, _}, emqx:update_config([zones], #{})), ?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])), @@ -172,13 +172,13 @@ validate_timer(Lists) -> ok. t_window_compatibility_check(_Conf) -> - Flapping = emqx:get_config([flapping_detect]), + Flapping = emqx:get_raw_config([flapping_detect]), ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>), ?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])), %% reset FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]), ok = emqx_config:init_load(emqx_schema, FlappingBin), - ?assertEqual(Flapping, emqx:get_config([flapping_detect])), + ?assertEqual(Flapping, emqx:get_raw_config([flapping_detect])), ok. get_policy(Zone) -> diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index fefa998f8..cf8efacb1 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -137,8 +137,8 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) -> enable_ocsp_stapling => true, responder_url => <<"http://localhost:9877/">>, issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), - refresh_http_timeout => 15_000, - refresh_interval => 1_000 + refresh_http_timeout => <<"15s">>, + refresh_interval => <<"1s">> } } }, @@ -179,8 +179,8 @@ init_per_testcase(_TestCase, Config) -> enable_ocsp_stapling => true, responder_url => <<"http://localhost:9877/">>, issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), - refresh_http_timeout => 15_000, - refresh_interval => 1_000 + refresh_http_timeout => <<"15s">>, + refresh_interval => <<"1s">> } } }, diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index ca6a28f51..35d2caa96 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -1316,8 +1316,8 @@ authenticator_examples() -> <<"password">> => ?PH_PASSWORD }, pool_size => 8, - connect_timeout => 5000, - request_timeout => 5000, + connect_timeout => <<"5s">>, + request_timeout => <<"5s">>, enable_pipelining => 100, ssl => #{enable => false} } diff --git a/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl b/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl index 143a24152..cc2785b1e 100644 --- a/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl @@ -80,7 +80,7 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) -> <<"max_connections">> => 1024000, <<"mountpoint">> => <<>>, <<"proxy_protocol">> => false, - <<"proxy_protocol_timeout">> => 3000, + <<"proxy_protocol_timeout">> => <<"3s">>, <<"enable_authn">> => EnableAuthn }. diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 702359509..12558813c 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -105,7 +105,7 @@ set_special_configs(_App) -> <<"headers">> => #{}, <<"ssl">> => #{<<"enable">> => true}, <<"method">> => <<"get">>, - <<"request_timeout">> => 5000 + <<"request_timeout">> => <<"5s">> }). -define(SOURCE2, #{ <<"type">> => <<"mongodb">>, diff --git a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl index e3412e169..c29fe0f5b 100644 --- a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl @@ -70,7 +70,7 @@ t_api(_) -> <<"cache">> => #{ <<"enable">> => false, <<"max_size">> => 32, - <<"ttl">> => 60000 + <<"ttl">> => <<"60s">> } }, @@ -84,7 +84,7 @@ t_api(_) -> <<"cache">> => #{ <<"enable">> => true, <<"max_size">> => 32, - <<"ttl">> => 60000 + <<"ttl">> => <<"60s">> } }, diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 93eab438e..3b6f36bbc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -179,20 +179,20 @@ bridge_async_config(#{port := Port} = Config) -> PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, 1), - RequestTimeout = maps:get(request_timeout, Config, 10000), + RequestTimeout = maps:get(request_timeout, Config, "10s"), ResumeInterval = maps:get(resume_interval, Config, "1s"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://localhost:~p\"\n" - " connect_timeout = \"~ps\"\n" + " connect_timeout = \"~p\"\n" " enable = true\n" " enable_pipelining = 100\n" " max_retries = 2\n" " method = \"post\"\n" " pool_size = ~p\n" " pool_type = \"random\"\n" - " request_timeout = \"~ps\"\n" + " request_timeout = \"~s\"\n" " body = \"${id}\"" " resource_opts {\n" " inflight_window = 100\n" @@ -257,8 +257,8 @@ t_send_async_connection_timeout(Config) -> port => Port, pool_size => 1, query_mode => "async", - connect_timeout => 10_000, - request_timeout => ResponseDelayMS * 2, + connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s", + request_timeout => "10s", resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, @@ -278,8 +278,8 @@ t_async_free_retries(Config) -> port => Port, pool_size => 1, query_mode => "sync", - connect_timeout => 1_000, - request_timeout => 10_000, + connect_timeout => "1s", + request_timeout => "10s", resource_request_ttl => "10000s" }), %% Fail 5 times then succeed. @@ -304,8 +304,8 @@ t_async_common_retries(Config) -> pool_size => 1, query_mode => "sync", resume_interval => "100ms", - connect_timeout => 1_000, - request_timeout => 10_000, + connect_timeout => "1s", + request_timeout => "10s", resource_request_ttl => "10000s" }), %% Keeps failing until connector gives up. diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 5525a640c..fb16dd749 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -635,9 +635,9 @@ t_bad_sql_parameter(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_ttl">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"request_ttl">> => <<"500ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ) diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl index 0bae413e0..12d678e85 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl @@ -184,7 +184,7 @@ clickhouse_config() -> ] ) ), - connect_timeout => 10000 + connect_timeout => <<"10s">> }, #{<<"config">> => Config}. diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 884f160f9..f26e4037b 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) -> " iotdb_version = \"~s\"\n" " pool_size = 1\n" " resource_opts = {\n" - " health_check_interval = 5000\n" - " request_ttl = 30000\n" + " health_check_interval = \"5s\"\n" + " request_ttl = 30s\n" " query_mode = \"async\"\n" " worker_pool_size = 1\n" " }\n" diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 33c207c39..74fde6426 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -590,7 +590,7 @@ kafka_config(TestCase, _KafkaType, Config) -> " kafka {\n" " max_batch_bytes = 896KB\n" " max_rejoin_attempts = 5\n" - " offset_commit_interval_seconds = 3\n" + " offset_commit_interval_seconds = 3s\n" %% todo: matrix this " offset_reset_policy = latest\n" " }\n" diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 0ccc19778..3b558200c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -307,7 +307,7 @@ bridges.kafka_consumer.my_consumer { kafka { max_batch_bytes = 896KB max_rejoin_attempts = 5 - offset_commit_interval_seconds = 3 + offset_commit_interval_seconds = 3s offset_reset_policy = latest } topic_mapping = [ diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 3563e0774..93224d5ca 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -298,9 +298,9 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_ttl">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"request_ttl">> => <<"500ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ), diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 7644921f0..9399f6029 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -456,9 +456,9 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_ttl">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"request_ttl">> => <<"500ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ), diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index 8e056c018..cdebfeaf7 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -55,7 +55,7 @@ log.console_handler { burst_limit { enable = true max_count = 10000 - window_time = 1000 + window_time = 1s } chars_limit = unlimited drop_mode_qlen = 3000 @@ -66,9 +66,9 @@ log.console_handler { max_depth = 100 overload_kill { enable = true - mem_size = 31457280 + mem_size = \"30MB\" qlen = 20000 - restart_after = 5000 + restart_after = \"5s\" } single_line = true supervisor_reports = error @@ -80,7 +80,7 @@ log.file_handlers { burst_limit { enable = true max_count = 10000 - window_time = 1000 + window_time = 1s } chars_limit = unlimited drop_mode_qlen = 3000 @@ -93,9 +93,9 @@ log.file_handlers { max_size = \"1024MB\" overload_kill { enable = true - mem_size = 31457280 + mem_size = \"30MB\" qlen = 20000 - restart_after = 5000 + restart_after = \"5s\" } rotation {count = 20, enable = true} single_line = true diff --git a/apps/emqx_conf/test/emqx_global_gc_SUITE.erl b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl index ec1e20b3d..036f97643 100644 --- a/apps/emqx_conf/test/emqx_global_gc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl @@ -35,7 +35,7 @@ t_run_gc(_) -> node => #{ cookie => <<"cookie">>, data_dir => <<"data">>, - global_gc_interval => 1000 + global_gc_interval => <<"1s">> } }, emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0), diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 8babd267b..bd756620d 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -225,7 +225,7 @@ t_update_conf(_Config) -> DeletedConf = Conf#{<<"servers">> => Servers2}, validate_servers(Path, DeletedConf, Servers2), [L1, L2 | Servers3] = Servers, - UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000}, + UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => <<"1s">>}, UpdatedServers = [L1, UpdateL2 | Servers3], UpdatedConf = Conf#{<<"servers">> => UpdatedServers}, validate_servers(Path, UpdatedConf, UpdatedServers), diff --git a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl index c03b3f231..1178f244b 100644 --- a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl @@ -67,7 +67,7 @@ init_per_suite(Config) -> _ = emqx_exhook_demo_svr:start(), load_cfg(?CONF_DEFAULT), emqx_mgmt_api_test_util:init_suite([emqx_exhook]), - [Conf] = emqx:get_config([exhook, servers]), + [Conf] = emqx:get_raw_config([exhook, servers]), [{template, Conf} | Config]. end_per_suite(Config) -> @@ -157,8 +157,8 @@ t_get(_) -> t_add(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{ - name => <<"test1">>, - url => "http://127.0.0.1:9001" + <<"name">> => <<"test1">>, + <<"url">> => "http://127.0.0.1:9001" }, {ok, Data} = request_api( post, @@ -186,8 +186,8 @@ t_add(Cfg) -> t_add_duplicate(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{ - name => <<"test1">>, - url => "http://127.0.0.1:9001" + <<"name">> => <<"test1">>, + <<"url">> => "http://127.0.0.1:9001" }, {error, _Reason} = request_api( @@ -203,8 +203,8 @@ t_add_duplicate(Cfg) -> t_add_with_bad_name(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{ - name => <<"🤔">>, - url => "http://127.0.0.1:9001" + <<"name">> => <<"🤔">>, + <<"url">> => "http://127.0.0.1:9001" }, {error, _Reason} = request_api( @@ -298,7 +298,7 @@ t_hooks(_Cfg) -> t_update(Cfg) -> Template = proplists:get_value(template, Cfg), - Instance = Template#{enable => false}, + Instance = Template#{<<"enable">> => false}, {ok, <<"{\"", _/binary>>} = request_api( put, api_path(["exhooks", "default"]), diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index c48c77d93..78100bab4 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -77,7 +77,7 @@ set_special_configs(Config) -> % complete transfers. Storage = emqx_utils_maps:deep_merge( emqx_ft_test_helpers:local_storage(Config), - #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}} + #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}} ), emqx_ft_test_helpers:load_config(#{ <<"enable">> => true, diff --git a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl index 9f388b07c..1779bf842 100644 --- a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -192,7 +192,7 @@ default_config(Overrides) -> " xml_dir = \"~s\"\n" " lifetime_min = 1s\n" " lifetime_max = 86400s\n" - " qmode_time_window = 22\n" + " qmode_time_window = 22s\n" " auto_observe = ~w\n" " mountpoint = \"lwm2m/${username}\"\n" " update_msg_publish_condition = contains_object_list\n" diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index abae139ad..7c486adb0 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -468,7 +468,7 @@ fields(rebalance_evacuation_start) -> )}, {"wait_takeover", mk( - pos_integer(), + emqx_schema:timeout_duration_s(), #{ desc => ?DESC(wait_takeover), required => false @@ -709,24 +709,24 @@ fields(global_status) -> rebalance_example() -> #{ - wait_health_check => 10, + wait_health_check => <<"10s">>, conn_evict_rate => 10, sess_evict_rate => 20, abs_conn_threshold => 10, rel_conn_threshold => 1.5, abs_sess_threshold => 10, rel_sess_threshold => 1.5, - wait_takeover => 10, + wait_takeover => <<"10s">>, nodes => [<<"othernode@127.0.0.1">>] }. rebalance_evacuation_example() -> #{ - wait_health_check => 10, + wait_health_check => <<"10s">>, conn_evict_rate => 100, sess_evict_rate => 100, redirect_to => <<"othernode:1883">>, - wait_takeover => 10, + wait_takeover => <<"10s">>, migrate_to => [<<"othernode@127.0.0.1">>] }. diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl index 119b4a5d9..bb691a754 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl @@ -67,7 +67,6 @@ t_start_evacuation_validation(Config) -> BadOpts = [ #{conn_evict_rate => <<"conn">>}, #{sess_evict_rate => <<"sess">>}, - #{redirect_to => 123}, #{wait_takeover => <<"wait">>}, #{wait_health_check => <<"wait">>}, #{migrate_to => []}, @@ -83,7 +82,8 @@ t_start_evacuation_validation(Config) -> api_post( ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"], Opts - ) + ), + Opts ) end, BadOpts @@ -103,8 +103,8 @@ t_start_evacuation_validation(Config) -> #{ conn_evict_rate => 10, sess_evict_rate => 10, - wait_takeover => 10, - wait_health_check => 10, + wait_takeover => <<"10s">>, + wait_health_check => <<"10s">>, redirect_to => <<"srv">>, migrate_to => [atom_to_binary(RecipientNode)] } @@ -166,8 +166,8 @@ t_start_rebalance_validation(Config) -> #{ conn_evict_rate => 10, sess_evict_rate => 10, - wait_takeover => 10, - wait_health_check => 10, + wait_takeover => <<"10s">>, + wait_health_check => <<"10s">>, abs_conn_threshold => 10, rel_conn_threshold => 1.001, abs_sess_threshold => 10, diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index bce62aa25..c925e925d 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -483,26 +483,25 @@ t_clear_expired(_) -> with_conf(ConfMod, Case). t_max_payload_size(_) -> - ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end, + ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end, Case = fun() -> emqx_retainer:clean(), timer:sleep(500), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - + Payload = iolist_to_binary(lists:duplicate(1024, <<"0">>)), emqtt:publish( C1, <<"retained/1">>, #{}, - <<"1234">>, + Payload, [{qos, 0}, {retain, true}] ), - emqtt:publish( C1, <<"retained/2">>, #{}, - <<"1234567">>, + <<"1", Payload/binary>>, [{qos, 0}, {retain, true}] ), diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index 3c7753857..ad887d1a6 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -84,7 +84,7 @@ t_full_config(_Config) -> <<"min_part_size">> => <<"10mb">>, <<"acl">> => <<"public_read">>, <<"transport_options">> => #{ - <<"connect_timeout">> => 30000, + <<"connect_timeout">> => <<"30s">>, <<"enable_pipelining">> => 200, <<"pool_size">> => 10, <<"pool_type">> => <<"random">>, diff --git a/apps/emqx_s3/test/emqx_s3_test_helpers.erl b/apps/emqx_s3/test/emqx_s3_test_helpers.erl index a73f618af..26740c18b 100644 --- a/apps/emqx_s3/test/emqx_s3_test_helpers.erl +++ b/apps/emqx_s3/test/emqx_s3_test_helpers.erl @@ -64,10 +64,10 @@ base_raw_config(tcp) -> <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), <<"host">> => ?TCP_HOST, <<"port">> => ?TCP_PORT, - <<"max_part_size">> => 10 * 1024 * 1024, + <<"max_part_size">> => <<"10MB">>, <<"transport_options">> => #{ - <<"request_timeout">> => 2000 + <<"request_timeout">> => <<"2s">> } }; base_raw_config(tls) -> @@ -77,10 +77,10 @@ base_raw_config(tls) -> <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), <<"host">> => ?TLS_HOST, <<"port">> => ?TLS_PORT, - <<"max_part_size">> => 10 * 1024 * 1024, + <<"max_part_size">> => <<"10MB">>, <<"transport_options">> => #{ - <<"request_timeout">> => 2000, + <<"request_timeout">> => <<"2s">>, <<"ssl">> => #{ <<"enable">> => true, <<"cacertfile">> => bin(cert_path("ca.crt")), diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 311bcf62e..b4179caff 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -139,7 +139,7 @@ settings(get, _) -> {200, emqx:get_raw_config([slow_subs], #{})}; settings(put, #{body := Body}) -> case emqx_slow_subs:update_settings(Body) of - {ok, #{config := NewConf}} -> + {ok, #{raw_config := NewConf}} -> {200, NewConf}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])), diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index 5196868c7..af9b7550f 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -41,7 +41,7 @@ "{\n" " enable = true\n" " top_k_num = 5,\n" - " expire_interval = 60000\n" + " expire_interval = 60s\n" " stats_type = whole\n" "}" "" @@ -137,36 +137,33 @@ t_clear(_) -> ?assertEqual(0, ets:info(?TOPK_TAB, size)). t_settting(_) -> - Conf = emqx:get_config([slow_subs]), - Conf2 = Conf#{stats_type => internal}, + RawConf = emqx:get_raw_config([slow_subs]), + RawConf2 = RawConf#{<<"stats_type">> => <<"internal">>}, {ok, Data} = request_api( put, api_path(["slow_subscriptions", "settings"]), [], auth_header_(), - Conf2 + RawConf2 ), Return = decode_json(Data), + Expect = emqx_config:fill_defaults(RawConf2), - ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), + ?assertEqual(Expect, Return), + timer:sleep(800), {ok, GetData} = request_api( get, api_path(["slow_subscriptions", "settings"]), [], auth_header_() ), - - timer:sleep(1000), - GetReturn = decode_json(GetData), - - ?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn). + ?assertEqual(Expect, GetReturn). decode_json(Data) -> - BinJosn = emqx_utils_json:decode(Data, [return_maps]), - emqx_utils_maps:unsafe_atom_key_map(BinJosn). + emqx_utils_json:decode(Data, [return_maps]). request_api(Method, Url, Auth) -> request_api(Method, Url, [], Auth, []). From 093cdab8388dbcef7e7ce76a8af639e8f6544af5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 19 Jun 2023 16:30:18 +0800 Subject: [PATCH 31/34] chore: to_integer to make sure integer is converted --- apps/emqx/src/emqx_schema.erl | 15 +++++++++++---- apps/emqx/test/emqx_ocsp_cache_SUITE.erl | 16 ++++++++++++---- .../test/emqx_bridge_webhook_SUITE.erl | 2 +- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 17d723a77..1d685fb26 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2418,13 +2418,13 @@ mk_duration(Desc, OverrideMeta) -> to_duration(Str) -> case hocon_postprocess:duration(Str) of I when is_integer(I) -> {ok, I}; - _ -> {error, Str} + _ -> to_integer(Str) end. to_duration_s(Str) -> case hocon_postprocess:duration(Str) of I when is_number(I) -> {ok, ceiling(I / 1000)}; - _ -> {error, Str} + _ -> to_integer(Str) end. -spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when @@ -2432,7 +2432,7 @@ to_duration_s(Str) -> to_duration_ms(Str) -> case hocon_postprocess:duration(Str) of I when is_number(I) -> {ok, ceiling(I)}; - _ -> {error, Str} + _ -> to_integer(Str) end. -spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when @@ -2473,7 +2473,7 @@ do_to_timeout_duration(Str, Fn, Max, Unit) -> to_bytesize(Str) -> case hocon_postprocess:bytesize(Str) of I when is_integer(I) -> {ok, I}; - _ -> {error, Str} + _ -> to_integer(Str) end. to_wordsize(Str) -> @@ -2483,6 +2483,13 @@ to_wordsize(Str) -> Error -> Error end. +to_integer(Str) when is_list(Str) -> + case string:to_integer(Str) of + {Int, []} -> {ok, Int}; + {Int, <<>>} -> {ok, Int}; + _ -> {error, Str} + end. + to_percent(Str) -> {ok, hocon_postprocess:percent(Str)}. diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index cf8efacb1..d0efc8cd6 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -144,8 +144,12 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) -> }, Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, ConfBin = emqx_utils_maps:binary_key_map(Conf), - hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), - emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), + CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ + required => false, atom_keys => false + }), + Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), + ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2), + emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2), snabbkaffe:start_trace(), _Heir = spawn_dummy_heir(), {ok, CachePid} = emqx_ocsp_cache:start_link(), @@ -186,8 +190,12 @@ init_per_testcase(_TestCase, Config) -> }, Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, ConfBin = emqx_utils_maps:binary_key_map(Conf), - hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), - emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), + CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ + required => false, atom_keys => false + }), + Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), + ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2), + emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2), [ {cache_pid, CachePid} | Config diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 3b6f36bbc..f8159472b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -178,7 +178,7 @@ bridge_async_config(#{port := Port} = Config) -> Name = maps:get(name, Config, ?BRIDGE_NAME), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), - ConnectTimeout = maps:get(connect_timeout, Config, 1), + ConnectTimeout = maps:get(connect_timeout, Config, "1s"), RequestTimeout = maps:get(request_timeout, Config, "10s"), ResumeInterval = maps:get(resume_interval, Config, "1s"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), From 856de7869892ba60c5b59ae6968d4d5413291f2d Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 20 Jun 2023 08:39:05 +0800 Subject: [PATCH 32/34] chore: remove the is_list guard --- apps/emqx/src/emqx_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 1d685fb26..9de6ef34a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2483,7 +2483,7 @@ to_wordsize(Str) -> Error -> Error end. -to_integer(Str) when is_list(Str) -> +to_integer(Str) -> case string:to_integer(Str) of {Int, []} -> {ok, Int}; {Int, <<>>} -> {ok, Int}; From 54d96488a0216d32a6277c5dce99218983a941de Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 20 Jun 2023 16:20:31 +0300 Subject: [PATCH 33/34] chore(deps): update erlcloud and remove unnecessary overrides --- apps/emqx_bridge_dynamo/rebar.config | 2 +- apps/emqx_s3/rebar.config | 2 +- changes/ce/fix-11103.en.md | 1 + mix.exs | 9 +-------- 4 files changed, 4 insertions(+), 10 deletions(-) create mode 100644 changes/ce/fix-11103.en.md diff --git a/apps/emqx_bridge_dynamo/rebar.config b/apps/emqx_bridge_dynamo/rebar.config index d3ba1093d..672e8efc2 100644 --- a/apps/emqx_bridge_dynamo/rebar.config +++ b/apps/emqx_bridge_dynamo/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}} +{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_s3/rebar.config b/apps/emqx_s3/rebar.config index b1483e028..65f740aa3 100644 --- a/apps/emqx_s3/rebar.config +++ b/apps/emqx_s3/rebar.config @@ -1,6 +1,6 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, - {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}} + {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}} ]}. {project_plugins, [erlfmt]}. diff --git a/changes/ce/fix-11103.en.md b/changes/ce/fix-11103.en.md new file mode 100644 index 000000000..794da067a --- /dev/null +++ b/changes/ce/fix-11103.en.md @@ -0,0 +1 @@ +Updated `erlcloud` dependency. diff --git a/mix.exs b/mix.exs index 89354ea92..2167963c1 100644 --- a/mix.exs +++ b/mix.exs @@ -216,14 +216,7 @@ defmodule EMQXUmbrella.MixProject do github: "emqx/rabbitmq-server", tag: "v3.11.13-emqx", sparse: "deps/amqp_client", - override: true}, - {:erlcloud, github: "emqx/erlcloud", tag: "3.6.8-emqx-1", override: true}, - # erlcloud's rebar.config requires rebar3 and does not support Mix, - # so it tries to fetch deps from git. We need to override this. - {:lhttpc, github: "erlcloud/lhttpc", tag: "1.6.2", override: true}, - {:eini, "1.2.9", override: true}, - {:base16, "1.0.0", override: true} - # end of erlcloud's deps + override: true} ] end From 1d791d7a8c1189db0383af4665ee7b05adfc4908 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 20 Jun 2023 14:21:45 -0300 Subject: [PATCH 34/34] fix(resource): validate maximum worker pool size Fixes https://emqx.atlassian.net/browse/EMQX-10297 --- .../src/schema/emqx_resource_schema.erl | 2 +- .../test/emqx_resource_schema_tests.erl | 38 +++++++++++++++++++ changes/ce/fix-11106.en.md | 3 ++ 3 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-11106.en.md diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index eb0a48b06..59687eb8d 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -88,7 +88,7 @@ resource_opts_meta() -> desc => ?DESC(<<"resource_opts">>) }. -worker_pool_size(type) -> non_neg_integer(); +worker_pool_size(type) -> range(1, 1024); worker_pool_size(desc) -> ?DESC("worker_pool_size"); worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl index 219861a4e..0935ee1c6 100644 --- a/apps/emqx_resource/test/emqx_resource_schema_tests.erl +++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl @@ -74,6 +74,44 @@ health_check_interval_validator_test_() -> ) ]. +worker_pool_size_test_() -> + BaseConf = parse(webhook_bridge_health_check_hocon(<<"15s">>)), + Check = fun(WorkerPoolSize) -> + Conf = emqx_utils_maps:deep_put( + [ + <<"bridges">>, + <<"webhook">>, + <<"simple">>, + <<"resource_opts">>, + <<"worker_pool_size">> + ], + BaseConf, + WorkerPoolSize + ), + #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf), + #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, + WPS + end, + AssertThrow = fun(WorkerPoolSize) -> + ?assertThrow( + {_, [ + #{ + kind := validation_error, + reason := #{expected_type := _}, + value := WorkerPoolSize + } + ]}, + Check(WorkerPoolSize) + ) + end, + [ + ?_assertEqual(1, Check(1)), + ?_assertEqual(100, Check(100)), + ?_assertEqual(1024, Check(1024)), + ?_test(AssertThrow(0)), + ?_test(AssertThrow(1025)) + ]. + %%=========================================================================== %% Helper functions %%=========================================================================== diff --git a/changes/ce/fix-11106.en.md b/changes/ce/fix-11106.en.md new file mode 100644 index 000000000..2fa3053fa --- /dev/null +++ b/changes/ce/fix-11106.en.md @@ -0,0 +1,3 @@ +Added a validation for the maximum number of pool workers of a bridge. + +Now the maximum amount is 1024 to avoid large memory consumption from an unreasonable number of workers.