diff --git a/.github/workflows/build_docker_for_test.yaml b/.github/workflows/build_docker_for_test.yaml index 8090bcc22..d5c07386e 100644 --- a/.github/workflows/build_docker_for_test.yaml +++ b/.github/workflows/build_docker_for_test.yaml @@ -52,9 +52,13 @@ jobs: run: | CID=$(docker run -d --rm -P $_EMQX_DOCKER_IMAGE_TAG) HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID) - ./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT + ./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT || { + docker logs $CID + exit 1 + } docker stop $CID - name: export docker image + if: always() run: | docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 27648a88d..f3089d11f 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index c0e5161a0..1f103e7f3 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.3.2"}, + {vsn, "5.3.3"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index c1a9cc162..1a24cd260 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -544,8 +544,10 @@ handle_in( {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PACKET(?PINGREQ), Channel) -> - {ok, ?PACKET(?PINGRESP), Channel}; +handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) -> + {ok, NKeepalive} = emqx_keepalive:check(Keepalive), + NChannel = Channel#channel{keepalive = NKeepalive}, + {ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)}; handle_in( ?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo} @@ -1229,11 +1231,12 @@ handle_call( {keepalive, Interval}, Channel = #channel{ keepalive = KeepAlive, - conninfo = ConnInfo + conninfo = ConnInfo, + clientinfo = #{zone := Zone} } ) -> ClientId = info(clientid, Channel), - NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive), + NKeepalive = emqx_keepalive:update(Zone, Interval, KeepAlive), NConnInfo = maps:put(keepalive, Interval, ConnInfo), NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo}, SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), @@ -1333,22 +1336,22 @@ die_if_test_compiled() -> | {shutdown, Reason :: term(), channel()}. handle_timeout( _TRef, - {keepalive, _StatVal}, + keepalive, Channel = #channel{keepalive = undefined} ) -> {ok, Channel}; handle_timeout( _TRef, - {keepalive, _StatVal}, + keepalive, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - {keepalive, StatVal}, + keepalive, Channel = #channel{keepalive = Keepalive} ) -> - case emqx_keepalive:check(StatVal, Keepalive) of + case emqx_keepalive:check(Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, {ok, reset_timer(keepalive, NChannel)}; @@ -1459,10 +1462,16 @@ reset_timer(Name, Time, Channel) -> ensure_timer(Name, Time, clean_timer(Name, Channel)). clean_timer(Name, Channel = #channel{timers = Timers}) -> - Channel#channel{timers = maps:remove(Name, Timers)}. + case maps:take(Name, Timers) of + error -> + Channel; + {TRef, NTimers} -> + ok = emqx_utils:cancel_timer(TRef), + Channel#channel{timers = NTimers} + end. interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_delivery, #channel{session = Session}) -> emqx_session:info(retry_interval, Session); interval(expire_awaiting_rel, #channel{session = Session}) -> @@ -2320,9 +2329,7 @@ ensure_keepalive_timer(0, Channel) -> ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Multiplier = get_mqtt_conf(Zone, keepalive_multiplier), - RecvCnt = emqx_pd:get_counter(recv_pkt), - Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)), + Keepalive = emqx_keepalive:init(Zone, Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). clear_keepalive(Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ed62fb63c..517a5cc2f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -729,9 +729,7 @@ handle_timeout( disconnected -> {ok, State}; _ -> - %% recv_pkt: valid MQTT message - RecvCnt = emqx_pd:get_counter(recv_pkt), - handle_timeout(TRef, {keepalive, RecvCnt}, State) + with_channel(handle_timeout, [TRef, keepalive], State) end; handle_timeout(TRef, Msg, State) -> with_channel(handle_timeout, [TRef, Msg], State). diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 0b02ad1f5..4b4a2d5cf 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -287,14 +287,25 @@ parse_connect(FrameBin, StrictMode) -> % Note: return malformed if reserved flag is not 0. parse_connect2( ProtoName, - <>, + <>, StrictMode ) -> case Reserved of 0 -> ok; 1 -> ?PARSE_ERR(reserved_connect_flag) end, + WillFlag = bool(WillFlagB), + WillRetain = bool(WillRetainB), + case WillFlag of + %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] + false when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos); + %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] + true when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos); + %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] + false when WillRetain -> ?PARSE_ERR(invalid_will_retain); + _ -> ok + end, {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid), ConnPacket = #mqtt_packet_connect{ @@ -304,9 +315,9 @@ parse_connect2( %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html is_bridge = (BridgeTag =:= 8), clean_start = bool(CleanStart), - will_flag = bool(WillFlag), + will_flag = WillFlag, will_qos = WillQoS, - will_retain = bool(WillRetain), + will_retain = WillRetain, keepalive = KeepAlive, properties = Properties, clientid = ClientId diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 8ed685db2..785893d2d 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -19,10 +19,12 @@ -export([ init/1, init/2, + init/3, info/1, info/2, + check/1, check/2, - update/2 + update/3 ]). -elvis([{elvis_style, no_if_expression, disable}]). @@ -30,8 +32,12 @@ -export_type([keepalive/0]). -record(keepalive, { - interval :: pos_integer(), - statval :: non_neg_integer() + check_interval :: pos_integer(), + %% the received packets since last keepalive check + statval :: non_neg_integer(), + %% The number of idle intervals allowed before disconnecting the client. + idle_milliseconds = 0 :: non_neg_integer(), + max_idle_millisecond :: pos_integer() }). -opaque keepalive() :: #keepalive{}. @@ -39,7 +45,11 @@ %% @doc Init keepalive. -spec init(Interval :: non_neg_integer()) -> keepalive(). -init(Interval) -> init(0, Interval). +init(Interval) -> init(default, 0, Interval). + +init(Zone, Interval) -> + RecvCnt = emqx_pd:get_counter(recv_pkt), + init(Zone, RecvCnt, Interval). %% from mqtt-v3.1.1 specific %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. @@ -53,42 +63,88 @@ init(Interval) -> init(0, Interval). %% typically this is a few minutes. %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds. %% @doc Init keepalive. --spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined. -init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL -> - #keepalive{interval = Interval, statval = StatVal}; -init(_, 0) -> +-spec init( + Zone :: atom(), + StatVal :: non_neg_integer(), + Second :: non_neg_integer() +) -> keepalive() | undefined. +init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL -> + #{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} = + emqx_config:get_zone_conf(Zone, [mqtt]), + MilliSeconds = timer:seconds(Second), + Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)), + MaxIdleMs = ceil(MilliSeconds * Mul), + #keepalive{ + check_interval = Interval, + statval = StatVal, + idle_milliseconds = 0, + max_idle_millisecond = MaxIdleMs + }; +init(_Zone, _, 0) -> undefined; -init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL). +init(Zone, StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(Zone, StatVal, ?MAX_INTERVAL). %% @doc Get Info of the keepalive. -spec info(keepalive()) -> emqx_types:infos(). info(#keepalive{ - interval = Interval, - statval = StatVal + check_interval = Interval, + statval = StatVal, + idle_milliseconds = IdleIntervals, + max_idle_millisecond = MaxMs }) -> #{ - interval => Interval, - statval => StatVal + check_interval => Interval, + statval => StatVal, + idle_milliseconds => IdleIntervals, + max_idle_millisecond => MaxMs }. --spec info(interval | statval, keepalive()) -> +-spec info(check_interval | statval | idle_milliseconds, keepalive()) -> non_neg_integer(). -info(interval, #keepalive{interval = Interval}) -> +info(check_interval, #keepalive{check_interval = Interval}) -> Interval; info(statval, #keepalive{statval = StatVal}) -> StatVal; -info(interval, undefined) -> +info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) -> + Val; +info(check_interval, undefined) -> 0. +check(Keepalive = #keepalive{}) -> + RecvCnt = emqx_pd:get_counter(recv_pkt), + check(RecvCnt, Keepalive); +check(Keepalive) -> + {ok, Keepalive}. + %% @doc Check keepalive. -spec check(non_neg_integer(), keepalive()) -> {ok, keepalive()} | {error, timeout}. -check(Val, #keepalive{statval = Val}) -> {error, timeout}; -check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}. + +check( + NewVal, + #keepalive{ + statval = NewVal, + idle_milliseconds = IdleAcc, + check_interval = Interval, + max_idle_millisecond = Max + } +) when IdleAcc + Interval >= Max -> + {error, timeout}; +check( + NewVal, + #keepalive{ + statval = NewVal, + idle_milliseconds = IdleAcc, + check_interval = Interval + } = KeepAlive +) -> + {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = IdleAcc + Interval}}; +check(NewVal, #keepalive{} = KeepAlive) -> + {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = 0}}. %% @doc Update keepalive. %% The statval of the previous keepalive will be used, %% and normal checks will begin from the next cycle. --spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. -update(Interval, undefined) -> init(0, Interval); -update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval). +-spec update(atom(), non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. +update(Zone, Interval, undefined) -> init(Zone, 0, Interval); +update(Zone, Interval, #keepalive{statval = StatVal}) -> init(Zone, StatVal, Interval). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 63b77f8d2..6b02a4d4b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3491,6 +3491,7 @@ mqtt_general() -> )}, {"max_clientid_len", sc( + %% MQTT-v3.1.1-[MQTT-3.1.3-5], MQTT-v5.0-[MQTT-3.1.3-5] range(23, 65535), #{ default => 65535, @@ -3612,9 +3613,17 @@ mqtt_general() -> desc => ?DESC(mqtt_keepalive_multiplier) } )}, + {"keepalive_check_interval", + sc( + timeout_duration(), + #{ + default => <<"30s">>, + desc => ?DESC(mqtt_keepalive_check_interval) + } + )}, {"retry_interval", sc( - duration(), + timeout_duration(), #{ default => <<"30s">>, desc => ?DESC(mqtt_retry_interval) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 038f3e98e..e46bdc313 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -555,8 +555,7 @@ handle_info(Info, State) -> handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> shutdown(idle_timeout, State); handle_timeout(TRef, keepalive, State) when is_reference(TRef) -> - RecvOct = emqx_pd:get_counter(recv_oct), - handle_timeout(TRef, {keepalive, RecvOct}, State); + with_channel(handle_timeout, [TRef, keepalive], State); handle_timeout( TRef, emit_stats, diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 28f542f81..568f5de20 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -428,6 +428,7 @@ zone_global_defaults() -> ignore_loop_deliver => false, keepalive_backoff => 0.75, keepalive_multiplier => 1.5, + keepalive_check_interval => 30000, max_awaiting_rel => 100, max_clientid_len => 65535, max_inflight => 32, diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 8193f9c31..2457c3faf 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -64,7 +64,10 @@ groups() -> t_malformed_connect_header, t_malformed_connect_data, t_reserved_connect_flag, - t_invalid_clientid + t_invalid_clientid, + t_undefined_password, + t_invalid_will_retain, + t_invalid_will_qos ]}, {connack, [parallel], [ t_serialize_parse_connack, @@ -738,6 +741,56 @@ t_undefined_password(_) -> ), ok. +t_invalid_will_retain(_) -> + ConnectFlags = <<2#01100000>>, + ConnectBin = + <<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55, + 122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84, + 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>, + ?assertException( + throw, + {frame_parse_error, invalid_will_retain}, + emqx_frame:parse(ConnectBin) + ), + ok. + +t_invalid_will_qos(_) -> + Will_F_WillQoS0 = <<2#010:3, 2#00:2, 2#000:3>>, + Will_F_WillQoS1 = <<2#010:3, 2#01:2, 2#000:3>>, + Will_F_WillQoS2 = <<2#010:3, 2#10:2, 2#000:3>>, + Will_F_WillQoS3 = <<2#010:3, 2#11:2, 2#000:3>>, + Will_T_WillQoS3 = <<2#011:3, 2#11:2, 2#000:3>>, + ConnectBinFun = fun(ConnectFlags) -> + <<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55, + 122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84, + 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>> + end, + ?assertMatch( + {ok, _, _, _}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS0)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3)) + ), + ?assertException( + throw, + {frame_parse_error, invalid_will_qos}, + emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3)) + ), + ok. + parse_serialize(Packet) -> parse_serialize(Packet, #{strict_mode => true}). diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl index 7773774a7..84f66b3a5 100644 --- a/apps/emqx/test/emqx_keepalive_SUITE.erl +++ b/apps/emqx/test/emqx_keepalive_SUITE.erl @@ -19,22 +19,180 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "listeners {" + "tcp.default.bind = 1883," + "ssl.default = marked_for_deletion," + "quic.default = marked_for_deletion," + "ws.default = marked_for_deletion," + "wss.default = marked_for_deletion" + "}"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). + +t_check_keepalive_default_timeout(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000), + erlang:process_flag(trap_exit, true), + ClientID = <<"default">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(5000, CheckInterval), + %% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000) + %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000) + %% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout + Timeout = CheckInterval * 3, + %% connector but not send a packet. + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)). + +t_check_keepalive_other_timeout(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 2000), + erlang:process_flag(trap_exit, true), + ClientID = <<"other">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + {ok, _, [0]} = emqtt:subscribe(C, <<"mytopic">>, []), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + %%CheckInterval = ceil(keepalive_check_factor() * KeepaliveSec * 1000), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(2000, CheckInterval), + %% when keepalive_check_interval is 2s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% subscribe T1(packet = 2, idle_milliseconds = 0) + %% check1 T2(packet = 2, idle_milliseconds = 1 * CheckInterval = 2000) + %% check2 T3(packet = 2, idle_milliseconds = 2 * CheckInterval = 4000) + %% check3 T4(packet = 2, idle_milliseconds = 3 * CheckInterval = 6000) + %% check4 T5(packet = 2, idle_milliseconds = 4 * CheckInterval = 8000) + %% check4 T6(packet = 2, idle_milliseconds = 5 * CheckInterval = 10000) + %% check4 T7(packet = 2, idle_milliseconds = 6 * CheckInterval = 12000) + %% check4 T8(packet = 2, idle_milliseconds = 7 * CheckInterval = 14000) + %% check4 T9(packet = 2, idle_milliseconds = 8 * CheckInterval = 16000) > 15000 timeout + Timeout = CheckInterval * 9, + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200), Timeout). + +t_check_keepalive_ping_reset_timer(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 100000), + erlang:process_flag(trap_exit, true), + ClientID = <<"ping_reset">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + ct:sleep(1000), + emqtt:resume(C), + pong = emqtt:ping(C), + emqtt:pause(C), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(5000, CheckInterval), + %% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% sleep 1000ms + %% ping (packet = 2, idle_milliseconds = 0) restart timer + %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000) + %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000) + %% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout + Timeout = CheckInterval * 3, + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)). + t_check(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000), Keepalive = emqx_keepalive:init(60), - ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)), + ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive)), ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)), Info = emqx_keepalive:info(Keepalive), ?assertEqual( #{ - interval => 60, - statval => 0 + check_interval => 30000, + statval => 0, + idle_milliseconds => 0, + %% 60 * 1.5 * 1000 + max_idle_millisecond => 90000 }, Info ), {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive), ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)), - ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)). + {ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1), + ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), + {ok, Keepalive3} = emqx_keepalive:check(1, Keepalive2), + ?assertEqual(1, emqx_keepalive:info(statval, Keepalive3)), + ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive3)), + + Keepalive4 = emqx_keepalive:init(90), + ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive4)), + + Keepalive5 = emqx_keepalive:init(1), + ?assertEqual(1000, emqx_keepalive:info(check_interval, Keepalive5)), + ok. + +keepalive_multiplier() -> + emqx_config:get_zone_conf(default, [mqtt, keepalive_multiplier]). + +keepalive_check_interval() -> + emqx_config:get_zone_conf(default, [mqtt, keepalive_check_interval]). + +receive_msg_in_time(ChannelPid, C, Timeout) -> + receive + {'EXIT', ChannelPid, {shutdown, keepalive_timeout}} -> + receive + {'EXIT', C, {shutdown, tcp_closed}} -> + ok + after 500 -> + throw(no_tcp_closed_from_mqtt_client) + end + after Timeout -> + no_keepalive_timeout_received + end. diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index cfaa25255..30930c494 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 5a862c492..244326ee9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -55,6 +55,8 @@ %% only for testing/mocking -export([supported_versions/1]). +-export([format_bridge_metrics/1, format_metrics/1]). + -define(BPAPI_NAME, emqx_bridge). -define(BRIDGE_NOT_ENABLED, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 82858f00b..8cf8730b0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -945,6 +945,7 @@ t_on_get_status(Config, Opts) -> ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), FailureStatus = maps:get(failure_status, Opts, disconnected), + NormalStatus = maps:get(normal_status, Opts, connected), ?assertMatch({ok, _}, create_bridge(Config)), ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to @@ -952,7 +953,7 @@ t_on_get_status(Config, Opts) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId)) ), case ProxyHost of undefined -> @@ -971,7 +972,7 @@ t_on_get_status(Config, Opts) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId)) ) end, ok. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index c96eeeccf..9450d02f0 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -1448,7 +1448,10 @@ t_connection_down_before_starting(Config) -> ), {ok, _} = create_bridge(Config), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, connecting}, health_check(Config)), + ?assertMatch( + {ok, Status} when Status =:= connecting orelse Status =:= disconnected, + health_check(Config) + ), emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), ?retry( diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index 8c3223e8b..8ab084323 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index be52f4469..e4cc0aa31 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -363,7 +363,7 @@ do_start_client( {error, Reason} end. -grpc_config() -> +grpc_opts() -> #{ sync_start => true, connect_timeout => ?CONNECT_TIMEOUT @@ -382,7 +382,7 @@ client_config( {pool, InstId}, {pool_type, random}, {auto_reconnect, ?AUTO_RECONNECT_S}, - {gprc_options, grpc_config()} + {grpc_opts, grpc_opts()} ] ++ protocol_config(Config). protocol_config( diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl index 7d3003bfa..725d24a88 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, redis). -define(BRIDGE_TYPE_BIN, <<"redis">>). @@ -46,6 +47,7 @@ matrix_testcases() -> t_start_stop, t_create_via_http, t_on_get_status, + t_on_get_status_no_username_pass, t_sync_query, t_map_to_redis_hset_args ]. @@ -325,6 +327,43 @@ t_on_get_status(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), ok. +t_on_get_status_no_username_pass(matrix) -> + {on_get_status, [ + [single, tcp], + [cluster, tcp], + [sentinel, tcp] + ]}; +t_on_get_status_no_username_pass(Config0) when is_list(Config0) -> + ConnectorConfig0 = ?config(connector_config, Config0), + ConnectorConfig1 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"password">>], ConnectorConfig0, <<"">> + ), + ConnectorConfig2 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"username">>], ConnectorConfig1, <<"">> + ), + Config1 = proplists:delete(connector_config, Config0), + Config2 = [{connector_config, ConnectorConfig2} | Config1], + ?check_trace( + emqx_bridge_v2_testlib:t_on_get_status( + Config2, + #{ + failure_status => disconnected, + normal_status => disconnected + } + ), + fun(ok, Trace) -> + case ?config(redis_type, Config2) of + single -> + ?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace)); + sentinel -> + ?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace)); + cluster -> + ok + end + end + ), + ok. + t_sync_query(matrix) -> {sync_query, [ [single, tcp], diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index 4bbe04978..da9cd1a96 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_s3, [ {description, "EMQX Enterprise S3 Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 00c03fd3a..fdc6d255b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -146,29 +146,22 @@ on_stop(InstId, _State = #{pool_name := PoolName}) -> on_get_status(_InstId, State = #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - {?status_disconnected, State, Reason}; + {?status_disconnected, State, map_error_details(Reason)}; AWSConfig -> try erlcloud_s3:list_buckets(AWSConfig) of Props when is_list(Props) -> ?status_connected catch - error:{aws_error, {http_error, _Code, _, Reason}} -> - {?status_disconnected, State, Reason}; - error:{aws_error, {socket_error, Reason}} -> - {?status_disconnected, State, Reason} + error:Error -> + {?status_disconnected, State, map_error_details(Error)} end end. -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - try - ChannelState = start_channel(State, Config), - {ok, State#{channels => Channels#{ChannelId => ChannelState}}} - catch - throw:Reason -> - {error, Reason} - end. + ChannelState = start_channel(State, Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. @@ -217,7 +210,8 @@ start_channel(State, #{ max_records := MaxRecords }, container := Container, - bucket := Bucket + bucket := Bucket, + key := Key } }) -> AggregId = {Type, Name}, @@ -226,7 +220,7 @@ start_channel(State, #{ max_records => MaxRecords, work_dir => work_dir(Type, Name) }, - Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)), + Template = emqx_bridge_s3_upload:mk_key_template(Key), DeliveryOpts = #{ bucket => Bucket, key => Template, @@ -253,11 +247,6 @@ start_channel(State, #{ on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. -ensure_ok({ok, V}) -> - V; -ensure_ok({error, Reason}) -> - throw(Reason). - upload_options(Parameters) -> #{acl => maps:get(acl, Parameters, undefined)}. @@ -285,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S check_bucket_accessible(Bucket, #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - throw({unhealthy_target, Reason}); + throw({unhealthy_target, map_error_details(Reason)}); AWSConfig -> try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of Props when is_list(Props) -> @@ -293,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> catch error:{aws_error, {http_error, 404, _, _Reason}} -> throw({unhealthy_target, "Bucket does not exist"}); - error:{aws_error, {socket_error, Reason}} -> - throw({unhealthy_target, emqx_utils:format(Reason)}) + error:Error -> + throw({unhealthy_target, map_error_details(Error)}) end end. @@ -304,8 +293,7 @@ check_aggreg_upload_errors(AggregId) -> %% TODO %% This approach means that, for example, 3 upload failures will cause %% the channel to be marked as unhealthy for 3 consecutive health checks. - ErrorMessage = emqx_utils:format(Error), - throw({unhealthy_target, ErrorMessage}); + throw({unhealthy_target, map_error_details(Error)}); [] -> ok end. @@ -384,16 +372,38 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> - {error, {unrecoverable_error, Reason}} + {error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}} end. -map_error({socket_error, _} = Reason) -> - {recoverable_error, Reason}; -map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> +map_error(Error) -> + {map_error_class(Error), map_error_details(Error)}. + +map_error_class({s3_error, _, _}) -> + unrecoverable_error; +map_error_class({aws_error, Error}) -> + map_error_class(Error); +map_error_class({socket_error, _}) -> + recoverable_error; +map_error_class({http_error, Status, _, _}) when Status >= 500 -> %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList - {recoverable_error, Reason}; -map_error(Reason) -> - {unrecoverable_error, Reason}. + recoverable_error; +map_error_class(_Error) -> + unrecoverable_error. + +map_error_details({s3_error, Code, Message}) -> + emqx_utils:format("S3 error: ~s ~s", [Code, Message]); +map_error_details({aws_error, Error}) -> + map_error_details(Error); +map_error_details({socket_error, Reason}) -> + emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]); +map_error_details({http_error, _, _, _} = Error) -> + emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]); +map_error_details({failed_to_obtain_credentials, Error}) -> + emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]); +map_error_details({upload_failed, Error}) -> + map_error_details(Error); +map_error_details(Error) -> + Error. render_bucket(Template, Data) -> case emqx_template:render(Template, {emqx_jsonish, Data}) of @@ -416,6 +426,32 @@ render_content(Template, Data) -> iolist_to_string(IOList) -> unicode:characters_to_list(IOList). +%% + +-include_lib("xmerl/include/xmerl.hrl"). + +-spec map_aws_error_details(_AWSError) -> + unicode:chardata(). +map_aws_error_details({http_error, _Status, _, Body}) -> + try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of + {Error = #xmlElement{name = 'Error'}, _} -> + map_aws_error_details(Error); + _ -> + Body + catch + exit:_ -> + Body + end; +map_aws_error_details(#xmlElement{content = Content}) -> + Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)), + Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)), + [Code, $:, $\s | Message]. + +extract_xml_text(#xmlElement{content = Content}) -> + [Fragment || #xmlText{value = Fragment} <- Content]; +extract_xml_text(false) -> + []. + %% `emqx_connector_aggreg_delivery` APIs -spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t(). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 2bf12f24b..bedefc7c5 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -29,7 +29,10 @@ ]). %% Internal exports --export([convert_actions/2]). +-export([ + convert_actions/2, + validate_key_template/1 +]). -define(DEFAULT_AGGREG_BATCH_SIZE, 100). -define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>). @@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) -> )} ], emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [ - {key, #{desc => ?DESC(s3_aggregated_upload_key)}} + {key, #{ + desc => ?DESC(s3_aggregated_upload_key), + validator => fun ?MODULE:validate_key_template/1 + }} ]), emqx_s3_schema:fields(s3_uploader) ]); @@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou Conf#{<<"resource_opts">> := NResourceOpts} end. -%% Interpreting options - --spec mk_key_template(_Parameters :: map()) -> - {ok, emqx_template:str()} | {error, _Reason}. -mk_key_template(#{key := Key}) -> - Template = emqx_template:parse(Key), +validate_key_template(Conf) -> + Template = emqx_template:parse(Conf), case validate_bindings(emqx_template:placeholders(Template)) of - UsedBindings when is_list(UsedBindings) -> - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - {ok, Template}; - false -> - {ok, Template ++ SuffixTemplate} - end; - Error = {error, _} -> - Error + Bindings when is_list(Bindings) -> + ok; + {error, {disallowed_placeholders, Disallowed}} -> + {error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])} end. validate_bindings(Bindings) -> @@ -276,7 +272,22 @@ validate_bindings(Bindings) -> [] -> Bindings; Disallowed -> - {error, {invalid_key_template, {disallowed_placeholders, Disallowed}}} + {error, {disallowed_placeholders, Disallowed}} + end. + +%% Interpreting options + +-spec mk_key_template(unicode:chardata()) -> + emqx_template:str(). +mk_key_template(Key) -> + Template = emqx_template:parse(Key), + UsedBindings = emqx_template:placeholders(Template), + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + Template; + false -> + Template ++ SuffixTemplate end. mk_suffix_template(UsedBindings) -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index f8eaa1b3a..ea69a346f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -134,6 +134,22 @@ action_config(Name, ConnectorId) -> t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). +t_create_unavailable_credentials(Config) -> + ConnectorName = ?config(connector_name, Config), + ConnectorType = ?config(connector_type, Config), + ConnectorConfig = maps:without( + [<<"access_key_id">>, <<"secret_access_key">>], + ?config(connector_config, Config) + ), + ?assertMatch( + {ok, + {{_HTTP, 201, _}, _, #{ + <<"status_reason">> := + <<"Unable to obtain AWS credentials:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) + ). + t_ignore_batch_opts(Config) -> {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config), ?assertMatch( @@ -159,6 +175,13 @@ t_start_broken_update_restart(Config) -> _Attempts = 20, ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId)) ), + ?assertMatch( + {ok, + {{_HTTP, 200, _}, _, #{ + <<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:get_connector_api(Type, Name) + ), ?assertMatch( {ok, {{_HTTP, 200, _}, _, _}}, emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf) diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index b7c17bbaa..345c2e9aa 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -177,6 +177,27 @@ t_create_invalid_config(Config) -> ) ). +t_create_invalid_config_key_template(Config) -> + ?assertMatch( + {error, + {_Status, _, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>, + <<"path">> := <<"root.parameters.key">> + } + }}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + _Overrides = #{ + <<"parameters">> => #{ + <<"key">> => <<"${action}/${foo}:${bar.rfc3339}">> + } + } + ) + ). + t_update_invalid_config(Config) -> ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), ?assertMatch( diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 3ddcabbb3..dc406b735 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 09883dc63..a181fbf5a 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -163,8 +163,13 @@ dump_schema(Dir, SchemaModule) -> ), emqx_dashboard:save_dispatch_eterm(SchemaModule). -load(emqx_enterprise_schema, emqx_telemetry) -> ignore; -load(_, Lib) -> ok = application:load(Lib). +load(emqx_enterprise_schema, emqx_telemetry) -> + ignore; +load(_, Lib) -> + case application:load(Lib) of + ok -> ok; + {error, {already_loaded, _}} -> ok + end. %% for scripts/spellcheck. gen_schema_json(Dir, SchemaModule, Lang) -> diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index b054988be..cfdc5820e 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -74,13 +74,14 @@ end_per_testcase(_Config) -> t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), - MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]}, {ok, TnxId, ok} = multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), ?assertEqual({ok, 2, ok}, multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), case length(Status) =:= 3 of @@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), - {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]}, {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {ok, _, ok} = multicall(M, F, A, 1, 1000), @@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> t_commit_concurrency(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), - {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, - {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), - ?assertEqual(ok, receive_msg(3, test)), + Msg = ?FUNCTION_NAME, + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]}, + ?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)), + ?assertEqual(ok, receive_msg(3, Msg)), %% call concurrently without stale tnx_id error Workers = lists:seq(1, 256), @@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, [_Status | L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ets:insert(test, {other_mfa_result, ok}), - {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000), + {ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), - MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]}, {ok, TnxId, ok} = multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), ok. t_del_stale_mfa(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - MFA = {M, F, A} = {io, format, ["test"]}, + MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]}, Keys = lists:seq(1, 50), Keys2 = lists:seq(51, 150), Ids = @@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) -> t_skip_failed_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) -> t_fast_forward_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -356,7 +360,11 @@ tnx_ids(Status) -> start() -> {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + ok = emqx_cluster_rpc:wait_for_cluster_rpc(), ok = emqx_cluster_rpc:reset(), + %% Ensure all processes are idle status. + ok = gen_server:call(?NODE2, test), + ok = gen_server:call(?NODE3, test), ok. stop() -> @@ -366,6 +374,7 @@ stop() -> undefined -> ok; P -> + erlang:unregister(N), erlang:unlink(P), erlang:exit(P, kill) end @@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 -> receive Msg -> receive_msg(Count - 1, Msg) - after 1000 -> - timeout + after 1300 -> + Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])), + {Msg, flush_msg([])} end. echo(Pid, Msg) -> @@ -425,3 +435,11 @@ multicall(M, F, A, N, T) -> multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). + +flush_msg(Acc) -> + receive + Msg -> + flush_msg([Msg | Acc]) + after 10 -> + Acc + end. diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index fbab1ff14..844677d12 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -85,7 +85,7 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). --define(DEF_IDLE_TIME, timer:seconds(30)). +-define(DEF_IDLE_SECONDS, 30). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). @@ -149,7 +149,7 @@ init( mountpoint => Mountpoint } ), - Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME), + Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_SECONDS), #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -378,7 +378,7 @@ ensure_keepalive_timer(Channel) -> ensure_keepalive_timer(fun ensure_timer/4, Channel). ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) -> - Heartbeat = emqx_keepalive:info(interval, KeepAlive), + Heartbeat = emqx_keepalive:info(check_interval, KeepAlive), Fun(keepalive, Heartbeat, keepalive, Channel). check_auth_state(Msg, #channel{connection_required = false} = Channel) -> @@ -495,7 +495,7 @@ enrich_conninfo( ) -> case Queries of #{<<"clientid">> := ClientId} -> - Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)), + Interval = emqx_keepalive:info(check_interval, KeepAlive), NConnInfo = ConnInfo#{ clientid => ClientId, proto_name => <<"CoAP">>, diff --git a/apps/emqx_gateway_coap/src/emqx_coap_schema.erl b/apps/emqx_gateway_coap/src/emqx_coap_schema.erl index 7d38a2bb6..61d4b7376 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_schema.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_schema.erl @@ -19,12 +19,6 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). --type duration() :: non_neg_integer(). - --typerefl_from_string({duration/0, emqx_schema, to_duration}). - --reflect_type([duration/0]). - %% config schema provides -export([namespace/0, fields/1, desc/1]). @@ -34,7 +28,7 @@ fields(coap) -> [ {heartbeat, sc( - duration(), + emqx_schema:duration_s(), #{ default => <<"30s">>, desc => ?DESC(coap_heartbeat) diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index 3a715eac4..e9c1f2b4a 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index 3201d5dbf..bd403a463 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -100,7 +100,7 @@ init_per_testcase(t_heartbeat, Config) -> OldConf = emqx:get_raw_config([gateway, coap]), {ok, _} = emqx_gateway_conf:update_gateway( coap, - OldConf#{<<"heartbeat">> => <<"800ms">>} + OldConf#{<<"heartbeat">> => <<"1s">>} ), [ {old_conf, OldConf}, @@ -216,8 +216,9 @@ t_heartbeat(Config) -> [], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) ), - - timer:sleep(Heartbeat * 2), + %% The minimum timeout time is 1 second. + %% 1.5 * Heartbeat + 0.5 * Heartbeat(< 1s) = 1.5 * 1 + 1 = 2.5 + timer:sleep(Heartbeat * 2 + 1000), ?assertEqual( [], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index c145506c9..93646acbf 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -715,7 +715,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> StatVal = emqx_gateway_conn:keepalive_stats(recv), - Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), + Keepalive = emqx_keepalive:init(default, StatVal, Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). ensure_timer(Name, Channel = #channel{timers = Timers}) -> @@ -746,7 +746,7 @@ interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}}) interval(force_close, _) -> 15000; interval(keepalive, #channel{keepalive = Keepalive}) -> - emqx_keepalive:info(interval, Keepalive). + emqx_keepalive:info(check_interval, Keepalive). %%-------------------------------------------------------------------- %% Dispatch diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index fe237779b..1d5cb85b8 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index bcb54e0f1..f96d112e9 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 9652290d3..809a79f7d 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -506,7 +506,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_timer, #channel{retx_interval = RetxIntv}) -> RetxIntv. diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src index 8e5157695..8d1e33f74 100644 --- a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_jt808, [ {description, "JT/T 808 Gateway"}, - {vsn, "0.0.3"}, + {vsn, "0.1.0"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 876f623e9..a74214a1c 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -616,7 +616,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_timer, #channel{retx_interval = RetxIntv}) -> RetxIntv. diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 585410356..1dc3f6939 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 501308ea0..c9e109c3f 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -430,7 +430,7 @@ ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel) -> - Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))), + Keepalive = emqx_keepalive:init(Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). %%-------------------------------------------------------------------- @@ -2245,7 +2245,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_delivery, #channel{session = Session}) -> emqx_mqttsn_session:info(retry_interval, Session); interval(expire_awaiting_rel, #channel{session = Session}) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 2c71e9822..9557c3214 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -1109,7 +1109,7 @@ t_keepalive(_Config) -> [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), %% will reset to max keepalive if keepalive > max keepalive #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid), - ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))), + ?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))), {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), diff --git a/apps/emqx_redis/src/emqx_redis.app.src b/apps/emqx_redis/src/emqx_redis.app.src index 660c490e6..02a251637 100644 --- a/apps/emqx_redis/src/emqx_redis.app.src +++ b/apps/emqx_redis/src/emqx_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_redis, [ {description, "EMQX Redis Database Connector"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 17a0ede49..059e9aa23 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -19,6 +19,8 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]). @@ -231,7 +233,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) -> is_unrecoverable_error(_) -> false. -on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> +on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) -> case eredis_cluster:pool_exists(PoolName) of true -> %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster. @@ -240,26 +242,51 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> %% In this case, we can directly consider it as a disconnect and then proceed to reconnect. case eredis_cluster_monitor:get_all_pools(PoolName) of [] -> - disconnected; + ?status_disconnected; [_ | _] -> - Health = eredis_cluster:ping_all(PoolName), - status_result(Health) + do_cluster_status_check(PoolName, State) end; false -> - disconnected + ?status_disconnected end; -on_get_status(_InstId, #{pool_name := PoolName}) -> - Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), - status_result(Health). - -do_get_status(Conn) -> - case eredis:q(Conn, ["PING"]) of - {ok, _} -> true; - _ -> false +on_get_status(_InstId, #{pool_name := PoolName} = State) -> + HealthCheckResoults = emqx_resource_pool:health_check_workers( + PoolName, + fun ?MODULE:do_get_status/1, + emqx_resource_pool:health_check_timeout(), + #{return_values => true} + ), + case HealthCheckResoults of + {ok, Results} -> + sum_worker_results(Results, State); + Error -> + {?status_disconnected, State, Error} end. -status_result(_Status = true) -> connected; -status_result(_Status = false) -> connecting. +do_cluster_status_check(Pool, State) -> + Pongs = eredis_cluster:qa(Pool, [<<"PING">>]), + sum_worker_results(Pongs, State). + +do_get_status(Conn) -> + eredis:q(Conn, ["PING"]). + +sum_worker_results([], _State) -> + ?status_connected; +sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) -> + ?tp(emqx_redis_auth_required_error, #{}), + %% This requires user action to fix so we set the status to disconnected + {?status_disconnected, State, {unhealthy_target, Error}}; +sum_worker_results([{ok, _} | Rest], State) -> + sum_worker_results(Rest, State); +sum_worker_results([Error | _Rest], State) -> + ?SLOG( + warning, + #{ + msg => "emqx_redis_check_status_error", + error => Error + } + ), + {?status_connecting, State, Error}. do_cmd(PoolName, cluster, {cmd, Command}) -> eredis_cluster:q(PoolName, Command); diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 764c65e6f..af9abe95b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -29,6 +29,9 @@ -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). -define(TELEMETRY_PREFIX, emqx, resource). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX), + {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} +). -import(emqx_common_test_helpers, [on_exit/1]). @@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(). + do_t_expiration_retry(#{is_batch => false}). t_expiration_retry_batch(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), @@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(). + do_t_expiration_retry(#{is_batch => true}). -do_t_expiration_retry() -> +do_t_expiration_retry(Context) -> + IsBatch = maps:get(is_batch, Context), ResumeInterval = 300, ?check_trace( + #{timetrap => 10_000}, begin ok = emqx_resource:simple_sync_query(?ID, block), - {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := buffer_worker_flush_nack}), - 1, - 200 - ), - TimeoutMS = 100, + TimeoutMS = 200, %% the request that expires must be first, so it's the %% head of the inflight table (and retriable). {ok, SRef1} = snabbkaffe:subscribe( @@ -2542,6 +2542,8 @@ do_t_expiration_retry() -> ) ) end), + %% This second message must be enqueued while the resource is blocked by the + %% previous message. Pid1 = spawn_link(fun() -> receive @@ -2556,22 +2558,33 @@ do_t_expiration_retry() -> ) ) end), + ?tp("waiting for first message to be appended to the queue", #{}), {ok, _} = snabbkaffe:receive_events(SRef1), + + ?tp("waiting for first message to expire during blocked retries", #{}), + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}), + + %% Now we wait until the worker tries the second message at least once before + %% unblocking it. Pid1 ! go, - {ok, _} = snabbkaffe:receive_events(SRef0), + ?tp("waiting for second message to be retried and be nacked while blocked", #{}), + case IsBatch of + false -> + {ok, _} = ?block_until(#{ + ?snk_kind := buffer_worker_flush_nack, + batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _) + }); + true -> + {ok, _} = ?block_until(#{ + ?snk_kind := buffer_worker_flush_nack, + batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _] + }) + end, - {ok, _} = - ?block_until( - #{?snk_kind := buffer_worker_retry_expired}, - ResumeInterval * 10 - ), - - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := buffer_worker_retry_inflight_succeeded}, - ResumeInterval * 5 - ), + %% Bypass the buffer worker and unblock the resource. + ok = emqx_resource:simple_sync_query(?ID, resume), + ?tp("waiting for second message to be retried and be acked, unblocking", #{}), + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}), ok end, diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index ee3342021..b2ec221e3 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.2.2"}, + {vsn, "5.2.3"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 536b427b3..e4f0d91d1 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -65,6 +65,7 @@ flattermap/2, tcp_keepalive_opts/4, format/1, + format/2, call_first_defined/1, ntoa/1, foldl_while/3, @@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> format(Term) -> iolist_to_binary(io_lib:format("~0p", [Term])). +format(Fmt, Args) -> + unicode:characters_to_binary(io_lib:format(Fmt, Args)). + -spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return(). call_first_defined([{Module, Function, Args} | Rest]) -> try diff --git a/changes/ce/fix-13222.en.md b/changes/ce/fix-13222.en.md new file mode 100644 index 000000000..0fc7a40ac --- /dev/null +++ b/changes/ce/fix-13222.en.md @@ -0,0 +1,5 @@ +Fix the flags check and error handling related to the Will message in the `CONNECT` packet. +See also: +- MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] +- MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] +- MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] diff --git a/changes/ce/fix-13307.en.md b/changes/ce/fix-13307.en.md new file mode 100644 index 000000000..d15732586 --- /dev/null +++ b/changes/ce/fix-13307.en.md @@ -0,0 +1,7 @@ +Upgrade ekka lib to 0.19.5 + +ekka 0.19.5 uses mria 0.8.8 that improves auto-heal functionality. +Previously, the auto-heal worked only when all core nodes were reachable again. +This update allows to apply auto-heal once the majority of core nodes are alive. + +[Mria PR](https://github.com/emqx/mria/pull/180) diff --git a/changes/ee/breaking-13332.en.md b/changes/ee/breaking-13332.en.md new file mode 100644 index 000000000..0b5bf5896 --- /dev/null +++ b/changes/ee/breaking-13332.en.md @@ -0,0 +1,4 @@ +When an S3 Bridge is improperly configured, error messages now contain more informative and easy to read details. + +## Breaking changes +* S3 Bridge configuration with invalid aggregated upload key template will no longer work. Before this change, such configuration was considered valid but the bridge would never work anyway. diff --git a/changes/ee/fix-13305.en.md b/changes/ee/fix-13305.en.md new file mode 100644 index 000000000..1936a49e3 --- /dev/null +++ b/changes/ee/fix-13305.en.md @@ -0,0 +1 @@ +Improved error handling for Redis connectors. Previously, Redis connectors of type single or sentinel would always encounter a timeout error during the connector test in the dashboard if no username or password was provided. This update ensures that users now receive an informative error message in such scenarios. Additionally, more detailed error information has been added for all Redis connector types to enhance diagnostics and troubleshooting. diff --git a/mix.exs b/mix.exs index 6ee0c73e5..2cc48d979 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.3", override: true}, diff --git a/rebar.config b/rebar.config index 346014c17..2be8656a6 100644 --- a/rebar.config +++ b/rebar.config @@ -83,7 +83,7 @@ {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}}, diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index e80f36817..f9978fe6f 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -855,6 +855,15 @@ The default value 1.5 is following the MQTT 5.0 specification. This multiplier i mqtt_keepalive_multiplier.label: """Keep Alive Multiplier""" +mqtt_keepalive_check_interval.desc: +"""The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets. +If a certain amount of time passes without any packets being sent from the client, this time will be added up. +Once the accumulated time exceeds `keepalive-interval * keepalive-multiplier`, the connection will be terminated. +The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of `keepalive-interval / 2`.""" + +mqtt_keepalive_check_interval.label: +"""Keep Alive Check Interval""" + force_gc_bytes.desc: """GC the process after specified number of bytes have passed through."""