diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b94d4c962..2559e6821 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, @@ -44,7 +44,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]} diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 32bd3df53..96eacc5a9 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -270,6 +270,9 @@ check(#mqtt_packet_subscribe{topic_filters = TopicFilters}) -> try validate_topic_filters(TopicFilters) catch + %% Known Specificed Reason Code + error:{error, RC} -> + {error, RC}; error:_Error -> {error, ?RC_TOPIC_FILTER_INVALID} end; @@ -413,6 +416,10 @@ run_checks([Check | More], Packet, Options) -> validate_topic_filters(TopicFilters) -> lists:foreach( fun + %% Protocol Error and Should Disconnect + %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] + ({<>, #{nl := 1}}) -> + error({error, ?RC_PROTOCOL_ERROR}); ({TopicFilter, _SubOpts}) -> emqx_topic:validate(TopicFilter); (TopicFilter) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 67834839d..9de6ef34a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1758,7 +1758,7 @@ base_listener(Bind) -> )}, {"bind", sc( - hoconsc:union([ip_port(), integer()]), + ip_port(), #{ default => Bind, required => true, @@ -2418,13 +2418,13 @@ mk_duration(Desc, OverrideMeta) -> to_duration(Str) -> case hocon_postprocess:duration(Str) of I when is_integer(I) -> {ok, I}; - _ -> {error, Str} + _ -> to_integer(Str) end. to_duration_s(Str) -> case hocon_postprocess:duration(Str) of I when is_number(I) -> {ok, ceiling(I / 1000)}; - _ -> {error, Str} + _ -> to_integer(Str) end. -spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when @@ -2432,7 +2432,7 @@ to_duration_s(Str) -> to_duration_ms(Str) -> case hocon_postprocess:duration(Str) of I when is_number(I) -> {ok, ceiling(I)}; - _ -> {error, Str} + _ -> to_integer(Str) end. -spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when @@ -2473,7 +2473,7 @@ do_to_timeout_duration(Str, Fn, Max, Unit) -> to_bytesize(Str) -> case hocon_postprocess:bytesize(Str) of I when is_integer(I) -> {ok, I}; - _ -> {error, Str} + _ -> to_integer(Str) end. to_wordsize(Str) -> @@ -2483,6 +2483,13 @@ to_wordsize(Str) -> Error -> Error end. +to_integer(Str) -> + case string:to_integer(Str) of + {Int, []} -> {ok, Int}; + {Int, <<>>} -> {ok, Int}; + _ -> {error, Str} + end. + to_percent(Str) -> {ok, hocon_postprocess:percent(Str)}. @@ -2525,9 +2532,9 @@ to_ip_port(Str) -> case split_ip_port(Str) of {"", Port} -> %% this is a local address - {ok, list_to_integer(Port)}; + {ok, parse_port(Port)}; {MaybeIp, Port} -> - PortVal = list_to_integer(Port), + PortVal = parse_port(Port), case inet:parse_address(MaybeIp) of {ok, IpTuple} -> {ok, {IpTuple, PortVal}}; @@ -2543,18 +2550,11 @@ split_ip_port(Str0) -> case lists:split(string:rchr(Str, $:), Str) of %% no colon {[], Str} -> - try - %% if it's just a port number, then return as-is - _ = list_to_integer(Str), - {"", Str} - catch - _:_ -> - error - end; + {"", Str}; {IpPlusColon, PortString} -> IpStr0 = lists:droplast(IpPlusColon), case IpStr0 of - %% dropp head/tail brackets + %% drop head/tail brackets [$[ | S] -> case lists:last(S) of $] -> {lists:droplast(S), PortString}; diff --git a/apps/emqx/src/emqx_tls_certfile_gc.erl b/apps/emqx/src/emqx_tls_certfile_gc.erl index 78dfdbaca..9e2e98b7f 100644 --- a/apps/emqx/src/emqx_tls_certfile_gc.erl +++ b/apps/emqx/src/emqx_tls_certfile_gc.erl @@ -227,8 +227,11 @@ find_managed_files(Filter, Dir) -> false -> Acc end; + (AbsPath, {error, enoent}, Acc) when AbsPath == Dir -> + Acc; (AbsPath, {error, Reason}, Acc) -> - ?SLOG(notice, "filesystem_object_inaccessible", #{ + ?SLOG(notice, #{ + msg => "filesystem_object_inaccessible", abspath => AbsPath, reason => Reason }), diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index dc3a0fac7..498af53e6 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -654,10 +654,13 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) -> %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2 env_handler => fun((AppName :: atom()) -> term()), %% Application env preset before calling `emqx_common_test_helpers:start_apps/2` - env => {AppName :: atom(), Key :: atom(), Val :: term()}, + env => [{AppName :: atom(), Key :: atom(), Val :: term()}], %% Whether to execute `emqx_config:init_load(SchemaMod)` %% default: true load_schema => boolean(), + %% Which node in the cluster to join to. + %% default: first core node + join_to => node(), %% If we want to exercise the scenario where a node joins an %% existing cluster where there has already been some %% configuration changes (via cluster rpc), then we need to enable @@ -692,28 +695,38 @@ emqx_cluster(Specs0, CommonOpts) -> ]), %% Set the default node of the cluster: CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], - JoinTo0 = + JoinTo = case CoreNodes of [First | _] -> First; _ -> undefined end, - JoinTo = - case maps:find(join_to, CommonOpts) of - {ok, true} -> JoinTo0; - {ok, JT} -> JT; - error -> JoinTo0 - end, - [ - {Name, - merge_opts(Opts, #{ - base_port => base_port(Number), + NodeOpts = fun(Number) -> + #{ + base_port => base_port(Number), + env => [ + {mria, core_nodes, CoreNodes}, + {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} + ] + } + end, + RoleOpts = fun + (core) -> + #{ join_to => JoinTo, env => [ - {mria, core_nodes, CoreNodes}, - {mria, node_role, Role}, - {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} + {mria, node_role, core} ] - })} + }; + (replicant) -> + #{ + env => [ + {mria, node_role, replicant}, + {ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}} + ] + } + end, + [ + {Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)} || {{Role, Name, Opts}, Number} <- Specs ]. diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index 31738e980..6c6337038 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -497,17 +497,24 @@ t_update_config(_Config) -> emqx_config_handler:start_link(), {ok, Pid} = emqx_crl_cache:start_link(), Conf = #{ - refresh_interval => timer:minutes(5), - http_timeout => timer:minutes(10), + refresh_interval => <<"5m">>, + http_timeout => <<"10m">>, capacity => 123 }, ?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)), State = sys:get_state(Pid), - ?assertEqual(Conf, #{ - refresh_interval => element(3, State), - http_timeout => element(4, State), - capacity => element(7, State) - }), + ?assertEqual( + #{ + refresh_interval => timer:minutes(5), + http_timeout => timer:minutes(10), + capacity => 123 + }, + #{ + refresh_interval => element(3, State), + http_timeout => element(4, State), + capacity => element(7, State) + } + ), emqx_config:erase(<<"crl_cache">>), emqx_config_handler:stop(), ok. diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 62c967078..942a262a6 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -33,9 +33,9 @@ init_per_suite(Config) -> <<"enable">> => true, <<"max_count">> => 3, % 0.1s - <<"window_time">> => 100, + <<"window_time">> => <<"100ms">>, %% 2s - <<"ban_time">> => "2s" + <<"ban_time">> => <<"2s">> } ), Config. @@ -119,16 +119,16 @@ t_conf_update(_) -> ?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)), Zones = #{ - <<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}}, - <<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}} + <<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"123s">>}}, + <<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"456s">>}} }, ?assertMatch({ok, _}, emqx:update_config([zones], Zones)), %% new_zone is already deleted ?assertError({config_not_found, _}, get_policy(new_zone)), %% update zone(zone_1) has default. - ?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)), + ?assertEqual(Global#{window_time := 123000}, emqx_flapping:get_policy(zone_1)), %% create zone(zone_2) has default - ?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)), + ?assertEqual(Global#{window_time := 456000}, emqx_flapping:get_policy(zone_2)), %% reset to default(empty) andalso get default from global ?assertMatch({ok, _}, emqx:update_config([zones], #{})), ?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])), @@ -172,13 +172,13 @@ validate_timer(Lists) -> ok. t_window_compatibility_check(_Conf) -> - Flapping = emqx:get_config([flapping_detect]), + Flapping = emqx:get_raw_config([flapping_detect]), ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>), ?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])), %% reset FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]), ok = emqx_config:init_load(emqx_schema, FlappingBin), - ?assertEqual(Flapping, emqx:get_config([flapping_detect])), + ?assertEqual(Flapping, emqx:get_raw_config([flapping_detect])), ok. get_policy(Zone) -> diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index fe608f600..a2a2e5244 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -985,3 +985,18 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) -> ?assertEqual(1, counters:get(CRef, 1)), process_flag(trap_exit, false). + +t_share_subscribe_no_local(Config) -> + ConnFun = ?config(conn_fun, Config), + process_flag(trap_exit, true), + ShareTopic = <<"$share/sharename/TopicA">>, + + {ok, Client} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client), + %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] (Disconnect) + case catch emqtt:subscribe(Client, #{}, [{ShareTopic, [{nl, true}, {qos, 1}]}]) of + {'EXIT', {Reason, _Stk}} -> + ?assertEqual({disconnected, ?RC_PROTOCOL_ERROR, #{}}, Reason) + end, + + process_flag(trap_exit, false). diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index fefa998f8..d0efc8cd6 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -137,15 +137,19 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) -> enable_ocsp_stapling => true, responder_url => <<"http://localhost:9877/">>, issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), - refresh_http_timeout => 15_000, - refresh_interval => 1_000 + refresh_http_timeout => <<"15s">>, + refresh_interval => <<"1s">> } } }, Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, ConfBin = emqx_utils_maps:binary_key_map(Conf), - hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), - emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), + CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ + required => false, atom_keys => false + }), + Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), + ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2), + emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2), snabbkaffe:start_trace(), _Heir = spawn_dummy_heir(), {ok, CachePid} = emqx_ocsp_cache:start_link(), @@ -179,15 +183,19 @@ init_per_testcase(_TestCase, Config) -> enable_ocsp_stapling => true, responder_url => <<"http://localhost:9877/">>, issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), - refresh_http_timeout => 15_000, - refresh_interval => 1_000 + refresh_http_timeout => <<"15s">>, + refresh_interval => <<"1s">> } } }, Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, ConfBin = emqx_utils_maps:binary_key_map(Conf), - hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), - emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), + CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ + required => false, atom_keys => false + }), + Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), + ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2), + emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2), [ {cache_pid, CachePid} | Config diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index ca6a28f51..35d2caa96 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -1316,8 +1316,8 @@ authenticator_examples() -> <<"password">> => ?PH_PASSWORD }, pool_size => 8, - connect_timeout => 5000, - request_timeout => 5000, + connect_timeout => <<"5s">>, + request_timeout => <<"5s">>, enable_pipelining => 100, ssl => #{enable => false} } diff --git a/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl b/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl index 143a24152..cc2785b1e 100644 --- a/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl @@ -80,7 +80,7 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) -> <<"max_connections">> => 1024000, <<"mountpoint">> => <<>>, <<"proxy_protocol">> => false, - <<"proxy_protocol_timeout">> => 3000, + <<"proxy_protocol_timeout">> => <<"3s">>, <<"enable_authn">> => EnableAuthn }. diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 702359509..12558813c 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -105,7 +105,7 @@ set_special_configs(_App) -> <<"headers">> => #{}, <<"ssl">> => #{<<"enable">> => true}, <<"method">> => <<"get">>, - <<"request_timeout">> => 5000 + <<"request_timeout">> => <<"5s">> }). -define(SOURCE2, #{ <<"type">> => <<"mongodb">>, diff --git a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl index e3412e169..c29fe0f5b 100644 --- a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl @@ -70,7 +70,7 @@ t_api(_) -> <<"cache">> => #{ <<"enable">> => false, <<"max_size">> => 32, - <<"ttl">> => 60000 + <<"ttl">> => <<"60s">> } }, @@ -84,7 +84,7 @@ t_api(_) -> <<"cache">> => #{ <<"enable">> => true, <<"max_size">> => 32, - <<"ttl">> => 60000 + <<"ttl">> => <<"60s">> } }, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 1348e411c..25452eb77 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -140,7 +140,7 @@ mk_cluster_specs(Config, Opts) -> {core, emqx_bridge_api_SUITE1, #{}}, {core, emqx_bridge_api_SUITE2, #{}} ], - CommonOpts = #{ + CommonOpts = Opts#{ env => [{emqx, boot_modules, [broker]}], apps => [], % NOTE @@ -157,7 +157,6 @@ mk_cluster_specs(Config, Opts) -> load_apps => ?SUITE_APPS ++ [emqx_dashboard], env_handler => fun load_suite_config/1, load_schema => false, - join_to => maps:get(join_to, Opts, true), priv_data_dir => ?config(priv_dir, Config) }, emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts). diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 025451988..62ba70b33 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -105,29 +105,32 @@ parse_and_check(Config, ConfigString, Name) -> resource_id(Config) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - emqx_bridge_resource:resource_id(BridgeType, Name). + BridgeName = ?config(bridge_name, Config), + emqx_bridge_resource:resource_id(BridgeType, BridgeName). create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). create_bridge(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), ct:pal("creating bridge with config: ~p", [BridgeConfig]), - emqx_bridge:create(BridgeType, Name, BridgeConfig). + emqx_bridge:create(BridgeType, BridgeName, BridgeConfig). create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). create_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + create_bridge_api(BridgeType, BridgeName, BridgeConfig). + +create_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -164,14 +167,38 @@ update_bridge_api(Config, Overrides) -> ct:pal("bridge update result: ~p", [Res]), Res. +op_bridge_api(Op, BridgeType, BridgeName) -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of + {ok, {Status = {_, 204, _}, Headers, Body}} -> + {ok, {Status, Headers, Body}}; + {ok, {Status, Headers, Body}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + {error, {Status, Headers, Body}} -> + {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge op result: ~p", [Res]), + Res. + probe_bridge_api(Config) -> probe_bridge_api(Config, _Overrides = #{}). -probe_bridge_api(Config, _Overrides) -> +probe_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - BridgeConfig = ?config(bridge_config, Config), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + BridgeName = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + probe_bridge_api(BridgeType, BridgeName, BridgeConfig). + +probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -289,10 +316,34 @@ t_create_via_http(Config) -> t_start_stop(Config, StopTracePoint) -> BridgeType = ?config(bridge_type, Config), BridgeName = ?config(bridge_name, Config), - ResourceId = resource_id(Config), + BridgeConfig = ?config(bridge_config, Config), + t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). + +t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin - ?assertMatch({ok, _}, create_bridge(Config)), + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes0 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -301,24 +352,48 @@ t_start_stop(Config, StopTracePoint) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - %% Check that the bridge probe API doesn't leak atoms. - ProbeRes0 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} - ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), - AtomsBefore = erlang:system_info(atom_count), - %% Probe again; shouldn't have created more atoms. - ProbeRes1 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + %% `start` bridge to trigger `already_started` + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), - AtomsAfter = erlang:system_info(atom_count), - ?assertEqual(AtomsBefore, AtomsAfter), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), - %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) + ), + + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Disable the bridge, which will also stop it. ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( @@ -331,8 +406,11 @@ t_start_stop(Config, StopTracePoint) -> ok end, fun(Trace) -> - %% one for each probe, one for real - ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)), + %% one for each probe, two for real + ?assertMatch( + [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], + ?of_kind(StopTracePoint, Trace) + ), ok end ), diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 4fc76fc9e..f8159472b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -28,6 +28,9 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(BRIDGE_TYPE, <<"webhook">>). +-define(BRIDGE_NAME, atom_to_binary(?MODULE)). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -36,15 +39,13 @@ groups() -> init_per_suite(_Config) -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), []. end_per_suite(_Config) -> - ok = emqx_config:put([bridges], #{}), - ok = emqx_config:put_raw([bridges], #{}), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_bridge), @@ -53,10 +54,22 @@ end_per_suite(_Config) -> suite() -> [{timetrap, {seconds, 60}}]. +init_per_testcase(t_bad_bridge_config, Config) -> + Config; +init_per_testcase(t_send_async_connection_timeout, Config) -> + ResponseDelayMS = 500, + Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), + [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; init_per_testcase(_TestCase, Config) -> - Config. + Server = start_http_server(#{response_delay_ms => 0}), + [{http_server, Server} | Config]. -end_per_testcase(_TestCase, _Config) -> +end_per_testcase(_TestCase, Config) -> + case ?config(http_server, Config) of + undefined -> ok; + Server -> stop_http_server(Server) + end, + emqx_bridge_testlib:delete_all_bridges(), emqx_common_test_helpers:call_janitor(), ok. @@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) -> %% (Orginally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> - ct:pal("Start server\n"), process_flag(trap_exit, true), Parent = self(), + ct:pal("Starting server for ~p", [Parent]), {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), + ct:pal("Started server on port ~p", [Port]), timer:sleep(100), #{port => Port, sock => Sock, acceptor => Acceptor}. @@ -160,25 +174,25 @@ parse_http_request_assertive(ReqStr0) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% bridge_async_config(#{port := Port} = Config) -> - Type = maps:get(type, Config, <<"webhook">>), - Name = maps:get(name, Config, atom_to_binary(?MODULE)), + Type = maps:get(type, Config, ?BRIDGE_TYPE), + Name = maps:get(name, Config, ?BRIDGE_NAME), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), - ConnectTimeout = maps:get(connect_timeout, Config, 1), - RequestTimeout = maps:get(request_timeout, Config, 10000), + ConnectTimeout = maps:get(connect_timeout, Config, "1s"), + RequestTimeout = maps:get(request_timeout, Config, "10s"), ResumeInterval = maps:get(resume_interval, Config, "1s"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://localhost:~p\"\n" - " connect_timeout = \"~ps\"\n" + " connect_timeout = \"~p\"\n" " enable = true\n" " enable_pipelining = 100\n" " max_retries = 2\n" " method = \"post\"\n" " pool_size = ~p\n" " pool_type = \"random\"\n" - " request_timeout = \"~ps\"\n" + " request_timeout = \"~s\"\n" " body = \"${id}\"" " resource_opts {\n" " inflight_window = 100\n" @@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) -> RetConfig. make_bridge(Config) -> - Type = <<"webhook">>, - Name = atom_to_binary(?MODULE), + Type = ?BRIDGE_TYPE, + Name = ?BRIDGE_NAME, BridgeConfig = bridge_async_config(Config#{ name => Name, type => Type @@ -236,16 +250,15 @@ make_bridge(Config) -> %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% When the connection time out all the queued requests where dropped in -t_send_async_connection_timeout(_Config) -> - ResponseDelayMS = 90, - #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), - % Port = 9000, +t_send_async_connection_timeout(Config) -> + ResponseDelayMS = ?config(response_delay_ms, Config), + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "async", - connect_timeout => ResponseDelayMS * 2, - request_timeout => 10000, + connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s", + request_timeout => "10s", resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, @@ -257,17 +270,16 @@ t_send_async_connection_timeout(_Config) -> ct:pal("Sent messages\n"), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), receive_request_notifications(MessageIDs, ResponseDelayMS), - stop_http_server(Server), ok. -t_async_free_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_free_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "sync", - connect_timeout => 1_000, - request_timeout => 10_000, + connect_timeout => "1s", + request_timeout => "10s", resource_request_ttl => "10000s" }), %% Fail 5 times then succeed. @@ -285,15 +297,15 @@ t_async_free_retries(_Config) -> do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), ok. -t_async_common_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_common_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "sync", resume_interval => "100ms", - connect_timeout => 1_000, - request_timeout => 10_000, + connect_timeout => "1s", + request_timeout => "10s", resource_request_ttl => "10000s" }), %% Keeps failing until connector gives up. @@ -323,6 +335,39 @@ t_async_common_retries(_Config) -> do_t_async_retries(Context, {error, something_else}, FnFail), ok. +t_bad_bridge_config(_Config) -> + BridgeConfig = bridge_async_config(#{port => 12345}), + ?assertMatch( + {ok, + {{_, 201, _}, _Headers, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">> + }}}, + emqx_bridge_testlib:create_bridge_api( + ?BRIDGE_TYPE, + ?BRIDGE_NAME, + BridgeConfig + ) + ), + %% try `/start` bridge + ?assertMatch( + {error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}}, + emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + ok. + +t_start_stop(Config) -> + #{port := Port} = ?config(http_server, Config), + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + port => Port + }), + emqx_bridge_testlib:t_start_stop( + ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped + ). + +%% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 5525a640c..fb16dd749 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -635,9 +635,9 @@ t_bad_sql_parameter(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_ttl">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"request_ttl">> => <<"500ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ) diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl index 0bae413e0..12d678e85 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl @@ -184,7 +184,7 @@ clickhouse_config() -> ] ) ), - connect_timeout => 10000 + connect_timeout => <<"10s">> }, #{<<"config">> => Config}. diff --git a/apps/emqx_bridge_dynamo/rebar.config b/apps/emqx_bridge_dynamo/rebar.config index d3ba1093d..672e8efc2 100644 --- a/apps/emqx_bridge_dynamo/rebar.config +++ b/apps/emqx_bridge_dynamo/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}} +{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 884f160f9..f26e4037b 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) -> " iotdb_version = \"~s\"\n" " pool_size = 1\n" " resource_opts = {\n" - " health_check_interval = 5000\n" - " request_ttl = 30000\n" + " health_check_interval = \"5s\"\n" + " request_ttl = 30s\n" " query_mode = \"async\"\n" " worker_pool_size = 1\n" " }\n" diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 33c207c39..74fde6426 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -590,7 +590,7 @@ kafka_config(TestCase, _KafkaType, Config) -> " kafka {\n" " max_batch_bytes = 896KB\n" " max_rejoin_attempts = 5\n" - " offset_commit_interval_seconds = 3\n" + " offset_commit_interval_seconds = 3s\n" %% todo: matrix this " offset_reset_policy = latest\n" " }\n" diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 0ccc19778..3b558200c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -307,7 +307,7 @@ bridges.kafka_consumer.my_consumer { kafka { max_batch_bytes = 896KB max_rejoin_attempts = 5 - offset_commit_interval_seconds = 3 + offset_commit_interval_seconds = 3s offset_reset_policy = latest } topic_mapping = [ diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 3563e0774..93224d5ca 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -298,9 +298,9 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_ttl">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"request_ttl">> => <<"500ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ), diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 7644921f0..9399f6029 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -456,9 +456,9 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_ttl">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"request_ttl">> => <<"500ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ), diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 2a9888451..34bf5c702 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -247,7 +247,6 @@ cluster(Specs, Config) -> {env, Env}, {apps, [emqx_conf]}, {load_schema, false}, - {join_to, true}, {priv_data_dir, PrivDataDir}, {env_handler, fun (emqx) -> diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index 8e056c018..cdebfeaf7 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -55,7 +55,7 @@ log.console_handler { burst_limit { enable = true max_count = 10000 - window_time = 1000 + window_time = 1s } chars_limit = unlimited drop_mode_qlen = 3000 @@ -66,9 +66,9 @@ log.console_handler { max_depth = 100 overload_kill { enable = true - mem_size = 31457280 + mem_size = \"30MB\" qlen = 20000 - restart_after = 5000 + restart_after = \"5s\" } single_line = true supervisor_reports = error @@ -80,7 +80,7 @@ log.file_handlers { burst_limit { enable = true max_count = 10000 - window_time = 1000 + window_time = 1s } chars_limit = unlimited drop_mode_qlen = 3000 @@ -93,9 +93,9 @@ log.file_handlers { max_size = \"1024MB\" overload_kill { enable = true - mem_size = 31457280 + mem_size = \"30MB\" qlen = 20000 - restart_after = 5000 + restart_after = \"5s\" } rotation {count = 20, enable = true} single_line = true diff --git a/apps/emqx_conf/test/emqx_global_gc_SUITE.erl b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl index ec1e20b3d..036f97643 100644 --- a/apps/emqx_conf/test/emqx_global_gc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl @@ -35,7 +35,7 @@ t_run_gc(_) -> node => #{ cookie => <<"cookie">>, data_dir => <<"data">>, - global_gc_interval => 1000 + global_gc_interval => <<"1s">> } }, emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 149704f76..ce8a1a1a5 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -16,11 +16,10 @@ -module(emqx_connector_http). --include("emqx_connector.hrl"). - -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -219,10 +218,31 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(InstId, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; - {error, Reason} -> {error, Reason} + case start_pool(InstId, PoolOpts) of + ok -> + case do_get_status(InstId, ConnectTimeout) of + ok -> + {ok, State}; + Error -> + ok = ehttpc_sup:stop_pool(InstId), + Error + end; + Error -> + Error + end. + +start_pool(PoolName, PoolOpts) -> + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ?SLOG(warning, #{ + msg => "emqx_connector_on_start_already_started", + pool_name => PoolName + }), + ok; + Error -> + Error end. on_stop(InstId, _State) -> @@ -230,7 +250,9 @@ on_stop(InstId, _State) -> msg => "stopping_http_connector", connector => InstId }), - ehttpc_sup:stop_pool(InstId). + Res = ehttpc_sup:stop_pool(InstId), + ?tp(emqx_connector_http_stopped, #{instance_id => InstId}), + Res. on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of diff --git a/apps/emqx_connector/test/emqx_connector_http_tests.erl b/apps/emqx_connector/test/emqx_connector_http_tests.erl index 2dc2119f7..c5f6dfe78 100644 --- a/apps/emqx_connector/test/emqx_connector_http_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_http_tests.erl @@ -24,6 +24,8 @@ wrap_auth_headers_test_() -> fun() -> meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), + meck:expect(ehttpc, workers, 1, [{self, self()}]), + meck:expect(ehttpc, health_check, 2, ok), meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(emqx_resource, allocate_resource, 3, ok), [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource] diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index 957cc6120..31ad4f831 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -195,7 +195,7 @@ enable(Bool) -> bind(Port) -> {"bind", ?HOCON( - ?UNION([non_neg_integer(), emqx_schema:ip_port()]), + emqx_schema:ip_port(), #{ default => 0, required => false, diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 8babd267b..bd756620d 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -225,7 +225,7 @@ t_update_conf(_Config) -> DeletedConf = Conf#{<<"servers">> => Servers2}, validate_servers(Path, DeletedConf, Servers2), [L1, L2 | Servers3] = Servers, - UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000}, + UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => <<"1s">>}, UpdatedServers = [L1, UpdateL2 | Servers3], UpdatedConf = Conf#{<<"servers">> => UpdatedServers}, validate_servers(Path, UpdatedConf, UpdatedServers), diff --git a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl index c03b3f231..1178f244b 100644 --- a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl @@ -67,7 +67,7 @@ init_per_suite(Config) -> _ = emqx_exhook_demo_svr:start(), load_cfg(?CONF_DEFAULT), emqx_mgmt_api_test_util:init_suite([emqx_exhook]), - [Conf] = emqx:get_config([exhook, servers]), + [Conf] = emqx:get_raw_config([exhook, servers]), [{template, Conf} | Config]. end_per_suite(Config) -> @@ -157,8 +157,8 @@ t_get(_) -> t_add(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{ - name => <<"test1">>, - url => "http://127.0.0.1:9001" + <<"name">> => <<"test1">>, + <<"url">> => "http://127.0.0.1:9001" }, {ok, Data} = request_api( post, @@ -186,8 +186,8 @@ t_add(Cfg) -> t_add_duplicate(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{ - name => <<"test1">>, - url => "http://127.0.0.1:9001" + <<"name">> => <<"test1">>, + <<"url">> => "http://127.0.0.1:9001" }, {error, _Reason} = request_api( @@ -203,8 +203,8 @@ t_add_duplicate(Cfg) -> t_add_with_bad_name(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{ - name => <<"🤔">>, - url => "http://127.0.0.1:9001" + <<"name">> => <<"🤔">>, + <<"url">> => "http://127.0.0.1:9001" }, {error, _Reason} = request_api( @@ -298,7 +298,7 @@ t_hooks(_Cfg) -> t_update(Cfg) -> Template = proplists:get_value(template, Cfg), - Instance = Template#{enable => false}, + Instance = Template#{<<"enable">> => false}, {ok, <<"{\"", _/binary>>} = request_api( put, api_path(["exhooks", "default"]), diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index 3fd279c76..7bc3a1d90 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -19,6 +19,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_ft_api.hrl"). %% Swagger specs from hocon schema -export([ @@ -61,7 +62,7 @@ schema("/file_transfer/files") -> #{ 'operationId' => '/file_transfer/files', get => #{ - tags => [<<"file_transfer">>], + tags => ?TAGS, summary => <<"List all uploaded files">>, description => ?DESC("file_list"), parameters => [ @@ -83,7 +84,7 @@ schema("/file_transfer/files/:clientid/:fileid") -> #{ 'operationId' => '/file_transfer/files/:clientid/:fileid', get => #{ - tags => [<<"file_transfer">>], + tags => ?TAGS, summary => <<"List files uploaded in a specific transfer">>, description => ?DESC("file_list_transfer"), parameters => [ diff --git a/apps/emqx_ft/src/emqx_ft_api.hrl b/apps/emqx_ft/src/emqx_ft_api.hrl new file mode 100644 index 000000000..ef38757f3 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_api.hrl @@ -0,0 +1,10 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(__EMQX_FT_API__). +-define(__EMQX_FT_API__, 42). + +-define(TAGS, [<<"File Transfer">>]). + +-endif. diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index c96df224c..0d9e86a49 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -96,7 +96,7 @@ handle_event( complete -> {next_state, start_assembling, NSt, ?internal([])}; {incomplete, _} -> - Nodes = mria_mnesia:running_nodes() -- [node()], + Nodes = emqx:running_nodes() -- [node()], {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}; % TODO: recovery? {error, _} = Error -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 589949bda..e37ba25af 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -361,7 +361,7 @@ list(_Options, Query) -> end. list(QueryIn) -> - {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())), + {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(emqx:running_nodes())), list_nodes(NodeQuery, Nodes, #{items => []}). list_nodes(Query, Nodes = [Node | Rest], Acc) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl index 40944c0e8..8a475afd2 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl @@ -21,6 +21,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("emqx_ft_api.hrl"). %% Swagger specs from hocon schema -export([ @@ -60,7 +61,7 @@ schema("/file_transfer/file") -> #{ 'operationId' => '/file_transfer/file', get => #{ - tags => [<<"file_transfer">>], + tags => ?TAGS, summary => <<"Download a particular file">>, description => ?DESC("file_get"), parameters => [ diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index c48c77d93..78100bab4 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -77,7 +77,7 @@ set_special_configs(Config) -> % complete transfers. Storage = emqx_utils_maps:deep_merge( emqx_ft_test_helpers:local_storage(Config), - #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}} + #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}} ), emqx_ft_test_helpers:load_config(#{ <<"enable">> => true, diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 18a8e9841..d2d65750f 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -24,6 +24,8 @@ -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]). +-define(SUITE_APPS, [emqx_conf, emqx_ft]). + all() -> [ {group, single}, @@ -49,10 +51,9 @@ end_per_suite(_Config) -> init_per_group(Group = cluster, Config) -> Cluster = mk_cluster_specs(Config), ct:pal("Starting ~p", [Cluster]), - Nodes = [ - emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) - || {Name, Opts} <- Cluster - ], + Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster], + InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end), + [] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok], [{group, Group}, {cluster_nodes, Nodes} | Config]; init_per_group(Group, Config) -> [{group, Group} | Config]. @@ -65,22 +66,29 @@ end_per_group(cluster, Config) -> end_per_group(_Group, _Config) -> ok. -mk_cluster_specs(Config) -> +mk_cluster_specs(_Config) -> Specs = [ {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}}, - {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}} - ], - CommOpts = [ - {env, [{emqx, boot_modules, [broker, listeners]}]}, - {apps, [emqx_ft]}, - {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, - {env_handler, emqx_ft_test_helpers:env_handler(Config)} + {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}}, + {replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}} ], + CommOpts = #{ + env => [ + {mria, db_backend, rlog}, + {emqx, boot_modules, [broker, listeners]} + ], + apps => [], + load_apps => ?SUITE_APPS, + conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]] + }, emqx_common_test_helpers:emqx_cluster( Specs, CommOpts ). +init_node(Config) -> + ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)). + init_per_testcase(Case, Config) -> [{tc, Case} | Config]. end_per_testcase(t_ft_disabled, _Config) -> @@ -96,7 +104,7 @@ t_list_files(Config) -> ClientId = client_id(Config), FileId = <<"f1">>, - Node = lists:last(cluster(Config)), + Node = lists:last(test_nodes(Config)), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), {ok, 200, #{<<"files">> := Files}} = @@ -124,7 +132,7 @@ t_download_transfer(Config) -> ClientId = client_id(Config), FileId = <<"f1">>, - Node = lists:last(cluster(Config)), + Node = lists:last(test_nodes(Config)), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), ?assertMatch( @@ -184,7 +192,7 @@ t_download_transfer(Config) -> t_list_files_paging(Config) -> ClientId = client_id(Config), NFiles = 20, - Nodes = cluster(Config), + Nodes = test_nodes(Config), Uploads = [ {mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)} || N <- lists:seq(1, NFiles) @@ -280,8 +288,13 @@ t_ft_disabled(_Config) -> %% Helpers %%-------------------------------------------------------------------- -cluster(Config) -> - [node() | proplists:get_value(cluster_nodes, Config, [])]. +test_nodes(Config) -> + case proplists:get_value(cluster_nodes, Config, []) of + [] -> + [node()]; + Nodes -> + Nodes + end. client_id(Config) -> iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])). diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index a041dcd50..1b952bdd7 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -36,7 +36,7 @@ start_additional_node(Config, Name) -> ). stop_additional_node(Node) -> - ok = rpc:call(Node, ekka, leave, []), + _ = rpc:call(Node, ekka, leave, []), ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), ok = emqx_common_test_helpers:stop_slave(Node), ok. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index d527e1e06..b43f4ba98 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -261,7 +261,7 @@ common_listener_opts() -> )}, {bind, sc( - hoconsc:union([ip_port(), integer()]), + ip_port(), #{desc => ?DESC(gateway_common_listener_bind)} )}, {max_connections, diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl index 7e1f6f49c..10583e41a 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl @@ -56,7 +56,7 @@ fields(exproto_grpc_server) -> [ {bind, sc( - hoconsc:union([ip_port(), integer()]), + ip_port(), #{ required => true, desc => ?DESC(exproto_grpc_server_bind) diff --git a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl index 9f388b07c..1779bf842 100644 --- a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -192,7 +192,7 @@ default_config(Overrides) -> " xml_dir = \"~s\"\n" " lifetime_min = 1s\n" " lifetime_max = 86400s\n" - " qmode_time_window = 22\n" + " qmode_time_window = 22s\n" " auto_observe = ~w\n" " mountpoint = \"lwm2m/${username}\"\n" " update_msg_publish_condition = contains_object_list\n" diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 5e59bd057..bdb9cf666 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -382,15 +382,17 @@ import_mnesia_tab(BackupDir, TabName, Opts) -> end. restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> - BackupNameToImport = MnesiaBackupFileName ++ "_for_import", - Prepared = + Validated = catch mnesia:traverse_backup( - MnesiaBackupFileName, BackupNameToImport, fun backup_converter/2, 0 + MnesiaBackupFileName, mnesia_backup, dummy, read_only, fun validate_mnesia_backup/2, 0 ), try - case Prepared of + case Validated of {ok, _} -> - Restored = mnesia:restore(BackupNameToImport, [{default_op, keep_tables}]), + %% As we use keep_tables option, we don't need to modify 'copies' (nodes) + %% in a backup file before restoring it, as `mnsia:restore/2` will ignore + %% backed-up schema and keep the current table schema unchanged + Restored = mnesia:restore(MnesiaBackupFileName, [{default_op, keep_tables}]), case Restored of {atomic, [TabName]} -> ok; @@ -416,30 +418,23 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> end after %% Cleanup files as soon as they are not needed any more for more efficient disk usage - _ = file:delete(BackupNameToImport), _ = file:delete(MnesiaBackupFileName) end. -backup_converter({schema, Tab, CreateList}, Acc) -> - check_rec_attributes(Tab, CreateList), - {[{schema, Tab, lists:map(fun convert_copies/1, CreateList)}], Acc}; -backup_converter(Other, Acc) -> - {[Other], Acc}. - -check_rec_attributes(Tab, CreateList) -> +%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. +%% Looks like there is no clean way to abort traversal without triggering any error reporting, +%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... +validate_mnesia_backup({schema, Tab, CreateList} = Schema, Acc) -> ImportAttributes = proplists:get_value(attributes, CreateList), Attributes = mnesia:table_info(Tab, attributes), case ImportAttributes =/= Attributes of true -> throw({error, different_table_schema}); false -> - ok - end. - -convert_copies({K, [_ | _]}) when K == ram_copies; K == disc_copies; K == disc_only_copies -> - {K, [node()]}; -convert_copies(Other) -> - Other. + {[Schema], Acc} + end; +validate_mnesia_backup(Other, Acc) -> + {[Other], Acc}. extract_backup(BackupFileName) -> BackupDir = root_backup_dir(), diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index c47cea07e..e86a76e1c 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -295,7 +295,6 @@ cluster(Specs) -> {env, Env}, {apps, [emqx_conf]}, {load_schema, false}, - {join_to, true}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, []), diff --git a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl index 4154752ab..8d98b974d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl @@ -159,7 +159,6 @@ cluster(Specs) -> {env, Env}, {apps, [emqx_conf, emqx_management]}, {load_schema, false}, - {join_to, true}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, []), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index 9df6d2138..0325ab030 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -444,7 +444,6 @@ cluster(Config) -> env => [{mria, db_backend, rlog}], load_schema => true, start_autocluster => true, - join_to => true, listener_ports => [], conf => [{[dashboard, listeners, http, bind], 0}], env_handler => diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 5eb8ca148..891e0a076 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -36,6 +36,7 @@ roots() -> array("rewrite", #{ desc => "List of topic rewrite rules.", importance => ?IMPORTANCE_HIDDEN, + validator => fun rewrite_validator/1, default => [] }), array("topic_metrics", #{ @@ -45,6 +46,37 @@ roots() -> }) ]. +rewrite_validator(Rules) -> + case + lists:foldl( + fun + (#{<<"action">> := subscribe}, Acc) -> + Acc; + (#{<<"dest_topic">> := DestTopic}, InvalidAcc) -> + try + true = emqx_topic:validate(name, DestTopic), + InvalidAcc + catch + _:_ -> + [DestTopic | InvalidAcc] + end + end, + [], + Rules + ) + of + [] -> + ok; + InvalidTopics -> + { + error, + #{ + msg => "cannot_use_wildcard_for_destination_topic", + invalid_topics => InvalidTopics + } + } + end. + fields("delayed") -> [ {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, diff --git a/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl index 528102d9e..6c65d351b 100644 --- a/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_api_SUITE.erl @@ -95,6 +95,52 @@ t_mqtt_topic_rewrite_limit(_) -> ) ). +t_mqtt_topic_rewrite_wildcard(_) -> + BadRules = [ + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/test/#">> + }, + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/#/test">> + }, + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/test/+">> + }, + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"^test/(.+)$">>, + <<"dest_topic">> => <<"bad/+/test">> + } + ], + + Rules = lists:flatten( + lists:map( + fun(Rule) -> + [Rule#{<<"action">> => <<"publish">>}, Rule#{<<"action">> => <<"all">>}] + end, + BadRules + ) + ), + lists:foreach( + fun(Rule) -> + ?assertMatch( + {ok, 500, _}, + request( + put, + uri(["mqtt", "topic_rewrite"]), + [Rule] + ) + ) + end, + Rules + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index abae139ad..7c486adb0 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -468,7 +468,7 @@ fields(rebalance_evacuation_start) -> )}, {"wait_takeover", mk( - pos_integer(), + emqx_schema:timeout_duration_s(), #{ desc => ?DESC(wait_takeover), required => false @@ -709,24 +709,24 @@ fields(global_status) -> rebalance_example() -> #{ - wait_health_check => 10, + wait_health_check => <<"10s">>, conn_evict_rate => 10, sess_evict_rate => 20, abs_conn_threshold => 10, rel_conn_threshold => 1.5, abs_sess_threshold => 10, rel_sess_threshold => 1.5, - wait_takeover => 10, + wait_takeover => <<"10s">>, nodes => [<<"othernode@127.0.0.1">>] }. rebalance_evacuation_example() -> #{ - wait_health_check => 10, + wait_health_check => <<"10s">>, conn_evict_rate => 100, sess_evict_rate => 100, redirect_to => <<"othernode:1883">>, - wait_takeover => 10, + wait_takeover => <<"10s">>, migrate_to => [<<"othernode@127.0.0.1">>] }. diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl index 119b4a5d9..bb691a754 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl @@ -67,7 +67,6 @@ t_start_evacuation_validation(Config) -> BadOpts = [ #{conn_evict_rate => <<"conn">>}, #{sess_evict_rate => <<"sess">>}, - #{redirect_to => 123}, #{wait_takeover => <<"wait">>}, #{wait_health_check => <<"wait">>}, #{migrate_to => []}, @@ -83,7 +82,8 @@ t_start_evacuation_validation(Config) -> api_post( ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"], Opts - ) + ), + Opts ) end, BadOpts @@ -103,8 +103,8 @@ t_start_evacuation_validation(Config) -> #{ conn_evict_rate => 10, sess_evict_rate => 10, - wait_takeover => 10, - wait_health_check => 10, + wait_takeover => <<"10s">>, + wait_health_check => <<"10s">>, redirect_to => <<"srv">>, migrate_to => [atom_to_binary(RecipientNode)] } @@ -166,8 +166,8 @@ t_start_rebalance_validation(Config) -> #{ conn_evict_rate => 10, sess_evict_rate => 10, - wait_takeover => 10, - wait_health_check => 10, + wait_takeover => <<"10s">>, + wait_health_check => <<"10s">>, abs_conn_threshold => 10, rel_conn_threshold => 1.001, abs_sess_threshold => 10, diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c1adb8ecd..2e4822a2f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) -> start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> - _ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), - ok; + wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)); {error, _Reason} = Error -> Error end. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index eb0a48b06..59687eb8d 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -88,7 +88,7 @@ resource_opts_meta() -> desc => ?DESC(<<"resource_opts">>) }. -worker_pool_size(type) -> non_neg_integer(); +worker_pool_size(type) -> range(1, 1024); worker_pool_size(desc) -> ?DESC("worker_pool_size"); worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 96e22c6b6..b95d8c8bf 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid} on_get_status(_InstId, #{health_check_error := true}) -> ?tp(connector_demo_health_check_error, #{}), disconnected; +on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) -> + ?tp(connector_demo_health_check_error, #{}), + {disconnected, State, Message}; on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5883614aa..934a97829 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -40,6 +40,7 @@ groups() -> init_per_testcase(_, Config) -> ct:timetrap({seconds, 30}), emqx_connector_demo:set_callback_mode(always_sync), + snabbkaffe:start_trace(), Config. end_per_testcase(_, _Config) -> @@ -1145,10 +1146,33 @@ t_auto_retry(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} + #{health_check_interval => 100} ), ?assertEqual(ok, Res). +%% tests resources that have an asynchronous start: they are created +%% without problems, but later some issue is found when calling the +%% health check. +t_start_throw_error(_Config) -> + Message = "something went wrong", + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, health_check_error => {msg, Message}}, + #{health_check_interval => 100} + ), + #{?snk_kind := connector_demo_health_check_error}, + 1_000 + ) + ), + %% Now, if we try to "reconnect" (restart) it, we should get the error + ?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})), + ok. + t_health_check_disconnected(_) -> ?check_trace( begin @@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} + #{health_check_interval => 100} ), ?assertEqual( {ok, disconnected}, diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl index 219861a4e..0935ee1c6 100644 --- a/apps/emqx_resource/test/emqx_resource_schema_tests.erl +++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl @@ -74,6 +74,44 @@ health_check_interval_validator_test_() -> ) ]. +worker_pool_size_test_() -> + BaseConf = parse(webhook_bridge_health_check_hocon(<<"15s">>)), + Check = fun(WorkerPoolSize) -> + Conf = emqx_utils_maps:deep_put( + [ + <<"bridges">>, + <<"webhook">>, + <<"simple">>, + <<"resource_opts">>, + <<"worker_pool_size">> + ], + BaseConf, + WorkerPoolSize + ), + #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf), + #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, + WPS + end, + AssertThrow = fun(WorkerPoolSize) -> + ?assertThrow( + {_, [ + #{ + kind := validation_error, + reason := #{expected_type := _}, + value := WorkerPoolSize + } + ]}, + Check(WorkerPoolSize) + ) + end, + [ + ?_assertEqual(1, Check(1)), + ?_assertEqual(100, Check(100)), + ?_assertEqual(1024, Check(1024)), + ?_test(AssertThrow(0)), + ?_test(AssertThrow(1025)) + ]. + %%=========================================================================== %% Helper functions %%=========================================================================== diff --git a/apps/emqx_retainer/rebar.config b/apps/emqx_retainer/rebar.config index a178e10a1..ab4b8ed37 100644 --- a/apps/emqx_retainer/rebar.config +++ b/apps/emqx_retainer/rebar.config @@ -30,7 +30,7 @@ {profiles, [ {test, [ {deps, [ - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} ]} ]} ]}. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index bce62aa25..c925e925d 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -483,26 +483,25 @@ t_clear_expired(_) -> with_conf(ConfMod, Case). t_max_payload_size(_) -> - ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end, + ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end, Case = fun() -> emqx_retainer:clean(), timer:sleep(500), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - + Payload = iolist_to_binary(lists:duplicate(1024, <<"0">>)), emqtt:publish( C1, <<"retained/1">>, #{}, - <<"1234">>, + Payload, [{qos, 0}, {retain, true}] ), - emqtt:publish( C1, <<"retained/2">>, #{}, - <<"1234567">>, + <<"1", Payload/binary>>, [{qos, 0}, {retain, true}] ), diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index e3fef7e62..b2a6a549e 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -70,7 +70,8 @@ Op =:= '-' orelse Op =:= '*' orelse Op =:= '/' orelse - Op =:= 'div') + Op =:= 'div' orelse + Op =:= 'mod') ). %% Compare operators diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 2ec32173f..c9feda601 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -66,6 +66,7 @@ groups() -> t_sqlselect_with_3rd_party_impl2, t_sqlselect_with_3rd_party_funcs_unknown, t_sqlselect_001, + t_sqlselect_002, t_sqlselect_inject_props, t_sqlselect_01, t_sqlselect_02, @@ -1089,6 +1090,36 @@ t_sqlselect_001(_Config) -> ) ). +t_sqlselect_002(_Config) -> + %% Verify that the div and mod can be used both as infix operations and as + %% function calls + Sql = + "" + "select 2 mod 2 as mod1,\n" + " mod(3, 2) as mod2,\n" + " 4 div 2 as div1,\n" + " div(7, 2) as div2\n" + " from \"t/#\" " + "", + ?assertMatch( + {ok, #{ + <<"mod1">> := 0, + <<"mod2">> := 1, + <<"div1">> := 2, + <<"div2">> := 3 + }}, + emqx_rule_sqltester:test( + #{ + sql => Sql, + context => + #{ + payload => #{<<"what">> => 4}, + topic => <<"t/a">> + } + } + ) + ). + t_sqlselect_inject_props(_Config) -> SQL = "SELECT json_decode(payload) as p, payload, " diff --git a/apps/emqx_s3/rebar.config b/apps/emqx_s3/rebar.config index b1483e028..65f740aa3 100644 --- a/apps/emqx_s3/rebar.config +++ b/apps/emqx_s3/rebar.config @@ -1,6 +1,6 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, - {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}} + {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}} ]}. {project_plugins, [erlfmt]}. diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index 3c7753857..ad887d1a6 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -84,7 +84,7 @@ t_full_config(_Config) -> <<"min_part_size">> => <<"10mb">>, <<"acl">> => <<"public_read">>, <<"transport_options">> => #{ - <<"connect_timeout">> => 30000, + <<"connect_timeout">> => <<"30s">>, <<"enable_pipelining">> => 200, <<"pool_size">> => 10, <<"pool_type">> => <<"random">>, diff --git a/apps/emqx_s3/test/emqx_s3_test_helpers.erl b/apps/emqx_s3/test/emqx_s3_test_helpers.erl index a73f618af..26740c18b 100644 --- a/apps/emqx_s3/test/emqx_s3_test_helpers.erl +++ b/apps/emqx_s3/test/emqx_s3_test_helpers.erl @@ -64,10 +64,10 @@ base_raw_config(tcp) -> <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), <<"host">> => ?TCP_HOST, <<"port">> => ?TCP_PORT, - <<"max_part_size">> => 10 * 1024 * 1024, + <<"max_part_size">> => <<"10MB">>, <<"transport_options">> => #{ - <<"request_timeout">> => 2000 + <<"request_timeout">> => <<"2s">> } }; base_raw_config(tls) -> @@ -77,10 +77,10 @@ base_raw_config(tls) -> <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), <<"host">> => ?TLS_HOST, <<"port">> => ?TLS_PORT, - <<"max_part_size">> => 10 * 1024 * 1024, + <<"max_part_size">> => <<"10MB">>, <<"transport_options">> => #{ - <<"request_timeout">> => 2000, + <<"request_timeout">> => <<"2s">>, <<"ssl">> => #{ <<"enable">> => true, <<"cacertfile">> => bin(cert_path("ca.crt")), diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 311bcf62e..b4179caff 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -139,7 +139,7 @@ settings(get, _) -> {200, emqx:get_raw_config([slow_subs], #{})}; settings(put, #{body := Body}) -> case emqx_slow_subs:update_settings(Body) of - {ok, #{config := NewConf}} -> + {ok, #{raw_config := NewConf}} -> {200, NewConf}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])), diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index 5196868c7..af9b7550f 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -41,7 +41,7 @@ "{\n" " enable = true\n" " top_k_num = 5,\n" - " expire_interval = 60000\n" + " expire_interval = 60s\n" " stats_type = whole\n" "}" "" @@ -137,36 +137,33 @@ t_clear(_) -> ?assertEqual(0, ets:info(?TOPK_TAB, size)). t_settting(_) -> - Conf = emqx:get_config([slow_subs]), - Conf2 = Conf#{stats_type => internal}, + RawConf = emqx:get_raw_config([slow_subs]), + RawConf2 = RawConf#{<<"stats_type">> => <<"internal">>}, {ok, Data} = request_api( put, api_path(["slow_subscriptions", "settings"]), [], auth_header_(), - Conf2 + RawConf2 ), Return = decode_json(Data), + Expect = emqx_config:fill_defaults(RawConf2), - ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), + ?assertEqual(Expect, Return), + timer:sleep(800), {ok, GetData} = request_api( get, api_path(["slow_subscriptions", "settings"]), [], auth_header_() ), - - timer:sleep(1000), - GetReturn = decode_json(GetData), - - ?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn). + ?assertEqual(Expect, GetReturn). decode_json(Data) -> - BinJosn = emqx_utils_json:decode(Data, [return_maps]), - emqx_utils_maps:unsafe_atom_key_map(BinJosn). + emqx_utils_json:decode(Data, [return_maps]). request_api(Method, Url, Auth) -> request_api(Method, Url, [], Auth, []). diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 6cf85fb5d..830845b60 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -20,6 +20,8 @@ %% [TODO] Cleanup so the instruction below is not necessary. -elvis([{elvis_style, god_modules, disable}]). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([ merge_opts/2, maybe_apply/2, @@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity). -spec nolink_apply(function(), timer:timeout()) -> term(). nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> Caller = self(), - ResRef = make_ref(), + ResRef = alias([reply]), Middleman = erlang:spawn( fun() -> process_flag(trap_exit, true), @@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> C:E:S -> {exception, {C, E, S}} end, - _ = erlang:send(Caller, {ResRef, Res}), + _ = erlang:send(ResRef, {ResRef, Res}), + ?tp(pmap_middleman_sent_response, #{}), exit(normal) end ), @@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> exit(normal); {'EXIT', Worker, Reason} -> %% worker exited with some reason other than 'normal' - _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}), exit(normal) end end @@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> {ResRef, {'EXIT', Reason}} -> exit(Reason) after Timeout -> + %% possible race condition: a message was received just as we enter the after + %% block. + ?tp(pmap_timeout, #{}), + unalias(ResRef), exit(Middleman, kill), - exit(timeout) + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after 0 -> + exit(timeout) + end end. safe_to_existing_atom(In) -> diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 6c6bcf8d3..12e99c917 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SOCKOPTS, [ binary, @@ -208,3 +209,34 @@ t_pmap_exception(_) -> [{2, 3}, {3, 4}, error] ) ). + +t_pmap_late_reply(_) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pmap_middleman_sent_response}, + #{?snk_kind := pmap_timeout} + ), + Timeout = 100, + Res = + catch emqx_utils:pmap( + fun(_) -> + process_flag(trap_exit, true), + timer:sleep(3 * Timeout), + done + end, + [1, 2, 3], + Timeout + ), + receive + {Ref, LateReply} when is_reference(Ref) -> + ct:fail("should not receive late reply: ~p", [LateReply]) + after (5 * Timeout) -> + ok + end, + ?assertMatch([done, done, done], Res), + ok + end, + [] + ), + ok. diff --git a/changes/ce/fix-11004.en.md b/changes/ce/fix-11004.en.md new file mode 100644 index 000000000..3c6b580d7 --- /dev/null +++ b/changes/ce/fix-11004.en.md @@ -0,0 +1 @@ +Do not allow wildcards for destination topic in rewrite rules. diff --git a/changes/ce/fix-11026.en.md b/changes/ce/fix-11026.en.md new file mode 100644 index 000000000..d07157b5f --- /dev/null +++ b/changes/ce/fix-11026.en.md @@ -0,0 +1 @@ +Addressed an inconsistency in the usage of 'div' and 'mod' operations within the rule engine. Previously, the 'div' operation was only usable as an infix operation and 'mod' could only be applied through a function call. With this change, both 'div' and 'mod' can be used via function call syntax and infix syntax. diff --git a/changes/ce/fix-11037.en.md b/changes/ce/fix-11037.en.md new file mode 100644 index 000000000..39b2dc4a6 --- /dev/null +++ b/changes/ce/fix-11037.en.md @@ -0,0 +1 @@ +When starting an HTTP connector EMQX now returns a descriptive error in case the system is unable to connect to the remote target system. diff --git a/changes/ce/fix-11074.en.md b/changes/ce/fix-11074.en.md new file mode 100644 index 000000000..fa557b3a1 --- /dev/null +++ b/changes/ce/fix-11074.en.md @@ -0,0 +1 @@ +Fix to adhere to Protocol spec MQTT-5.0 [MQTT-3.8.3-4]. diff --git a/changes/ce/fix-11094.en.md b/changes/ce/fix-11094.en.md new file mode 100644 index 000000000..e73a8635f --- /dev/null +++ b/changes/ce/fix-11094.en.md @@ -0,0 +1 @@ +Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge. diff --git a/changes/ce/fix-11103.en.md b/changes/ce/fix-11103.en.md new file mode 100644 index 000000000..794da067a --- /dev/null +++ b/changes/ce/fix-11103.en.md @@ -0,0 +1 @@ +Updated `erlcloud` dependency. diff --git a/changes/ce/fix-11106.en.md b/changes/ce/fix-11106.en.md new file mode 100644 index 000000000..2fa3053fa --- /dev/null +++ b/changes/ce/fix-11106.en.md @@ -0,0 +1,3 @@ +Added a validation for the maximum number of pool workers of a bridge. + +Now the maximum amount is 1024 to avoid large memory consumption from an unreasonable number of workers. diff --git a/changes/ce/perf-11020.en.md b/changes/ce/perf-11020.en.md new file mode 100644 index 000000000..2df22aebe --- /dev/null +++ b/changes/ce/perf-11020.en.md @@ -0,0 +1 @@ +Upgraded emqtt dependency to avoid sensitive data leakage in the debug log. diff --git a/deploy/packages/rpm/emqx.spec b/deploy/packages/rpm/emqx.spec index b2b58ac23..4839d1a7d 100644 --- a/deploy/packages/rpm/emqx.spec +++ b/deploy/packages/rpm/emqx.spec @@ -23,8 +23,12 @@ AutoReq: 0 %if "%{_arch} %{?rhel}" == "x86_64 7" Requires: openssl11 libatomic procps which findutils %else +%if "%{?dist}" == ".amzn2023" +Requires: libatomic procps which findutils ncurses util-linux shadow-utils +%else Requires: libatomic procps which findutils %endif +%endif %description EMQX, a distributed, massively scalable, highly extensible MQTT message broker. diff --git a/dev b/dev index 6a20c4a2d..897aed5de 100755 --- a/dev +++ b/dev @@ -158,7 +158,7 @@ export EMQX_LOG_DIR="$BASE_DIR/log" CONFIGS_DIR="$EMQX_DATA_DIR/configs" # Use your cookie so your IDE can connect to it. COOKIE="${EMQX_NODE__COOKIE:-${EMQX_NODE_COOKIE:-$(cat ~/.erlang.cookie || echo 'emqxsecretcookie')}}" -mkdir -p "$EMQX_ETC_DIR" "$EMQX_DATA_DIR/patches" "$EMQX_LOG_DIR" "$CONFIGS_DIR" +mkdir -p "$EMQX_ETC_DIR" "$EMQX_DATA_DIR/patches" "$EMQX_DATA_DIR/certs" "$EMQX_LOG_DIR" "$CONFIGS_DIR" if [ $EKKA_EPMD -eq 1 ]; then EPMD_ARGS='-start_epmd false -epmd_module ekka_epmd' else @@ -290,7 +290,7 @@ append_args_file() { +IOt 4 +SDio 8 -shutdown_time 30000 --pa '"$EMQX_DATA_DIR/patches"' +-pa '$EMQX_DATA_DIR/patches' -mnesia dump_log_write_threshold 5000 -mnesia dump_log_time_threshold 60000 -os_mon start_disksup false diff --git a/mix.exs b/mix.exs index 5f60a2e34..ca17ccd38 100644 --- a/mix.exs +++ b/mix.exs @@ -64,15 +64,15 @@ defmodule EMQXUmbrella.MixProject do {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer {:emqtt, - github: "emqx/emqtt", tag: "1.8.5", override: true, system_env: maybe_no_quic_env()}, - {:rulesql, github: "emqx/rulesql", tag: "0.1.6"}, + github: "emqx/emqtt", tag: "1.8.6", override: true, system_env: maybe_no_quic_env()}, + {:rulesql, github: "emqx/rulesql", tag: "0.1.7"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, {:telemetry, "1.1.0"}, # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.8", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.9", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, @@ -216,14 +216,7 @@ defmodule EMQXUmbrella.MixProject do github: "emqx/rabbitmq-server", tag: "v3.11.13-emqx", sparse: "deps/amqp_client", - override: true}, - {:erlcloud, github: "emqx/erlcloud", tag: "3.6.8-emqx-1", override: true}, - # erlcloud's rebar.config requires rebar3 and does not support Mix, - # so it tries to fetch deps from git. We need to override this. - {:lhttpc, github: "erlcloud/lhttpc", tag: "1.6.2", override: true}, - {:eini, "1.2.9", override: true}, - {:base16, "1.0.0", override: true} - # end of erlcloud's deps + override: true} ] end diff --git a/rebar.config b/rebar.config index 9eca8dfec..20a9834f0 100644 --- a/rebar.config +++ b/rebar.config @@ -69,13 +69,13 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} - , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.6"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}} + , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.7"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}