From cf9d6943d52a71a7d8beaf8bb414cb6eeb7ae30b Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 11 Jun 2024 03:06:25 +0800 Subject: [PATCH 01/23] fix: check willretain and willqos when WillFlag set to `true` --- apps/emqx/src/emqx_frame.erl | 19 +++++++++++++++---- apps/emqx/src/emqx_schema.erl | 1 + changes/ce/fix-13222.en.md | 5 +++++ 3 files changed, 21 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-13222.en.md diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 0b02ad1f5..4b4a2d5cf 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -287,14 +287,25 @@ parse_connect(FrameBin, StrictMode) -> % Note: return malformed if reserved flag is not 0. parse_connect2( ProtoName, - <>, + <>, StrictMode ) -> case Reserved of 0 -> ok; 1 -> ?PARSE_ERR(reserved_connect_flag) end, + WillFlag = bool(WillFlagB), + WillRetain = bool(WillRetainB), + case WillFlag of + %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] + false when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos); + %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] + true when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos); + %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] + false when WillRetain -> ?PARSE_ERR(invalid_will_retain); + _ -> ok + end, {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid), ConnPacket = #mqtt_packet_connect{ @@ -304,9 +315,9 @@ parse_connect2( %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html is_bridge = (BridgeTag =:= 8), clean_start = bool(CleanStart), - will_flag = bool(WillFlag), + will_flag = WillFlag, will_qos = WillQoS, - will_retain = bool(WillRetain), + will_retain = WillRetain, keepalive = KeepAlive, properties = Properties, clientid = ClientId diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 63b77f8d2..9b96ad20a 
100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3491,6 +3491,7 @@ mqtt_general() -> )}, {"max_clientid_len", sc( + %% MQTT-v3.1.1-[MQTT-3.1.3-5], MQTT-v5.0-[MQTT-3.1.3-5] range(23, 65535), #{ default => 65535, diff --git a/changes/ce/fix-13222.en.md b/changes/ce/fix-13222.en.md new file mode 100644 index 000000000..0fc7a40ac --- /dev/null +++ b/changes/ce/fix-13222.en.md @@ -0,0 +1,5 @@ +Fix the flags check and error handling related to the Will message in the `CONNECT` packet. +See also: +- MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] +- MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] +- MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] From 675abd7512493abb795c784a074737d93ed5d977 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 11 Jun 2024 03:26:08 +0800 Subject: [PATCH 02/23] test: will retain and willqos in connect flags --- apps/emqx/test/emqx_frame_SUITE.erl | 55 ++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 8193f9c31..2457c3faf 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -64,7 +64,10 @@ groups() -> t_malformed_connect_header, t_malformed_connect_data, t_reserved_connect_flag, - t_invalid_clientid + t_invalid_clientid, + t_undefined_password, + t_invalid_will_retain, + t_invalid_will_qos ]}, {connack, [parallel], [ t_serialize_parse_connack, @@ -738,6 +741,56 @@ t_undefined_password(_) -> ), ok. +t_invalid_will_retain(_) -> + ConnectFlags = <<2#01100000>>, + ConnectBin = + <<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55, + 122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84, + 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>, + ?assertException( + throw, + {frame_parse_error, invalid_will_retain}, + emqx_frame:parse(ConnectBin) + ), + ok. 
+ +t_invalid_will_qos(_) -> + Will_F_WillQoS0 = <<2#010:3, 2#00:2, 2#000:3>>, + Will_F_WillQoS1 = <<2#010:3, 2#01:2, 2#000:3>>, + Will_F_WillQoS2 = <<2#010:3, 2#10:2, 2#000:3>>, + Will_F_WillQoS3 = <<2#010:3, 2#11:2, 2#000:3>>, + Will_T_WillQoS3 = <<2#011:3, 2#11:2, 2#000:3>>, + ConnectBinFun = fun(ConnectFlags) -> + <<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55, + 122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84, + 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>> + end, + ?assertMatch( + {ok, _, _, _}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS0)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3)) + ), + ok. + parse_serialize(Packet) -> parse_serialize(Packet, #{strict_mode => true}). From ac8762be1094a3f88da81bbdb6f6e81d9c5d6f0e Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 20 Jun 2024 12:40:26 +0300 Subject: [PATCH 03/23] chore: upgrade ekka to 0.19.5 (mria 0.8.8) mria 0.8.8 heals a network partition once majority of core nodes are alive. Previously, the autoheal worked only when all core nodes were reachable. 
Fixes: EMQX-10974 --- apps/emqx/rebar.config | 2 +- changes/ce/fix-13307.en.md | 7 +++++++ mix.exs | 2 +- rebar.config | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-13307.en.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 27648a88d..f3089d11f 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, diff --git a/changes/ce/fix-13307.en.md b/changes/ce/fix-13307.en.md new file mode 100644 index 000000000..d15732586 --- /dev/null +++ b/changes/ce/fix-13307.en.md @@ -0,0 +1,7 @@ +Upgrade ekka lib to 0.19.5 + +ekka 0.19.5 uses mria 0.8.8 that improves auto-heal functionality. +Previously, the auto-heal worked only when all core nodes were reachable again. +This update allows to apply auto-heal once the majority of core nodes are alive. 
+ +[Mria PR](https://github.com/emqx/mria/pull/180) diff --git a/mix.exs b/mix.exs index 6ee0c73e5..2cc48d979 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.3", override: true}, diff --git a/rebar.config b/rebar.config index 346014c17..2be8656a6 100644 --- a/rebar.config +++ b/rebar.config @@ -83,7 +83,7 @@ {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}}, From 75a524c916744c01fba7e03d08477608c1484cd4 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 21 Jun 2024 18:04:24 +0800 Subject: [PATCH 04/23] test: add more debug msg to flaky cluster_rpc SUITE --- .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index b054988be..cfdc5820e 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl 
@@ -74,13 +74,14 @@ end_per_testcase(_Config) -> t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), - MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]}, {ok, TnxId, ok} = multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), ?assertEqual({ok, 2, ok}, multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), case length(Status) =:= 3 of @@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), - {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]}, {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {ok, _, ok} = multicall(M, F, A, 1, 1000), @@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> t_commit_concurrency(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), - {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, - {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), - ?assertEqual(ok, receive_msg(3, test)), + Msg = ?FUNCTION_NAME, + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]}, + ?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)), + ?assertEqual(ok, receive_msg(3, Msg)), %% call concurrently without stale tnx_id error Workers = lists:seq(1, 256), @@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, [_Status | L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ets:insert(test, {other_mfa_result, ok}), - 
{ok, 2, ok} = multicall(io, format, ["test"], 1, 1000), + {ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), - MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]}, {ok, TnxId, ok} = multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), ok. t_del_stale_mfa(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - MFA = {M, F, A} = {io, format, ["test"]}, + MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]}, Keys = lists:seq(1, 50), Keys2 = lists:seq(51, 150), Ids = @@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) -> t_skip_failed_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) -> t_fast_forward_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -356,7 +360,11 @@ tnx_ids(Status) -> start() -> {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + ok = emqx_cluster_rpc:wait_for_cluster_rpc(), ok = emqx_cluster_rpc:reset(), + %% Ensure all processes are idle status. 
+ ok = gen_server:call(?NODE2, test), + ok = gen_server:call(?NODE3, test), ok. stop() -> @@ -366,6 +374,7 @@ stop() -> undefined -> ok; P -> + erlang:unregister(N), erlang:unlink(P), erlang:exit(P, kill) end @@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 -> receive Msg -> receive_msg(Count - 1, Msg) - after 1000 -> - timeout + after 1300 -> + Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])), + {Msg, flush_msg([])} end. echo(Pid, Msg) -> @@ -425,3 +435,11 @@ multicall(M, F, A, N, T) -> multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). + +flush_msg(Acc) -> + receive + Msg -> + flush_msg([Msg | Acc]) + after 10 -> + Acc + end. From c9ec5ac87b4c2487ebea3732942615a1d9df382f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 21 Jun 2024 14:07:56 -0300 Subject: [PATCH 05/23] test(gcp consumer): stabilize flaky test https://github.com/emqx/emqx/actions/runs/9614788348/job/26526973635?pr=13317#step:5:1463 ``` %%% emqx_bridge_gcp_pubsub_consumer_SUITE ==> t_connection_down_before_starting: FAILED %%% emqx_bridge_gcp_pubsub_consumer_SUITE ==> {{panic, #{msg => "Unexpected result", result => {run_stage_failed,error, {assertMatch, [{module,emqx_bridge_gcp_pubsub_consumer_SUITE}, {line,1451}, {expression,"health_check ( Config )"}, {pattern,"{ ok , connecting }"}, {value,{ok,disconnected}}]}, [{emqx_bridge_gcp_pubsub_consumer_SUITE, '-t_connection_down_before_starting/1-fun-11-',4, [{file, "/emqx/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl"}, {line,1451}]}, {emqx_bridge_gcp_pubsub_consumer_SUITE, t_connection_down_before_starting,1, [{file, "/emqx/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl"}, {line,1427}]}]}}}, [{emqx_bridge_gcp_pubsub_consumer_SUITE,t_connection_down_before_starting,1, [{file, "/emqx/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl"}, {line,1462}]}, 
{test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]}, {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1302}]}, {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1234}]}]} ``` --- .../test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index c96eeeccf..9450d02f0 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -1448,7 +1448,10 @@ t_connection_down_before_starting(Config) -> ), {ok, _} = create_bridge(Config), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, connecting}, health_check(Config)), + ?assertMatch( + {ok, Status} when Status =:= connecting orelse Status =:= disconnected, + health_check(Config) + ), emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), ?retry( From 1a497bcaf2b4aaf476c6d1a5eddb9f99c028fa60 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 24 Jun 2024 17:17:52 -0300 Subject: [PATCH 06/23] fix(greptime): correctly define grpc options for `grpcbox_channel` Will probably fix CI flakiness. 
--- .../emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src | 2 +- .../src/emqx_bridge_greptimedb_connector.erl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index badddb20f..8c3223e8b 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 1cd808e46..ee256794c 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -359,7 +359,7 @@ do_start_client( {error, Reason} end. -grpc_config() -> +grpc_opts() -> #{ sync_start => true, connect_timeout => ?CONNECT_TIMEOUT @@ -378,7 +378,7 @@ client_config( {pool, InstId}, {pool_type, random}, {auto_reconnect, ?AUTO_RECONNECT_S}, - {gprc_options, grpc_config()} + {grpc_opts, grpc_opts()} ] ++ protocol_config(Config). protocol_config( From 98e4ea6fde9bd9cbed41079a83de68f7df94e291 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 13:49:25 +0200 Subject: [PATCH 07/23] feat(bridge-s3): make validation errors more readable And also turn them into schema-level validations, instead of bridge-level error conditions. 
--- .../src/emqx_bridge_s3_connector.erl | 19 ++----- .../src/emqx_bridge_s3_upload.erl | 49 ++++++++++++------- .../emqx_bridge_s3_aggreg_upload_SUITE.erl | 21 ++++++++ apps/emqx_utils/src/emqx_utils.erl | 4 ++ 4 files changed, 60 insertions(+), 33 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 00c03fd3a..c9e23a934 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -162,13 +162,8 @@ on_get_status(_InstId, State = #{client_config := Config}) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - try - ChannelState = start_channel(State, Config), - {ok, State#{channels => Channels#{ChannelId => ChannelState}}} - catch - throw:Reason -> - {error, Reason} - end. + ChannelState = start_channel(State, Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. @@ -217,7 +212,8 @@ start_channel(State, #{ max_records := MaxRecords }, container := Container, - bucket := Bucket + bucket := Bucket, + key := Key } }) -> AggregId = {Type, Name}, @@ -226,7 +222,7 @@ start_channel(State, #{ max_records => MaxRecords, work_dir => work_dir(Type, Name) }, - Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)), + Template = emqx_bridge_s3_upload:mk_key_template(Key), DeliveryOpts = #{ bucket => Bucket, key => Template, @@ -253,11 +249,6 @@ start_channel(State, #{ on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. -ensure_ok({ok, V}) -> - V; -ensure_ok({error, Reason}) -> - throw(Reason). - upload_options(Parameters) -> #{acl => maps:get(acl, Parameters, undefined)}. 
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 2bf12f24b..bedefc7c5 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -29,7 +29,10 @@ ]). %% Internal exports --export([convert_actions/2]). +-export([ + convert_actions/2, + validate_key_template/1 +]). -define(DEFAULT_AGGREG_BATCH_SIZE, 100). -define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>). @@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) -> )} ], emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [ - {key, #{desc => ?DESC(s3_aggregated_upload_key)}} + {key, #{ + desc => ?DESC(s3_aggregated_upload_key), + validator => fun ?MODULE:validate_key_template/1 + }} ]), emqx_s3_schema:fields(s3_uploader) ]); @@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou Conf#{<<"resource_opts">> := NResourceOpts} end. -%% Interpreting options - --spec mk_key_template(_Parameters :: map()) -> - {ok, emqx_template:str()} | {error, _Reason}. -mk_key_template(#{key := Key}) -> - Template = emqx_template:parse(Key), +validate_key_template(Conf) -> + Template = emqx_template:parse(Conf), case validate_bindings(emqx_template:placeholders(Template)) of - UsedBindings when is_list(UsedBindings) -> - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - {ok, Template}; - false -> - {ok, Template ++ SuffixTemplate} - end; - Error = {error, _} -> - Error + Bindings when is_list(Bindings) -> + ok; + {error, {disallowed_placeholders, Disallowed}} -> + {error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])} end. 
validate_bindings(Bindings) -> @@ -276,7 +272,22 @@ validate_bindings(Bindings) -> [] -> Bindings; Disallowed -> - {error, {invalid_key_template, {disallowed_placeholders, Disallowed}}} + {error, {disallowed_placeholders, Disallowed}} + end. + +%% Interpreting options + +-spec mk_key_template(unicode:chardata()) -> + emqx_template:str(). +mk_key_template(Key) -> + Template = emqx_template:parse(Key), + UsedBindings = emqx_template:placeholders(Template), + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + Template; + false -> + Template ++ SuffixTemplate end. mk_suffix_template(UsedBindings) -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index b7c17bbaa..345c2e9aa 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -177,6 +177,27 @@ t_create_invalid_config(Config) -> ) ). +t_create_invalid_config_key_template(Config) -> + ?assertMatch( + {error, + {_Status, _, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>, + <<"path">> := <<"root.parameters.key">> + } + }}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + _Overrides = #{ + <<"parameters">> => #{ + <<"key">> => <<"${action}/${foo}:${bar.rfc3339}">> + } + } + ) + ). 
+ t_update_invalid_config(Config) -> ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), ?assertMatch( diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 536b427b3..e4f0d91d1 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -65,6 +65,7 @@ flattermap/2, tcp_keepalive_opts/4, format/1, + format/2, call_first_defined/1, ntoa/1, foldl_while/3, @@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> format(Term) -> iolist_to_binary(io_lib:format("~0p", [Term])). +format(Fmt, Args) -> + unicode:characters_to_binary(io_lib:format(Fmt, Args)). + -spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return(). call_first_defined([{Module, Function, Args} | Rest]) -> try From f3ffbd47106b585c3b34b603c5f95fad44752928 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 14:45:51 +0200 Subject: [PATCH 08/23] feat(bridge-s3): provide more meaningful error details in status --- .../src/emqx_bridge_s3_connector.erl | 68 +++++++++++++++---- .../test/emqx_bridge_s3_SUITE.erl | 7 ++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index c9e23a934..204a84a65 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -146,16 +146,14 @@ on_stop(InstId, _State = #{pool_name := PoolName}) -> on_get_status(_InstId, State = #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - {?status_disconnected, State, Reason}; + {?status_disconnected, State, map_error_details(Reason)}; AWSConfig -> try erlcloud_s3:list_buckets(AWSConfig) of Props when is_list(Props) -> ?status_connected catch - error:{aws_error, {http_error, _Code, _, Reason}} -> - {?status_disconnected, State, Reason}; - error:{aws_error, 
{socket_error, Reason}} -> - {?status_disconnected, State, Reason} + error:Error -> + {?status_disconnected, State, map_error_details(Error)} end end. @@ -284,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> catch error:{aws_error, {http_error, 404, _, _Reason}} -> throw({unhealthy_target, "Bucket does not exist"}); - error:{aws_error, {socket_error, Reason}} -> - throw({unhealthy_target, emqx_utils:format(Reason)}) + error:Error -> + throw({unhealthy_target, map_error_details(Error)}) end end. @@ -378,13 +376,31 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> {error, {unrecoverable_error, Reason}} end. -map_error({socket_error, _} = Reason) -> - {recoverable_error, Reason}; -map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> +map_error(Error) -> + {map_error_class(Error), map_error_details(Error)}. + +map_error_class({s3_error, _, _}) -> + unrecoverable_error; +map_error_class({aws_error, Error}) -> + map_error_class(Error); +map_error_class({socket_error, _}) -> + recoverable_error; +map_error_class({http_error, Status, _, _}) when Status >= 500 -> %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList - {recoverable_error, Reason}; -map_error(Reason) -> - {unrecoverable_error, Reason}. + recoverable_error; +map_error_class(_Error) -> + unrecoverable_error. + +map_error_details({s3_error, Code, Message}) -> + emqx_utils:format("S3 error: ~s ~s", [Code, Message]); +map_error_details({aws_error, Error}) -> + map_error_details(Error); +map_error_details({socket_error, Reason}) -> + emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]); +map_error_details({http_error, _, _, _} = Error) -> + emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]); +map_error_details(Error) -> + Error. 
render_bucket(Template, Data) -> case emqx_template:render(Template, {emqx_jsonish, Data}) of @@ -407,6 +423,32 @@ render_content(Template, Data) -> iolist_to_string(IOList) -> unicode:characters_to_list(IOList). +%% + +-include_lib("xmerl/include/xmerl.hrl"). + +-spec map_aws_error_details(_AWSError) -> + unicode:chardata(). +map_aws_error_details({http_error, _Status, _, Body}) -> + try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of + {Error = #xmlElement{name = 'Error'}, _} -> + map_aws_error_details(Error); + _ -> + Body + catch + exit:_ -> + Body + end; +map_aws_error_details(#xmlElement{content = Content}) -> + Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)), + Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)), + [Code, $:, $\s | Message]. + +extract_xml_text(#xmlElement{content = Content}) -> + [Fragment || #xmlText{value = Fragment} <- Content]; +extract_xml_text(false) -> + []. + %% `emqx_connector_aggreg_delivery` APIs -spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t(). 
diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index f8eaa1b3a..4771f9d04 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -159,6 +159,13 @@ t_start_broken_update_restart(Config) -> _Attempts = 20, ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId)) ), + ?assertMatch( + {ok, + {{_HTTP, 200, _}, _, #{ + <<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:get_connector_api(Type, Name) + ), ?assertMatch( {ok, {{_HTTP, 200, _}, _, _}}, emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf) From fb9afd8313915fab78ce89a89889de98d888006e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 14:56:57 +0200 Subject: [PATCH 09/23] feat(bridge-s3): beautify posix write errors --- apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 204a84a65..60e2c9b87 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -373,7 +373,7 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> - {error, {unrecoverable_error, Reason}} + {error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}} end. 
map_error(Error) -> From 486a041adfeba1a96d38fe1c5ceefbd78aca9662 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 15:15:06 +0200 Subject: [PATCH 10/23] feat(bridge-s3): also map credentials / aggreg upload errors --- .../src/emqx_bridge_s3_connector.erl | 9 ++++++--- .../emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 60e2c9b87..fdc6d255b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -274,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S check_bucket_accessible(Bucket, #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - throw({unhealthy_target, Reason}); + throw({unhealthy_target, map_error_details(Reason)}); AWSConfig -> try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of Props when is_list(Props) -> @@ -293,8 +293,7 @@ check_aggreg_upload_errors(AggregId) -> %% TODO %% This approach means that, for example, 3 upload failures will cause %% the channel to be marked as unhealthy for 3 consecutive health checks. - ErrorMessage = emqx_utils:format(Error), - throw({unhealthy_target, ErrorMessage}); + throw({unhealthy_target, map_error_details(Error)}); [] -> ok end. 
@@ -399,6 +398,10 @@ map_error_details({socket_error, Reason}) -> emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]); map_error_details({http_error, _, _, _} = Error) -> emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]); +map_error_details({failed_to_obtain_credentials, Error}) -> + emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]); +map_error_details({upload_failed, Error}) -> + map_error_details(Error); map_error_details(Error) -> Error. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index 4771f9d04..fa3205456 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -134,6 +134,22 @@ action_config(Name, ConnectorId) -> t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). +t_create_unavailable_credentials(Config) -> + ConnectorName = ?config(connector_name, Config), + ConnectorType = ?config(connector_type, Config), + ConnectorConfig = maps:without( + [<<"access_key_id">>, <<"secret_access_key">>], + ?config(connector_config, Config) + ), + ?assertMatch( + {ok, + {{_HTTP, 201, _}, _, #{ + <<"status_reason">> := + <<"Unable to obtain AWS credentials: Socket error:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) + ). + t_ignore_batch_opts(Config) -> {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config), ?assertMatch( From 6190192cbcd9f90207b8a67cebc9697ba5f15ab9 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 20 Jun 2024 11:26:53 +0200 Subject: [PATCH 11/23] fix: redis connector should not timeout because no username and password A redis connector of type single or sentinel always got a timeout error when doing the connector test in the dashboard if no username or password was provided. 
This commit makes sure that the user instead gets an informative error message. Additionally, this commit adds more error information for all redis connector types. Fixes: https://emqx.atlassian.net/browse/EMQX-12557 --- .../test/emqx_bridge_v2_testlib.erl | 5 +- .../test/emqx_bridge_v2_redis_SUITE.erl | 39 +++++++++++++ apps/emqx_redis/src/emqx_redis.app.src | 2 +- apps/emqx_redis/src/emqx_redis.erl | 58 +++++++++++++++---- 4 files changed, 89 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 82858f00b..8cf8730b0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -945,6 +945,7 @@ t_on_get_status(Config, Opts) -> ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), FailureStatus = maps:get(failure_status, Opts, disconnected), + NormalStatus = maps:get(normal_status, Opts, connected), ?assertMatch({ok, _}, create_bridge(Config)), ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to @@ -952,7 +953,7 @@ t_on_get_status(Config, Opts) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId)) ), case ProxyHost of undefined -> @@ -971,7 +972,7 @@ t_on_get_status(Config, Opts) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId)) ) end, ok.
diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl index 7d3003bfa..725d24a88 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, redis). -define(BRIDGE_TYPE_BIN, <<"redis">>). @@ -46,6 +47,7 @@ matrix_testcases() -> t_start_stop, t_create_via_http, t_on_get_status, + t_on_get_status_no_username_pass, t_sync_query, t_map_to_redis_hset_args ]. @@ -325,6 +327,43 @@ t_on_get_status(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), ok. +t_on_get_status_no_username_pass(matrix) -> + {on_get_status, [ + [single, tcp], + [cluster, tcp], + [sentinel, tcp] + ]}; +t_on_get_status_no_username_pass(Config0) when is_list(Config0) -> + ConnectorConfig0 = ?config(connector_config, Config0), + ConnectorConfig1 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"password">>], ConnectorConfig0, <<"">> + ), + ConnectorConfig2 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"username">>], ConnectorConfig1, <<"">> + ), + Config1 = proplists:delete(connector_config, Config0), + Config2 = [{connector_config, ConnectorConfig2} | Config1], + ?check_trace( + emqx_bridge_v2_testlib:t_on_get_status( + Config2, + #{ + failure_status => disconnected, + normal_status => disconnected + } + ), + fun(ok, Trace) -> + case ?config(redis_type, Config2) of + single -> + ?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace)); + sentinel -> + ?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace)); + cluster -> + ok + end + end + ), + ok. 
+ t_sync_query(matrix) -> {sync_query, [ [single, tcp], diff --git a/apps/emqx_redis/src/emqx_redis.app.src b/apps/emqx_redis/src/emqx_redis.app.src index 660c490e6..02a251637 100644 --- a/apps/emqx_redis/src/emqx_redis.app.src +++ b/apps/emqx_redis/src/emqx_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_redis, [ {description, "EMQX Redis Database Connector"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 17a0ede49..3abff99eb 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -19,6 +19,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]). @@ -231,7 +232,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) -> is_unrecoverable_error(_) -> false. -on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> +on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) -> case eredis_cluster:pool_exists(PoolName) of true -> %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster. @@ -242,24 +243,57 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> [] -> disconnected; [_ | _] -> - Health = eredis_cluster:ping_all(PoolName), - status_result(Health) + do_cluster_status_check(PoolName, State) end; false -> disconnected end; -on_get_status(_InstId, #{pool_name := PoolName}) -> - Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), - status_result(Health). 
+on_get_status(_InstId, #{pool_name := PoolName} = State) -> + Timeout = 1000, + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + DoPerWorker = + fun(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Conn} -> + erlang:is_process_alive(Conn) andalso + ecpool_worker:exec(Worker, fun ?MODULE:do_get_status/1, Timeout); + Error -> + Error + end + end, + {ok, Results} = + try + {ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)} + catch + exit:timeout -> + {error, timeout} + end, + sum_worker_results(Results, State). + +do_cluster_status_check(Pool, State) -> + Pongs = eredis_cluster:qa(Pool, [<<"PING">>]), + sum_worker_results(Pongs, State). do_get_status(Conn) -> - case eredis:q(Conn, ["PING"]) of - {ok, _} -> true; - _ -> false - end. + eredis:q(Conn, ["PING"]). -status_result(_Status = true) -> connected; -status_result(_Status = false) -> connecting. +sum_worker_results([], _State) -> + connected; +sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) -> + ?tp(emqx_redis_auth_required_error, #{}), + %% This requires user action to fix so we set the status to disconnected + {disconnected, State, Error}; +sum_worker_results([{ok, _} | Rest], State) -> + sum_worker_results(Rest, State); +sum_worker_results([Error | _Rest], State) -> + ?SLOG( + warning, + #{ + msg => "emqx_redis_check_status_error", + error => Error + } + ), + {connecting, State, Error}. 
do_cmd(PoolName, cluster, {cmd, Command}) -> eredis_cluster:q(PoolName, Command); From 31509f02cc05b994f24fd04a99a375fb371d0159 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 20 Jun 2024 11:38:31 +0200 Subject: [PATCH 12/23] docs: add change log entry --- changes/ee/fix-13305.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/fix-13305.en.md diff --git a/changes/ee/fix-13305.en.md b/changes/ee/fix-13305.en.md new file mode 100644 index 000000000..1936a49e3 --- /dev/null +++ b/changes/ee/fix-13305.en.md @@ -0,0 +1 @@ +Improved error handling for Redis connectors. Previously, Redis connectors of type single or sentinel would always encounter a timeout error during the connector test in the dashboard if no username or password was provided. This update ensures that users now receive an informative error message in such scenarios. Additionally, more detailed error information has been added for all Redis connector types to enhance diagnostics and troubleshooting. From 130571b56e98d398bc4de46ce5d9bf2361ba2904 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 20 Jun 2024 14:40:18 +0200 Subject: [PATCH 13/23] fix: code improvements thanks to comments from @thalesmg --- apps/emqx_redis/src/emqx_redis.erl | 43 +++++++++++++----------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 3abff99eb..059e9aa23 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -20,6 +20,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]). 
@@ -241,34 +242,26 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) -> %% In this case, we can directly consider it as a disconnect and then proceed to reconnect. case eredis_cluster_monitor:get_all_pools(PoolName) of [] -> - disconnected; + ?status_disconnected; [_ | _] -> do_cluster_status_check(PoolName, State) end; false -> - disconnected + ?status_disconnected end; on_get_status(_InstId, #{pool_name := PoolName} = State) -> - Timeout = 1000, - Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - DoPerWorker = - fun(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Conn} -> - erlang:is_process_alive(Conn) andalso - ecpool_worker:exec(Worker, fun ?MODULE:do_get_status/1, Timeout); - Error -> - Error - end - end, - {ok, Results} = - try - {ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)} - catch - exit:timeout -> - {error, timeout} - end, - sum_worker_results(Results, State). + HealthCheckResoults = emqx_resource_pool:health_check_workers( + PoolName, + fun ?MODULE:do_get_status/1, + emqx_resource_pool:health_check_timeout(), + #{return_values => true} + ), + case HealthCheckResoults of + {ok, Results} -> + sum_worker_results(Results, State); + Error -> + {?status_disconnected, State, Error} + end. do_cluster_status_check(Pool, State) -> Pongs = eredis_cluster:qa(Pool, [<<"PING">>]), @@ -278,11 +271,11 @@ do_get_status(Conn) -> eredis:q(Conn, ["PING"]). 
sum_worker_results([], _State) -> - connected; + ?status_connected; sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) -> ?tp(emqx_redis_auth_required_error, #{}), %% This requires user action to fix so we set the status to disconnected - {disconnected, State, Error}; + {?status_disconnected, State, {unhealthy_target, Error}}; sum_worker_results([{ok, _} | Rest], State) -> sum_worker_results(Rest, State); sum_worker_results([Error | _Rest], State) -> @@ -293,7 +286,7 @@ sum_worker_results([Error | _Rest], State) -> error => Error } ), - {connecting, State, Error}. + {?status_connecting, State, Error}. do_cmd(PoolName, cluster, {cmd, Command}) -> eredis_cluster:q(PoolName, Command); From 0eedab3d7639f9caf9d905a409b2589a3498f2f1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 15:25:34 +0200 Subject: [PATCH 14/23] chore: add changelog entry --- changes/ee/breaking-13332.en.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/ee/breaking-13332.en.md diff --git a/changes/ee/breaking-13332.en.md b/changes/ee/breaking-13332.en.md new file mode 100644 index 000000000..0b5bf5896 --- /dev/null +++ b/changes/ee/breaking-13332.en.md @@ -0,0 +1,4 @@ +When an S3 Bridge is improperly configured, error messages now contain more informative and easy to read details. + +## Breaking changes +* S3 Bridge configuration with invalid aggregated upload key template will no longer work. Before this change, such configuration was considered valid but the bridge would never work anyway. 
From da214be5a13cbd4e009c8a79b7ca0c4fd948787f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 17:12:14 +0200 Subject: [PATCH 15/23] test(bridge-s3): adapt testcase to different CI environment --- apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index fa3205456..ea69a346f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -145,7 +145,7 @@ t_create_unavailable_credentials(Config) -> {ok, {{_HTTP, 201, _}, _, #{ <<"status_reason">> := - <<"Unable to obtain AWS credentials: Socket error:", _/bytes>> + <<"Unable to obtain AWS credentials:", _/bytes>> }}}, emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ). From 4942f6f75a2a1b303cd7339fcd1614cfb3e3378c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 21 Jun 2024 18:07:33 +0800 Subject: [PATCH 16/23] feat: improve keepalive_multiplier and keepalive_check_interval --- apps/emqx/src/emqx_channel.erl | 33 ++-- apps/emqx/src/emqx_connection.erl | 4 +- apps/emqx/src/emqx_keepalive.erl | 98 ++++++++--- apps/emqx/src/emqx_schema.erl | 10 +- apps/emqx/src/emqx_ws_connection.erl | 3 +- apps/emqx/test/emqx_config_SUITE.erl | 1 + apps/emqx/test/emqx_keepalive_SUITE.erl | 166 +++++++++++++++++- .../src/emqx_coap_channel.erl | 8 +- .../src/emqx_coap_schema.erl | 8 +- .../src/emqx_gateway_coap.app.src | 2 +- .../test/emqx_coap_SUITE.erl | 7 +- .../src/emqx_exproto_channel.erl | 4 +- .../src/emqx_gateway_exproto.app.src | 2 +- .../src/emqx_gateway_gbt32960.app.src | 2 +- .../src/emqx_gbt32960_channel.erl | 2 +- .../src/emqx_gateway_jt808.app.src | 2 +- .../src/emqx_jt808_channel.erl | 2 +- .../src/emqx_gateway_mqttsn.app.src | 2 +- .../src/emqx_mqttsn_channel.erl | 4 +- .../test/emqx_mgmt_api_clients_SUITE.erl | 2 +- 
rel/i18n/emqx_schema.hocon | 9 + 21 files changed, 301 insertions(+), 70 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index c1a9cc162..1a24cd260 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -544,8 +544,10 @@ handle_in( {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PACKET(?PINGREQ), Channel) -> - {ok, ?PACKET(?PINGRESP), Channel}; +handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) -> + {ok, NKeepalive} = emqx_keepalive:check(Keepalive), + NChannel = Channel#channel{keepalive = NKeepalive}, + {ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)}; handle_in( ?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo} @@ -1229,11 +1231,12 @@ handle_call( {keepalive, Interval}, Channel = #channel{ keepalive = KeepAlive, - conninfo = ConnInfo + conninfo = ConnInfo, + clientinfo = #{zone := Zone} } ) -> ClientId = info(clientid, Channel), - NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive), + NKeepalive = emqx_keepalive:update(Zone, Interval, KeepAlive), NConnInfo = maps:put(keepalive, Interval, ConnInfo), NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo}, SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), @@ -1333,22 +1336,22 @@ die_if_test_compiled() -> | {shutdown, Reason :: term(), channel()}. 
handle_timeout( _TRef, - {keepalive, _StatVal}, + keepalive, Channel = #channel{keepalive = undefined} ) -> {ok, Channel}; handle_timeout( _TRef, - {keepalive, _StatVal}, + keepalive, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - {keepalive, StatVal}, + keepalive, Channel = #channel{keepalive = Keepalive} ) -> - case emqx_keepalive:check(StatVal, Keepalive) of + case emqx_keepalive:check(Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, {ok, reset_timer(keepalive, NChannel)}; @@ -1459,10 +1462,16 @@ reset_timer(Name, Time, Channel) -> ensure_timer(Name, Time, clean_timer(Name, Channel)). clean_timer(Name, Channel = #channel{timers = Timers}) -> - Channel#channel{timers = maps:remove(Name, Timers)}. + case maps:take(Name, Timers) of + error -> + Channel; + {TRef, NTimers} -> + ok = emqx_utils:cancel_timer(TRef), + Channel#channel{timers = NTimers} + end. interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_delivery, #channel{session = Session}) -> emqx_session:info(retry_interval, Session); interval(expire_awaiting_rel, #channel{session = Session}) -> @@ -2320,9 +2329,7 @@ ensure_keepalive_timer(0, Channel) -> ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Multiplier = get_mqtt_conf(Zone, keepalive_multiplier), - RecvCnt = emqx_pd:get_counter(recv_pkt), - Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)), + Keepalive = emqx_keepalive:init(Zone, Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). 
clear_keepalive(Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ed62fb63c..517a5cc2f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -729,9 +729,7 @@ handle_timeout( disconnected -> {ok, State}; _ -> - %% recv_pkt: valid MQTT message - RecvCnt = emqx_pd:get_counter(recv_pkt), - handle_timeout(TRef, {keepalive, RecvCnt}, State) + with_channel(handle_timeout, [TRef, keepalive], State) end; handle_timeout(TRef, Msg, State) -> with_channel(handle_timeout, [TRef, Msg], State). diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 8ed685db2..785893d2d 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -19,10 +19,12 @@ -export([ init/1, init/2, + init/3, info/1, info/2, + check/1, check/2, - update/2 + update/3 ]). -elvis([{elvis_style, no_if_expression, disable}]). @@ -30,8 +32,12 @@ -export_type([keepalive/0]). -record(keepalive, { - interval :: pos_integer(), - statval :: non_neg_integer() + check_interval :: pos_integer(), + %% The received-packet counter value observed at the last keepalive check. + statval :: non_neg_integer(), + %% Idle time (ms) accumulated over consecutive checks that saw no new packet. + idle_milliseconds = 0 :: non_neg_integer(), + max_idle_millisecond :: pos_integer() }). -opaque keepalive() :: #keepalive{}. @@ -39,7 +45,11 @@ %% @doc Init keepalive. -spec init(Interval :: non_neg_integer()) -> keepalive(). -init(Interval) -> init(0, Interval). +init(Interval) -> init(default, 0, Interval). + +init(Zone, Interval) -> + RecvCnt = emqx_pd:get_counter(recv_pkt), + init(Zone, RecvCnt, Interval). %% from mqtt-v3.1.1 specific %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. @@ -53,42 +63,88 @@ init(Interval) -> init(0, Interval). %% typically this is a few minutes. %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
%% @doc Init keepalive. --spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined. -init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL -> - #keepalive{interval = Interval, statval = StatVal}; -init(_, 0) -> +-spec init( + Zone :: atom(), + StatVal :: non_neg_integer(), + Second :: non_neg_integer() +) -> keepalive() | undefined. +init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL -> + #{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} = + emqx_config:get_zone_conf(Zone, [mqtt]), + MilliSeconds = timer:seconds(Second), + Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)), + MaxIdleMs = ceil(MilliSeconds * Mul), + #keepalive{ + check_interval = Interval, + statval = StatVal, + idle_milliseconds = 0, + max_idle_millisecond = MaxIdleMs + }; +init(_Zone, _, 0) -> undefined; -init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL). +init(Zone, StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(Zone, StatVal, ?MAX_INTERVAL). %% @doc Get Info of the keepalive. -spec info(keepalive()) -> emqx_types:infos(). info(#keepalive{ - interval = Interval, - statval = StatVal + check_interval = Interval, + statval = StatVal, + idle_milliseconds = IdleIntervals, + max_idle_millisecond = MaxMs }) -> #{ - interval => Interval, - statval => StatVal + check_interval => Interval, + statval => StatVal, + idle_milliseconds => IdleIntervals, + max_idle_millisecond => MaxMs }. --spec info(interval | statval, keepalive()) -> +-spec info(check_interval | statval | idle_milliseconds, keepalive()) -> non_neg_integer(). 
-info(interval, #keepalive{interval = Interval}) -> +info(check_interval, #keepalive{check_interval = Interval}) -> Interval; info(statval, #keepalive{statval = StatVal}) -> StatVal; -info(interval, undefined) -> +info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) -> + Val; +info(check_interval, undefined) -> 0. +check(Keepalive = #keepalive{}) -> + RecvCnt = emqx_pd:get_counter(recv_pkt), + check(RecvCnt, Keepalive); +check(Keepalive) -> + {ok, Keepalive}. + %% @doc Check keepalive. -spec check(non_neg_integer(), keepalive()) -> {ok, keepalive()} | {error, timeout}. -check(Val, #keepalive{statval = Val}) -> {error, timeout}; -check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}. + +check( + NewVal, + #keepalive{ + statval = NewVal, + idle_milliseconds = IdleAcc, + check_interval = Interval, + max_idle_millisecond = Max + } +) when IdleAcc + Interval >= Max -> + {error, timeout}; +check( + NewVal, + #keepalive{ + statval = NewVal, + idle_milliseconds = IdleAcc, + check_interval = Interval + } = KeepAlive +) -> + {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = IdleAcc + Interval}}; +check(NewVal, #keepalive{} = KeepAlive) -> + {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = 0}}. %% @doc Update keepalive. %% The statval of the previous keepalive will be used, %% and normal checks will begin from the next cycle. --spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. -update(Interval, undefined) -> init(0, Interval); -update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval). +-spec update(atom(), non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. +update(Zone, Interval, undefined) -> init(Zone, 0, Interval); +update(Zone, Interval, #keepalive{statval = StatVal}) -> init(Zone, StatVal, Interval). 
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 9b96ad20a..6b02a4d4b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3613,9 +3613,17 @@ mqtt_general() -> desc => ?DESC(mqtt_keepalive_multiplier) } )}, + {"keepalive_check_interval", + sc( + timeout_duration(), + #{ + default => <<"30s">>, + desc => ?DESC(mqtt_keepalive_check_interval) + } + )}, {"retry_interval", sc( - duration(), + timeout_duration(), #{ default => <<"30s">>, desc => ?DESC(mqtt_retry_interval) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 038f3e98e..e46bdc313 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -555,8 +555,7 @@ handle_info(Info, State) -> handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> shutdown(idle_timeout, State); handle_timeout(TRef, keepalive, State) when is_reference(TRef) -> - RecvOct = emqx_pd:get_counter(recv_oct), - handle_timeout(TRef, {keepalive, RecvOct}, State); + with_channel(handle_timeout, [TRef, keepalive], State); handle_timeout( TRef, emit_stats, diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 28f542f81..568f5de20 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -428,6 +428,7 @@ zone_global_defaults() -> ignore_loop_deliver => false, keepalive_backoff => 0.75, keepalive_multiplier => 1.5, + keepalive_check_interval => 30000, max_awaiting_rel => 100, max_clientid_len => 65535, max_inflight => 32, diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl index 7773774a7..84f66b3a5 100644 --- a/apps/emqx/test/emqx_keepalive_SUITE.erl +++ b/apps/emqx/test/emqx_keepalive_SUITE.erl @@ -19,22 +19,180 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). 
+-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "listeners {" + "tcp.default.bind = 1883," + "ssl.default = marked_for_deletion," + "quic.default = marked_for_deletion," + "ws.default = marked_for_deletion," + "wss.default = marked_for_deletion" + "}"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). + +t_check_keepalive_default_timeout(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000), + erlang:process_flag(trap_exit, true), + ClientID = <<"default">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(5000, CheckInterval), + %% when keepalive_check_interval is 30s (clamped to 5s) and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000) + %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000) + %% check3 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout + Timeout = CheckInterval * 3, + %% The client has connected but has not sent any further packet. + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
+ +t_check_keepalive_other_timeout(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 2000), + erlang:process_flag(trap_exit, true), + ClientID = <<"other">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + {ok, _, [0]} = emqtt:subscribe(C, <<"mytopic">>, []), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + %%CheckInterval = ceil(keepalive_check_factor() * KeepaliveSec * 1000), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(2000, CheckInterval), + %% when keepalive_check_interval is 2s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% subscribe T1(packet = 2, idle_milliseconds = 0) + %% check1 T2(packet = 2, idle_milliseconds = 1 * CheckInterval = 2000) + %% check2 T3(packet = 2, idle_milliseconds = 2 * CheckInterval = 4000) + %% check3 T4(packet = 2, idle_milliseconds = 3 * CheckInterval = 6000) + %% check4 T5(packet = 2, idle_milliseconds = 4 * CheckInterval = 8000) + %% check5 T6(packet = 2, idle_milliseconds = 5 * CheckInterval = 10000) + %% check6 T7(packet = 2, idle_milliseconds = 6 * CheckInterval = 12000) + %% check7 T8(packet = 2, idle_milliseconds = 7 * CheckInterval = 14000) + %% check8 T9(packet = 2, idle_milliseconds = 8 * CheckInterval = 16000) > 15000 timeout + Timeout = CheckInterval * 9, + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200), Timeout).
+ +t_check_keepalive_ping_reset_timer(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 100000), + erlang:process_flag(trap_exit, true), + ClientID = <<"ping_reset">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + ct:sleep(1000), + emqtt:resume(C), + pong = emqtt:ping(C), + emqtt:pause(C), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(5000, CheckInterval), + %% when keepalive_check_interval is 100s (clamped to 5s) and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% sleep 1000ms + %% ping (packet = 2, idle_milliseconds = 0) restart timer + %% check1 T1(packet = 2, idle_milliseconds = 1 * CheckInterval = 5000) + %% check2 T2(packet = 2, idle_milliseconds = 2 * CheckInterval = 10000) + %% check3 T3(packet = 2, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout + Timeout = CheckInterval * 3, + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
+ t_check(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000), Keepalive = emqx_keepalive:init(60), - ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)), + ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive)), ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)), Info = emqx_keepalive:info(Keepalive), ?assertEqual( #{ - interval => 60, - statval => 0 + check_interval => 30000, + statval => 0, + idle_milliseconds => 0, + %% 60 * 1.5 * 1000 + max_idle_millisecond => 90000 }, Info ), {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive), ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)), - ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)). + {ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1), + ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), + {ok, Keepalive3} = emqx_keepalive:check(1, Keepalive2), + ?assertEqual(1, emqx_keepalive:info(statval, Keepalive3)), + ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive3)), + + Keepalive4 = emqx_keepalive:init(90), + ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive4)), + + Keepalive5 = emqx_keepalive:init(1), + ?assertEqual(1000, emqx_keepalive:info(check_interval, Keepalive5)), + ok. + +keepalive_multiplier() -> + emqx_config:get_zone_conf(default, [mqtt, keepalive_multiplier]). + +keepalive_check_interval() -> + emqx_config:get_zone_conf(default, [mqtt, keepalive_check_interval]). + +receive_msg_in_time(ChannelPid, C, Timeout) -> + receive + {'EXIT', ChannelPid, {shutdown, keepalive_timeout}} -> + receive + {'EXIT', C, {shutdown, tcp_closed}} -> + ok + after 500 -> + throw(no_tcp_closed_from_mqtt_client) + end + after Timeout -> + no_keepalive_timeout_received + end. 
diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index fbab1ff14..844677d12 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -85,7 +85,7 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). --define(DEF_IDLE_TIME, timer:seconds(30)). +-define(DEF_IDLE_SECONDS, 30). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). @@ -149,7 +149,7 @@ init( mountpoint => Mountpoint } ), - Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME), + Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_SECONDS), #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -378,7 +378,7 @@ ensure_keepalive_timer(Channel) -> ensure_keepalive_timer(fun ensure_timer/4, Channel). ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) -> - Heartbeat = emqx_keepalive:info(interval, KeepAlive), + Heartbeat = emqx_keepalive:info(check_interval, KeepAlive), Fun(keepalive, Heartbeat, keepalive, Channel). check_auth_state(Msg, #channel{connection_required = false} = Channel) -> @@ -495,7 +495,7 @@ enrich_conninfo( ) -> case Queries of #{<<"clientid">> := ClientId} -> - Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)), + Interval = emqx_keepalive:info(check_interval, KeepAlive), NConnInfo = ConnInfo#{ clientid => ClientId, proto_name => <<"CoAP">>, diff --git a/apps/emqx_gateway_coap/src/emqx_coap_schema.erl b/apps/emqx_gateway_coap/src/emqx_coap_schema.erl index 7d38a2bb6..61d4b7376 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_schema.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_schema.erl @@ -19,12 +19,6 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). --type duration() :: non_neg_integer(). - --typerefl_from_string({duration/0, emqx_schema, to_duration}). - --reflect_type([duration/0]). - %% config schema provides -export([namespace/0, fields/1, desc/1]). 
@@ -34,7 +28,7 @@ fields(coap) -> [ {heartbeat, sc( - duration(), + emqx_schema:duration_s(), #{ default => <<"30s">>, desc => ?DESC(coap_heartbeat) diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index 3a715eac4..e9c1f2b4a 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index 3201d5dbf..bd403a463 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -100,7 +100,7 @@ init_per_testcase(t_heartbeat, Config) -> OldConf = emqx:get_raw_config([gateway, coap]), {ok, _} = emqx_gateway_conf:update_gateway( coap, - OldConf#{<<"heartbeat">> => <<"800ms">>} + OldConf#{<<"heartbeat">> => <<"1s">>} ), [ {old_conf, OldConf}, @@ -216,8 +216,9 @@ t_heartbeat(Config) -> [], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) ), - - timer:sleep(Heartbeat * 2), + %% The minimum timeout time is 1 second. 
+ %% Worst case: 1.5 * Heartbeat + check interval (0.5 * Heartbeat < 1s, so raised to 1s) = 1.5s + 1s = 2.5s + timer:sleep(Heartbeat * 2 + 1000), ?assertEqual( [], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index c145506c9..93646acbf 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -715,7 +715,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> StatVal = emqx_gateway_conn:keepalive_stats(recv), - Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), + Keepalive = emqx_keepalive:init(default, StatVal, Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). ensure_timer(Name, Channel = #channel{timers = Timers}) -> @@ -746,7 +746,7 @@ interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}}) interval(force_close, _) -> 15000; interval(keepalive, #channel{keepalive = Keepalive}) -> - emqx_keepalive:info(interval, Keepalive). + emqx_keepalive:info(check_interval, Keepalive).
%%-------------------------------------------------------------------- %% Dispatch diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index 34fcca216..fe237779b 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index 123b60203..bcb54e0f1 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 9652290d3..809a79f7d 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -506,7 +506,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_timer, #channel{retx_interval = RetxIntv}) -> RetxIntv. 
diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src index 8e5157695..8d1e33f74 100644 --- a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_jt808, [ {description, "JT/T 808 Gateway"}, - {vsn, "0.0.3"}, + {vsn, "0.1.0"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 876f623e9..a74214a1c 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -616,7 +616,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_timer, #channel{retx_interval = RetxIntv}) -> RetxIntv. 
diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 45a1d5da7..585410356 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 501308ea0..c9e109c3f 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -430,7 +430,7 @@ ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel) -> - Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))), + Keepalive = emqx_keepalive:init(Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). %%-------------------------------------------------------------------- @@ -2245,7 +2245,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. 
interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_delivery, #channel{session = Session}) -> emqx_mqttsn_session:info(retry_interval, Session); interval(expire_awaiting_rel, #channel{session = Session}) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 2c71e9822..9557c3214 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -1109,7 +1109,7 @@ t_keepalive(_Config) -> [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), %% will reset to max keepalive if keepalive > max keepalive #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid), - ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))), + ?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))), {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index e80f36817..eca281646 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -855,6 +855,15 @@ The default value 1.5 is following the MQTT 5.0 specification. This multiplier i mqtt_keepalive_multiplier.label: """Keep Alive Multiplier""" +mqtt_keepalive_check_interval.desc: +"""The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets. +If a certain amount of time passes without any packets being sent from the client,this time will be added up. +Once the accumulated time exceeds the keepalive interval * the keepalive multiplier, the connection will be terminated. 
+The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of Interval/2.""" + +mqtt_keepalive_check_interval.label: +"""Keep Alive Check Interval""" + force_gc_bytes.desc: """GC the process after specified number of bytes have passed through.""" From 1735f8deef48709f6b17d40119fadd488998db03 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 26 Jun 2024 18:08:17 +0800 Subject: [PATCH 17/23] chore: apply review suggestion --- rel/i18n/emqx_schema.hocon | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index eca281646..f9978fe6f 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -857,9 +857,9 @@ mqtt_keepalive_multiplier.label: mqtt_keepalive_check_interval.desc: """The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets. -If a certain amount of time passes without any packets being sent from the client,this time will be added up. -Once the accumulated time exceeds the keepalive interval * the keepalive multiplier, the connection will be terminated. -The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of Interval/2.""" +If a certain amount of time passes without any packets being sent from the client, this time will be added up. +Once the accumulated time exceeds `keepalive-interval * keepalive-multiplier`, the connection will be terminated. 
+The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of `keepalive-interval / 2`.""" mqtt_keepalive_check_interval.label: """Keep Alive Check Interval""" From f6f2ea745111983b5219bab0f505d7404278df7d Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 26 Jun 2024 15:49:10 +0200 Subject: [PATCH 18/23] build: pin base docker image to stable-20240612-slim latest version of 12-slim as of today is configured to fetch i386 packages --- .github/workflows/build_and_push_docker_images.yaml | 4 ++-- Makefile | 2 +- deploy/docker/Dockerfile | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index a43da1825..502e0e5f6 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -165,7 +165,7 @@ jobs: DOCKER_PUSH: false DOCKER_BUILD_NOCACHE: true DOCKER_LOAD: true - EMQX_RUNNER: 'public.ecr.aws/debian/debian:12-slim' + EMQX_RUNNER: 'public.ecr.aws/debian/debian:stable-20240612-slim' EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} @@ -214,7 +214,7 @@ jobs: DOCKER_BUILD_NOCACHE: false DOCKER_PLATFORMS: linux/amd64,linux/arm64 DOCKER_LOAD: false - EMQX_RUNNER: 'public.ecr.aws/debian/debian:12-slim' + EMQX_RUNNER: 'public.ecr.aws/debian/debian:stable-20240612-slim' EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} diff --git a/Makefile b/Makefile index b73c39a11..14a5fa228 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export EMQX_RELUP ?= true export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-8:1.15.7-26.2.5-2-debian12 -export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:12-slim +export EMQX_DEFAULT_RUNNER = 
public.ecr.aws/debian/debian:stable-20240612-slim export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index a81d3dbc2..7377709a2 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -1,5 +1,5 @@ ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-8:1.15.7-26.2.5-2-debian12 -ARG RUN_FROM=public.ecr.aws/debian/debian:12-slim +ARG RUN_FROM=public.ecr.aws/debian/debian:stable-20240612-slim ARG SOURCE_TYPE=src # tgz FROM ${BUILD_FROM} as builder_src From 954adc71c4b899f4ca6bfefe68230338508247ae Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 25 Jun 2024 17:55:31 -0300 Subject: [PATCH 19/23] test: attempt to fix flaky test https://github.com/emqx/emqx/actions/runs/9662725303/job/26653594859?pr=13328#step:6:186 ``` %%% emqx_resource_SUITE ==> t_expiration_retry: FAILED %%% emqx_resource_SUITE ==> {{panic, #{msg => "Unexpected result", result => {run_stage_failed,error, {badmatch,{ok,timeout}}, [{emqx_resource_SUITE,'-do_t_expiration_retry/0-fun-12-',0, [{file, "/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"}, {line,2569}]}, {emqx_resource_SUITE,do_t_expiration_retry,0, [{file, "/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"}, {line,2518}]}]}}}, [{emqx_resource_SUITE,do_t_expiration_retry,0, [{file,"/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"}, {line,2594}]}, {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]}, {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1302}]}, {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1234}]}]} ``` --- .../test/emqx_resource_SUITE.erl | 57 ++++++++++++------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 764c65e6f..af9abe95b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ 
b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -29,6 +29,9 @@ -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). -define(TELEMETRY_PREFIX, emqx, resource). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX), + {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} +). -import(emqx_common_test_helpers, [on_exit/1]). @@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(). + do_t_expiration_retry(#{is_batch => false}). t_expiration_retry_batch(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), @@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(). + do_t_expiration_retry(#{is_batch => true}). -do_t_expiration_retry() -> +do_t_expiration_retry(Context) -> + IsBatch = maps:get(is_batch, Context), ResumeInterval = 300, ?check_trace( + #{timetrap => 10_000}, begin ok = emqx_resource:simple_sync_query(?ID, block), - {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := buffer_worker_flush_nack}), - 1, - 200 - ), - TimeoutMS = 100, + TimeoutMS = 200, %% the request that expires must be first, so it's the %% head of the inflight table (and retriable). {ok, SRef1} = snabbkaffe:subscribe( @@ -2542,6 +2542,8 @@ do_t_expiration_retry() -> ) ) end), + %% This second message must be enqueued while the resource is blocked by the + %% previous message. Pid1 = spawn_link(fun() -> receive @@ -2556,22 +2558,33 @@ do_t_expiration_retry() -> ) ) end), + ?tp("waiting for first message to be appended to the queue", #{}), {ok, _} = snabbkaffe:receive_events(SRef1), + + ?tp("waiting for first message to expire during blocked retries", #{}), + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}), + + %% Now we wait until the worker tries the second message at least once before + %% unblocking it. Pid1 ! 
go, - {ok, _} = snabbkaffe:receive_events(SRef0), + ?tp("waiting for second message to be retried and be nacked while blocked", #{}), + case IsBatch of + false -> + {ok, _} = ?block_until(#{ + ?snk_kind := buffer_worker_flush_nack, + batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _) + }); + true -> + {ok, _} = ?block_until(#{ + ?snk_kind := buffer_worker_flush_nack, + batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _] + }) + end, - {ok, _} = - ?block_until( - #{?snk_kind := buffer_worker_retry_expired}, - ResumeInterval * 10 - ), - - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := buffer_worker_retry_inflight_succeeded}, - ResumeInterval * 5 - ), + %% Bypass the buffer worker and unblock the resource. + ok = emqx_resource:simple_sync_query(?ID, resume), + ?tp("waiting for second message to be retried and be acked, unblocking", #{}), + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}), ok end, From ef89afae3eceac4b2314bf0c1e87ece7ad121aba Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 26 Jun 2024 10:03:35 -0300 Subject: [PATCH 20/23] ci: add debugging info when building for docker tests --- .github/workflows/build_docker_for_test.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_docker_for_test.yaml b/.github/workflows/build_docker_for_test.yaml index 8090bcc22..d5c07386e 100644 --- a/.github/workflows/build_docker_for_test.yaml +++ b/.github/workflows/build_docker_for_test.yaml @@ -52,9 +52,13 @@ jobs: run: | CID=$(docker run -d --rm -P $_EMQX_DOCKER_IMAGE_TAG) HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID) - ./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT + ./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT || { + docker logs $CID + exit 1 + } docker stop $CID - name: export docker image + if: always() run: | docker save 
$_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 From e81494a132c2a7de667bd69d63875a7e0ac5ca42 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 26 Jun 2024 09:57:47 -0300 Subject: [PATCH 21/23] fix: don't crash if application is already loaded ``` Runtime terminating during boot ({{badmatch,{error,{already_loaded,wolff}}},[{emqx_conf,load,2,[{file,"emqx_conf.erl"},{line,167}]},{lists,foreach_1,2,[{file,"lists.erl"},{line,1686}]},{emqx_conf,dump_schema,2,[{file,"emqx_conf.erl"},{line,150}]},{erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,746}]},{erl_eval,expr,6,[{file,"erl_eval.erl"},{line,494}]},{erl_eval,exprs,6,[{file,"erl_eval.erl"},{line,136}]},{init,start_it,1,[]},{init,start_em,1,[]}]}) ``` --- apps/emqx_conf/src/emqx_conf.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 09883dc63..a181fbf5a 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -163,8 +163,13 @@ dump_schema(Dir, SchemaModule) -> ), emqx_dashboard:save_dispatch_eterm(SchemaModule). -load(emqx_enterprise_schema, emqx_telemetry) -> ignore; -load(_, Lib) -> ok = application:load(Lib). +load(emqx_enterprise_schema, emqx_telemetry) -> + ignore; +load(_, Lib) -> + case application:load(Lib) of + ok -> ok; + {error, {already_loaded, _}} -> ok + end. %% for scripts/spellcheck. 
gen_schema_json(Dir, SchemaModule, Lang) -> From 82e7b75a02a8beaf9a630d59b34da0e5f5f68784 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 26 Jun 2024 22:47:18 +0200 Subject: [PATCH 22/23] chore: bump app versions --- apps/emqx/src/emqx.app.src | 2 +- apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src | 2 +- apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src | 2 +- apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src | 2 +- apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src | 2 +- apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src | 2 +- apps/emqx_utils/src/emqx_utils.app.src | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index c0e5161a0..1f103e7f3 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.3.2"}, + {vsn, "5.3.3"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index 8c3223e8b..8ab084323 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index 4bbe04978..da9cd1a96 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_s3, [ {description, "EMQX Enterprise S3 Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git 
a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 3ddcabbb3..dc406b735 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index fe237779b..1d5cb85b8 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index bcb54e0f1..f96d112e9 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 585410356..1dc3f6939 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, 
diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index ee3342021..b2ec221e3 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.2.2"}, + {vsn, "5.2.3"}, {modules, [ emqx_utils, emqx_utils_api, From 557a843c69b3b21d9cfbef7005e1c1379bde1e72 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 26 Jun 2024 22:48:50 +0200 Subject: [PATCH 23/23] chore: minimize oss/platform diff --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index cfaa25255..30930c494 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 5a862c492..244326ee9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -55,6 +55,8 @@ %% only for testing/mocking -export([supported_versions/1]). +-export([format_bridge_metrics/1, format_metrics/1]). + -define(BPAPI_NAME, emqx_bridge). -define(BRIDGE_NOT_ENABLED,