Merge remote-tracking branch 'origin/release-572' into release-57

This commit is contained in:
zmstone 2024-06-26 22:28:08 +02:00
commit 98f70ea8d8
42 changed files with 704 additions and 186 deletions

View File

@ -52,9 +52,13 @@ jobs:
run: |
CID=$(docker run -d --rm -P $_EMQX_DOCKER_IMAGE_TAG)
HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID)
./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT
./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT || {
docker logs $CID
exit 1
}
docker stop $CID
- name: export docker image
if: always()
run: |
docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1

View File

@ -28,7 +28,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},

View File

@ -544,8 +544,10 @@ handle_in(
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end;
handle_in(?PACKET(?PINGREQ), Channel) ->
{ok, ?PACKET(?PINGRESP), Channel};
handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
{ok, NKeepalive} = emqx_keepalive:check(Keepalive),
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)};
handle_in(
?DISCONNECT_PACKET(ReasonCode, Properties),
Channel = #channel{conninfo = ConnInfo}
@ -1229,11 +1231,12 @@ handle_call(
{keepalive, Interval},
Channel = #channel{
keepalive = KeepAlive,
conninfo = ConnInfo
conninfo = ConnInfo,
clientinfo = #{zone := Zone}
}
) ->
ClientId = info(clientid, Channel),
NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive),
NKeepalive = emqx_keepalive:update(Zone, Interval, KeepAlive),
NConnInfo = maps:put(keepalive, Interval, ConnInfo),
NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
@ -1333,22 +1336,22 @@ die_if_test_compiled() ->
| {shutdown, Reason :: term(), channel()}.
handle_timeout(
_TRef,
{keepalive, _StatVal},
keepalive,
Channel = #channel{keepalive = undefined}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, _StatVal},
keepalive,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, StatVal},
keepalive,
Channel = #channel{keepalive = Keepalive}
) ->
case emqx_keepalive:check(StatVal, Keepalive) of
case emqx_keepalive:check(Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(keepalive, NChannel)};
@ -1459,10 +1462,16 @@ reset_timer(Name, Time, Channel) ->
ensure_timer(Name, Time, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
case maps:take(Name, Timers) of
error ->
Channel;
{TRef, NTimers} ->
ok = emqx_utils:cancel_timer(TRef),
Channel#channel{timers = NTimers}
end.
interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_delivery, #channel{session = Session}) ->
emqx_session:info(retry_interval, Session);
interval(expire_awaiting_rel, #channel{session = Session}) ->
@ -2320,9 +2329,7 @@ ensure_keepalive_timer(0, Channel) ->
ensure_keepalive_timer(disabled, Channel) ->
Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
RecvCnt = emqx_pd:get_counter(recv_pkt),
Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
Keepalive = emqx_keepalive:init(Zone, Interval),
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->

View File

@ -729,9 +729,7 @@ handle_timeout(
disconnected ->
{ok, State};
_ ->
%% recv_pkt: valid MQTT message
RecvCnt = emqx_pd:get_counter(recv_pkt),
handle_timeout(TRef, {keepalive, RecvCnt}, State)
with_channel(handle_timeout, [TRef, keepalive], State)
end;
handle_timeout(TRef, Msg, State) ->
with_channel(handle_timeout, [TRef, Msg], State).

View File

@ -287,14 +287,25 @@ parse_connect(FrameBin, StrictMode) ->
% Note: return malformed if reserved flag is not 0.
parse_connect2(
ProtoName,
<<BridgeTag:4, ProtoVer:4, UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2, WillFlag:1,
CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
<<BridgeTag:4, ProtoVer:4, UsernameFlag:1, PasswordFlag:1, WillRetainB:1, WillQoS:2,
WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
StrictMode
) ->
case Reserved of
0 -> ok;
1 -> ?PARSE_ERR(reserved_connect_flag)
end,
WillFlag = bool(WillFlagB),
WillRetain = bool(WillRetainB),
case WillFlag of
%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
false when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
true when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
false when WillRetain -> ?PARSE_ERR(invalid_will_retain);
_ -> ok
end,
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
ConnPacket = #mqtt_packet_connect{
@ -304,9 +315,9 @@ parse_connect2(
%% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
is_bridge = (BridgeTag =:= 8),
clean_start = bool(CleanStart),
will_flag = bool(WillFlag),
will_flag = WillFlag,
will_qos = WillQoS,
will_retain = bool(WillRetain),
will_retain = WillRetain,
keepalive = KeepAlive,
properties = Properties,
clientid = ClientId

View File

@ -19,10 +19,12 @@
-export([
init/1,
init/2,
init/3,
info/1,
info/2,
check/1,
check/2,
update/2
update/3
]).
-elvis([{elvis_style, no_if_expression, disable}]).
@ -30,8 +32,12 @@
-export_type([keepalive/0]).
-record(keepalive, {
interval :: pos_integer(),
statval :: non_neg_integer()
check_interval :: pos_integer(),
%% the received packets since last keepalive check
statval :: non_neg_integer(),
%% The number of idle intervals allowed before disconnecting the client.
idle_milliseconds = 0 :: non_neg_integer(),
max_idle_millisecond :: pos_integer()
}).
-opaque keepalive() :: #keepalive{}.
@ -39,7 +45,11 @@
%% @doc Init keepalive.
-spec init(Interval :: non_neg_integer()) -> keepalive().
init(Interval) -> init(0, Interval).
init(Interval) -> init(default, 0, Interval).
init(Zone, Interval) ->
RecvCnt = emqx_pd:get_counter(recv_pkt),
init(Zone, RecvCnt, Interval).
%% from mqtt-v3.1.1 specific
%% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
@ -53,42 +63,88 @@ init(Interval) -> init(0, Interval).
%% typically this is a few minutes.
%% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
%% @doc Init keepalive.
-spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined.
init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL ->
#keepalive{interval = Interval, statval = StatVal};
init(_, 0) ->
-spec init(
Zone :: atom(),
StatVal :: non_neg_integer(),
Second :: non_neg_integer()
) -> keepalive() | undefined.
init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL ->
#{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} =
emqx_config:get_zone_conf(Zone, [mqtt]),
MilliSeconds = timer:seconds(Second),
Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)),
MaxIdleMs = ceil(MilliSeconds * Mul),
#keepalive{
check_interval = Interval,
statval = StatVal,
idle_milliseconds = 0,
max_idle_millisecond = MaxIdleMs
};
init(_Zone, _, 0) ->
undefined;
init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL).
init(Zone, StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(Zone, StatVal, ?MAX_INTERVAL).
%% @doc Get Info of the keepalive.
-spec info(keepalive()) -> emqx_types:infos().
info(#keepalive{
interval = Interval,
statval = StatVal
check_interval = Interval,
statval = StatVal,
idle_milliseconds = IdleIntervals,
max_idle_millisecond = MaxMs
}) ->
#{
interval => Interval,
statval => StatVal
check_interval => Interval,
statval => StatVal,
idle_milliseconds => IdleIntervals,
max_idle_millisecond => MaxMs
}.
-spec info(interval | statval, keepalive()) ->
-spec info(check_interval | statval | idle_milliseconds, keepalive()) ->
non_neg_integer().
info(interval, #keepalive{interval = Interval}) ->
info(check_interval, #keepalive{check_interval = Interval}) ->
Interval;
info(statval, #keepalive{statval = StatVal}) ->
StatVal;
info(interval, undefined) ->
info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) ->
Val;
info(check_interval, undefined) ->
0.
check(Keepalive = #keepalive{}) ->
RecvCnt = emqx_pd:get_counter(recv_pkt),
check(RecvCnt, Keepalive);
check(Keepalive) ->
{ok, Keepalive}.
%% @doc Check keepalive.
-spec check(non_neg_integer(), keepalive()) ->
{ok, keepalive()} | {error, timeout}.
check(Val, #keepalive{statval = Val}) -> {error, timeout};
check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}.
check(
NewVal,
#keepalive{
statval = NewVal,
idle_milliseconds = IdleAcc,
check_interval = Interval,
max_idle_millisecond = Max
}
) when IdleAcc + Interval >= Max ->
{error, timeout};
check(
NewVal,
#keepalive{
statval = NewVal,
idle_milliseconds = IdleAcc,
check_interval = Interval
} = KeepAlive
) ->
{ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = IdleAcc + Interval}};
check(NewVal, #keepalive{} = KeepAlive) ->
{ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = 0}}.
%% @doc Update keepalive.
%% The statval of the previous keepalive will be used,
%% and normal checks will begin from the next cycle.
-spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
update(Interval, undefined) -> init(0, Interval);
update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval).
-spec update(atom(), non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
update(Zone, Interval, undefined) -> init(Zone, 0, Interval);
update(Zone, Interval, #keepalive{statval = StatVal}) -> init(Zone, StatVal, Interval).

View File

@ -3491,6 +3491,7 @@ mqtt_general() ->
)},
{"max_clientid_len",
sc(
%% MQTT-v3.1.1-[MQTT-3.1.3-5], MQTT-v5.0-[MQTT-3.1.3-5]
range(23, 65535),
#{
default => 65535,
@ -3612,9 +3613,17 @@ mqtt_general() ->
desc => ?DESC(mqtt_keepalive_multiplier)
}
)},
{"keepalive_check_interval",
sc(
timeout_duration(),
#{
default => <<"30s">>,
desc => ?DESC(mqtt_keepalive_check_interval)
}
)},
{"retry_interval",
sc(
duration(),
timeout_duration(),
#{
default => <<"30s">>,
desc => ?DESC(mqtt_retry_interval)

View File

@ -555,8 +555,7 @@ handle_info(Info, State) ->
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
shutdown(idle_timeout, State);
handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
RecvOct = emqx_pd:get_counter(recv_oct),
handle_timeout(TRef, {keepalive, RecvOct}, State);
with_channel(handle_timeout, [TRef, keepalive], State);
handle_timeout(
TRef,
emit_stats,

View File

@ -428,6 +428,7 @@ zone_global_defaults() ->
ignore_loop_deliver => false,
keepalive_backoff => 0.75,
keepalive_multiplier => 1.5,
keepalive_check_interval => 30000,
max_awaiting_rel => 100,
max_clientid_len => 65535,
max_inflight => 32,

View File

@ -64,7 +64,10 @@ groups() ->
t_malformed_connect_header,
t_malformed_connect_data,
t_reserved_connect_flag,
t_invalid_clientid
t_invalid_clientid,
t_undefined_password,
t_invalid_will_retain,
t_invalid_will_qos
]},
{connack, [parallel], [
t_serialize_parse_connack,
@ -738,6 +741,56 @@ t_undefined_password(_) ->
),
ok.
t_invalid_will_retain(_) ->
ConnectFlags = <<2#01100000>>,
ConnectBin =
<<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55,
122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84,
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
?assertException(
throw,
{frame_parse_error, invalid_will_retain},
emqx_frame:parse(ConnectBin)
),
ok.
t_invalid_will_qos(_) ->
Will_F_WillQoS0 = <<2#010:3, 2#00:2, 2#000:3>>,
Will_F_WillQoS1 = <<2#010:3, 2#01:2, 2#000:3>>,
Will_F_WillQoS2 = <<2#010:3, 2#10:2, 2#000:3>>,
Will_F_WillQoS3 = <<2#010:3, 2#11:2, 2#000:3>>,
Will_T_WillQoS3 = <<2#011:3, 2#11:2, 2#000:3>>,
ConnectBinFun = fun(ConnectFlags) ->
<<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55,
122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84,
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>
end,
?assertMatch(
{ok, _, _, _},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS0))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
),
ok.
parse_serialize(Packet) ->
parse_serialize(Packet, #{strict_mode => true}).

View File

@ -19,22 +19,180 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"listeners {"
"tcp.default.bind = 1883,"
"ssl.default = marked_for_deletion,"
"quic.default = marked_for_deletion,"
"ws.default = marked_for_deletion,"
"wss.default = marked_for_deletion"
"}"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
t_check_keepalive_default_timeout(_) ->
emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000),
erlang:process_flag(trap_exit, true),
ClientID = <<"default">>,
KeepaliveSec = 10,
{ok, C} = emqtt:start_link([
{keepalive, KeepaliveSec},
{clientid, binary_to_list(ClientID)}
]),
{ok, _} = emqtt:connect(C),
emqtt:pause(C),
[ChannelPid] = emqx_cm:lookup_channels(ClientID),
erlang:link(ChannelPid),
CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
?assertMatch(5000, CheckInterval),
%% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5
%% connect T0(packet = 1, idle_milliseconds = 0)
%% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000)
%% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000)
%% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout
Timeout = CheckInterval * 3,
%% connector but not send a packet.
?assertMatch(
no_keepalive_timeout_received,
receive_msg_in_time(ChannelPid, C, Timeout - 200),
Timeout - 200
),
?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
t_check_keepalive_other_timeout(_) ->
emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 2000),
erlang:process_flag(trap_exit, true),
ClientID = <<"other">>,
KeepaliveSec = 10,
{ok, C} = emqtt:start_link([
{keepalive, KeepaliveSec},
{clientid, binary_to_list(ClientID)}
]),
{ok, _} = emqtt:connect(C),
emqtt:pause(C),
{ok, _, [0]} = emqtt:subscribe(C, <<"mytopic">>, []),
[ChannelPid] = emqx_cm:lookup_channels(ClientID),
erlang:link(ChannelPid),
%%CheckInterval = ceil(keepalive_check_factor() * KeepaliveSec * 1000),
CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
?assertMatch(2000, CheckInterval),
%% when keepalive_check_interval is 2s and keepalive_multiplier is 1.5
%% connect T0(packet = 1, idle_milliseconds = 0)
%% subscribe T1(packet = 2, idle_milliseconds = 0)
%% check1 T2(packet = 2, idle_milliseconds = 1 * CheckInterval = 2000)
%% check2 T3(packet = 2, idle_milliseconds = 2 * CheckInterval = 4000)
%% check3 T4(packet = 2, idle_milliseconds = 3 * CheckInterval = 6000)
%% check4 T5(packet = 2, idle_milliseconds = 4 * CheckInterval = 8000)
%% check4 T6(packet = 2, idle_milliseconds = 5 * CheckInterval = 10000)
%% check4 T7(packet = 2, idle_milliseconds = 6 * CheckInterval = 12000)
%% check4 T8(packet = 2, idle_milliseconds = 7 * CheckInterval = 14000)
%% check4 T9(packet = 2, idle_milliseconds = 8 * CheckInterval = 16000) > 15000 timeout
Timeout = CheckInterval * 9,
?assertMatch(
no_keepalive_timeout_received,
receive_msg_in_time(ChannelPid, C, Timeout - 200),
Timeout - 200
),
?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200), Timeout).
t_check_keepalive_ping_reset_timer(_) ->
emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 100000),
erlang:process_flag(trap_exit, true),
ClientID = <<"ping_reset">>,
KeepaliveSec = 10,
{ok, C} = emqtt:start_link([
{keepalive, KeepaliveSec},
{clientid, binary_to_list(ClientID)}
]),
{ok, _} = emqtt:connect(C),
emqtt:pause(C),
ct:sleep(1000),
emqtt:resume(C),
pong = emqtt:ping(C),
emqtt:pause(C),
[ChannelPid] = emqx_cm:lookup_channels(ClientID),
erlang:link(ChannelPid),
CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
?assertMatch(5000, CheckInterval),
%% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5
%% connect T0(packet = 1, idle_milliseconds = 0)
%% sleep 1000ms
%% ping (packet = 2, idle_milliseconds = 0) restart timer
%% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000)
%% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000)
%% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout
Timeout = CheckInterval * 3,
?assertMatch(
no_keepalive_timeout_received,
receive_msg_in_time(ChannelPid, C, Timeout - 200),
Timeout - 200
),
?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
t_check(_) ->
emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000),
Keepalive = emqx_keepalive:init(60),
?assertEqual(60, emqx_keepalive:info(interval, Keepalive)),
?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive)),
?assertEqual(0, emqx_keepalive:info(statval, Keepalive)),
Info = emqx_keepalive:info(Keepalive),
?assertEqual(
#{
interval => 60,
statval => 0
check_interval => 30000,
statval => 0,
idle_milliseconds => 0,
%% 60 * 1.5 * 1000
max_idle_millisecond => 90000
},
Info
),
{ok, Keepalive1} = emqx_keepalive:check(1, Keepalive),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)),
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)).
{ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
{ok, Keepalive3} = emqx_keepalive:check(1, Keepalive2),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive3)),
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive3)),
Keepalive4 = emqx_keepalive:init(90),
?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive4)),
Keepalive5 = emqx_keepalive:init(1),
?assertEqual(1000, emqx_keepalive:info(check_interval, Keepalive5)),
ok.
keepalive_multiplier() ->
emqx_config:get_zone_conf(default, [mqtt, keepalive_multiplier]).
keepalive_check_interval() ->
emqx_config:get_zone_conf(default, [mqtt, keepalive_check_interval]).
receive_msg_in_time(ChannelPid, C, Timeout) ->
receive
{'EXIT', ChannelPid, {shutdown, keepalive_timeout}} ->
receive
{'EXIT', C, {shutdown, tcp_closed}} ->
ok
after 500 ->
throw(no_tcp_closed_from_mqtt_client)
end
after Timeout ->
no_keepalive_timeout_received
end.

View File

@ -945,6 +945,7 @@ t_on_get_status(Config, Opts) ->
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
FailureStatus = maps:get(failure_status, Opts, disconnected),
NormalStatus = maps:get(normal_status, Opts, connected),
?assertMatch({ok, _}, create_bridge(Config)),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
@ -952,7 +953,7 @@ t_on_get_status(Config, Opts) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId))
),
case ProxyHost of
undefined ->
@ -971,7 +972,7 @@ t_on_get_status(Config, Opts) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId))
)
end,
ok.

View File

@ -1448,7 +1448,10 @@ t_connection_down_before_starting(Config) ->
),
{ok, _} = create_bridge(Config),
{ok, _} = snabbkaffe:receive_events(SRef0),
?assertMatch({ok, connecting}, health_check(Config)),
?assertMatch(
{ok, Status} when Status =:= connecting orelse Status =:= disconnected,
health_check(Config)
),
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
?retry(

View File

@ -363,7 +363,7 @@ do_start_client(
{error, Reason}
end.
grpc_config() ->
grpc_opts() ->
#{
sync_start => true,
connect_timeout => ?CONNECT_TIMEOUT
@ -382,7 +382,7 @@ client_config(
{pool, InstId},
{pool_type, random},
{auto_reconnect, ?AUTO_RECONNECT_S},
{gprc_options, grpc_config()}
{grpc_opts, grpc_opts()}
] ++ protocol_config(Config).
protocol_config(

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, redis).
-define(BRIDGE_TYPE_BIN, <<"redis">>).
@ -46,6 +47,7 @@ matrix_testcases() ->
t_start_stop,
t_create_via_http,
t_on_get_status,
t_on_get_status_no_username_pass,
t_sync_query,
t_map_to_redis_hset_args
].
@ -325,6 +327,43 @@ t_on_get_status(Config) when is_list(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok.
t_on_get_status_no_username_pass(matrix) ->
{on_get_status, [
[single, tcp],
[cluster, tcp],
[sentinel, tcp]
]};
t_on_get_status_no_username_pass(Config0) when is_list(Config0) ->
ConnectorConfig0 = ?config(connector_config, Config0),
ConnectorConfig1 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"password">>], ConnectorConfig0, <<"">>
),
ConnectorConfig2 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"username">>], ConnectorConfig1, <<"">>
),
Config1 = proplists:delete(connector_config, Config0),
Config2 = [{connector_config, ConnectorConfig2} | Config1],
?check_trace(
emqx_bridge_v2_testlib:t_on_get_status(
Config2,
#{
failure_status => disconnected,
normal_status => disconnected
}
),
fun(ok, Trace) ->
case ?config(redis_type, Config2) of
single ->
?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace));
sentinel ->
?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace));
cluster ->
ok
end
end
),
ok.
t_sync_query(matrix) ->
{sync_query, [
[single, tcp],

View File

@ -146,29 +146,22 @@ on_stop(InstId, _State = #{pool_name := PoolName}) ->
on_get_status(_InstId, State = #{client_config := Config}) ->
case emqx_s3_client:aws_config(Config) of
{error, Reason} ->
{?status_disconnected, State, Reason};
{?status_disconnected, State, map_error_details(Reason)};
AWSConfig ->
try erlcloud_s3:list_buckets(AWSConfig) of
Props when is_list(Props) ->
?status_connected
catch
error:{aws_error, {http_error, _Code, _, Reason}} ->
{?status_disconnected, State, Reason};
error:{aws_error, {socket_error, Reason}} ->
{?status_disconnected, State, Reason}
error:Error ->
{?status_disconnected, State, map_error_details(Error)}
end
end.
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
{ok, state()} | {error, _Reason}.
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
try
ChannelState = start_channel(State, Config),
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}
catch
throw:Reason ->
{error, Reason}
end.
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
{ok, state()}.
@ -217,7 +210,8 @@ start_channel(State, #{
max_records := MaxRecords
},
container := Container,
bucket := Bucket
bucket := Bucket,
key := Key
}
}) ->
AggregId = {Type, Name},
@ -226,7 +220,7 @@ start_channel(State, #{
max_records => MaxRecords,
work_dir => work_dir(Type, Name)
},
Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)),
Template = emqx_bridge_s3_upload:mk_key_template(Key),
DeliveryOpts = #{
bucket => Bucket,
key => Template,
@ -253,11 +247,6 @@ start_channel(State, #{
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
}.
ensure_ok({ok, V}) ->
V;
ensure_ok({error, Reason}) ->
throw(Reason).
upload_options(Parameters) ->
#{acl => maps:get(acl, Parameters, undefined)}.
@ -285,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S
check_bucket_accessible(Bucket, #{client_config := Config}) ->
case emqx_s3_client:aws_config(Config) of
{error, Reason} ->
throw({unhealthy_target, Reason});
throw({unhealthy_target, map_error_details(Reason)});
AWSConfig ->
try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of
Props when is_list(Props) ->
@ -293,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
catch
error:{aws_error, {http_error, 404, _, _Reason}} ->
throw({unhealthy_target, "Bucket does not exist"});
error:{aws_error, {socket_error, Reason}} ->
throw({unhealthy_target, emqx_utils:format(Reason)})
error:Error ->
throw({unhealthy_target, map_error_details(Error)})
end
end.
@ -304,8 +293,7 @@ check_aggreg_upload_errors(AggregId) ->
%% TODO
%% This approach means that, for example, 3 upload failures will cause
%% the channel to be marked as unhealthy for 3 consecutive health checks.
ErrorMessage = emqx_utils:format(Error),
throw({unhealthy_target, ErrorMessage});
throw({unhealthy_target, map_error_details(Error)});
[] ->
ok
end.
@ -384,16 +372,38 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) ->
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
ok;
{error, Reason} ->
{error, {unrecoverable_error, Reason}}
{error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}}
end.
map_error({socket_error, _} = Reason) ->
{recoverable_error, Reason};
map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
map_error(Error) ->
{map_error_class(Error), map_error_details(Error)}.
map_error_class({s3_error, _, _}) ->
unrecoverable_error;
map_error_class({aws_error, Error}) ->
map_error_class(Error);
map_error_class({socket_error, _}) ->
recoverable_error;
map_error_class({http_error, Status, _, _}) when Status >= 500 ->
%% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
{recoverable_error, Reason};
map_error(Reason) ->
{unrecoverable_error, Reason}.
recoverable_error;
map_error_class(_Error) ->
unrecoverable_error.
map_error_details({s3_error, Code, Message}) ->
emqx_utils:format("S3 error: ~s ~s", [Code, Message]);
map_error_details({aws_error, Error}) ->
map_error_details(Error);
map_error_details({socket_error, Reason}) ->
emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]);
map_error_details({http_error, _, _, _} = Error) ->
emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]);
map_error_details({failed_to_obtain_credentials, Error}) ->
emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]);
map_error_details({upload_failed, Error}) ->
map_error_details(Error);
map_error_details(Error) ->
Error.
render_bucket(Template, Data) ->
case emqx_template:render(Template, {emqx_jsonish, Data}) of
@ -416,6 +426,32 @@ render_content(Template, Data) ->
iolist_to_string(IOList) ->
unicode:characters_to_list(IOList).
%%
-include_lib("xmerl/include/xmerl.hrl").
-spec map_aws_error_details(_AWSError) ->
unicode:chardata().
map_aws_error_details({http_error, _Status, _, Body}) ->
try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of
{Error = #xmlElement{name = 'Error'}, _} ->
map_aws_error_details(Error);
_ ->
Body
catch
exit:_ ->
Body
end;
map_aws_error_details(#xmlElement{content = Content}) ->
Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)),
Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)),
[Code, $:, $\s | Message].
extract_xml_text(#xmlElement{content = Content}) ->
[Fragment || #xmlText{value = Fragment} <- Content];
extract_xml_text(false) ->
[].
%% `emqx_connector_aggreg_delivery` APIs
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().

View File

@ -29,7 +29,10 @@
]).
%% Internal exports
-export([convert_actions/2]).
-export([
convert_actions/2,
validate_key_template/1
]).
-define(DEFAULT_AGGREG_BATCH_SIZE, 100).
-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) ->
)}
],
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
{key, #{
desc => ?DESC(s3_aggregated_upload_key),
validator => fun ?MODULE:validate_key_template/1
}}
]),
emqx_s3_schema:fields(s3_uploader)
]);
@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou
Conf#{<<"resource_opts">> := NResourceOpts}
end.
%% Interpreting options
-spec mk_key_template(_Parameters :: map()) ->
{ok, emqx_template:str()} | {error, _Reason}.
mk_key_template(#{key := Key}) ->
Template = emqx_template:parse(Key),
validate_key_template(Conf) ->
Template = emqx_template:parse(Conf),
case validate_bindings(emqx_template:placeholders(Template)) of
UsedBindings when is_list(UsedBindings) ->
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
{ok, Template};
false ->
{ok, Template ++ SuffixTemplate}
end;
Error = {error, _} ->
Error
Bindings when is_list(Bindings) ->
ok;
{error, {disallowed_placeholders, Disallowed}} ->
{error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])}
end.
validate_bindings(Bindings) ->
@ -276,7 +272,22 @@ validate_bindings(Bindings) ->
[] ->
Bindings;
Disallowed ->
{error, {invalid_key_template, {disallowed_placeholders, Disallowed}}}
{error, {disallowed_placeholders, Disallowed}}
end.
%% Interpreting options
-spec mk_key_template(unicode:chardata()) ->
emqx_template:str().
mk_key_template(Key) ->
Template = emqx_template:parse(Key),
UsedBindings = emqx_template:placeholders(Template),
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
Template;
false ->
Template ++ SuffixTemplate
end.
mk_suffix_template(UsedBindings) ->

View File

@ -134,6 +134,22 @@ action_config(Name, ConnectorId) ->
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
t_create_unavailable_credentials(Config) ->
ConnectorName = ?config(connector_name, Config),
ConnectorType = ?config(connector_type, Config),
ConnectorConfig = maps:without(
[<<"access_key_id">>, <<"secret_access_key">>],
?config(connector_config, Config)
),
?assertMatch(
{ok,
{{_HTTP, 201, _}, _, #{
<<"status_reason">> :=
<<"Unable to obtain AWS credentials:", _/bytes>>
}}},
emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig)
).
t_ignore_batch_opts(Config) ->
{ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
?assertMatch(
@ -159,6 +175,13 @@ t_start_broken_update_restart(Config) ->
_Attempts = 20,
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
),
?assertMatch(
{ok,
{{_HTTP, 200, _}, _, #{
<<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>>
}}},
emqx_bridge_v2_testlib:get_connector_api(Type, Name)
),
?assertMatch(
{ok, {{_HTTP, 200, _}, _, _}},
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)

View File

@ -177,6 +177,27 @@ t_create_invalid_config(Config) ->
)
).
t_create_invalid_config_key_template(Config) ->
?assertMatch(
{error,
{_Status, _, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>,
<<"path">> := <<"root.parameters.key">>
}
}}},
emqx_bridge_v2_testlib:create_bridge_api(
Config,
_Overrides = #{
<<"parameters">> => #{
<<"key">> => <<"${action}/${foo}:${bar.rfc3339}">>
}
}
)
).
t_update_invalid_config(Config) ->
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
?assertMatch(

View File

@ -163,8 +163,13 @@ dump_schema(Dir, SchemaModule) ->
),
emqx_dashboard:save_dispatch_eterm(SchemaModule).
load(emqx_enterprise_schema, emqx_telemetry) -> ignore;
load(_, Lib) -> ok = application:load(Lib).
load(emqx_enterprise_schema, emqx_telemetry) ->
ignore;
load(_, Lib) ->
case application:load(Lib) of
ok -> ok;
{error, {already_loaded, _}} -> ok
end.
%% for scripts/spellcheck.
gen_schema_json(Dir, SchemaModule, Lang) ->

View File

@ -74,13 +74,14 @@ end_per_testcase(_Config) ->
t_base_test(_Config) ->
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
Pid = self(),
MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
Msg = ?FUNCTION_NAME,
MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]},
{ok, TnxId, ok} = multicall(M, F, A),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFA, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual(ok, receive_msg(3, Msg)),
?assertEqual({ok, 2, ok}, multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(),
case length(Status) =:= 3 of
@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
Pid = self(),
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
Msg = ?FUNCTION_NAME,
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual(ok, receive_msg(3, Msg)),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = multicall(M, F, A, 1, 1000),
@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
t_commit_concurrency(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
Pid = self(),
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
?assertEqual(ok, receive_msg(3, test)),
Msg = ?FUNCTION_NAME,
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)),
?assertEqual(ok, receive_msg(3, Msg)),
%% call concurrently without stale tnx_id error
Workers = lists:seq(1, 256),
@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
{atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
{ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
ct:sleep(1000),
{atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)),
Pid = self(),
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
Msg = ?FUNCTION_NAME,
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
{ok, TnxId, ok} = multicall(M1, F1, A1),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFAEcho, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual(ok, receive_msg(3, Msg)),
ok.
t_del_stale_mfa(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {io, format, ["test"]},
MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]},
Keys = lists:seq(1, 50),
Keys2 = lists:seq(51, 150),
Ids =
@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) ->
t_skip_failed_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) ->
t_fast_forward_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -356,7 +360,11 @@ tnx_ids(Status) ->
start() ->
{ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
{ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
ok = emqx_cluster_rpc:reset(),
%% Ensure all processes are idle status.
ok = gen_server:call(?NODE2, test),
ok = gen_server:call(?NODE3, test),
ok.
stop() ->
@ -366,6 +374,7 @@ stop() ->
undefined ->
ok;
P ->
erlang:unregister(N),
erlang:unlink(P),
erlang:exit(P, kill)
end
@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 ->
receive
Msg ->
receive_msg(Count - 1, Msg)
after 1000 ->
timeout
after 1300 ->
Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])),
{Msg, flush_msg([])}
end.
echo(Pid, Msg) ->
@ -425,3 +435,11 @@ multicall(M, F, A, N, T) ->
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).
flush_msg(Acc) ->
receive
Msg ->
flush_msg([Msg | Acc])
after 10 ->
Acc
end.

View File

@ -85,7 +85,7 @@
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
-define(DEF_IDLE_TIME, timer:seconds(30)).
-define(DEF_IDLE_SECONDS, 30).
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
@ -149,7 +149,7 @@ init(
mountpoint => Mountpoint
}
),
Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME),
Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_SECONDS),
#channel{
ctx = Ctx,
conninfo = ConnInfo,
@ -378,7 +378,7 @@ ensure_keepalive_timer(Channel) ->
ensure_keepalive_timer(fun ensure_timer/4, Channel).
ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
Heartbeat = emqx_keepalive:info(interval, KeepAlive),
Heartbeat = emqx_keepalive:info(check_interval, KeepAlive),
Fun(keepalive, Heartbeat, keepalive, Channel).
check_auth_state(Msg, #channel{connection_required = false} = Channel) ->
@ -495,7 +495,7 @@ enrich_conninfo(
) ->
case Queries of
#{<<"clientid">> := ClientId} ->
Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
Interval = emqx_keepalive:info(check_interval, KeepAlive),
NConnInfo = ConnInfo#{
clientid => ClientId,
proto_name => <<"CoAP">>,

View File

@ -19,12 +19,6 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-type duration() :: non_neg_integer().
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-reflect_type([duration/0]).
%% config schema provides
-export([namespace/0, fields/1, desc/1]).
@ -34,7 +28,7 @@ fields(coap) ->
[
{heartbeat,
sc(
duration(),
emqx_schema:duration_s(),
#{
default => <<"30s">>,
desc => ?DESC(coap_heartbeat)

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_coap, [
{description, "CoAP Gateway"},
{vsn, "0.1.8"},
{vsn, "0.1.9"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -100,7 +100,7 @@ init_per_testcase(t_heartbeat, Config) ->
OldConf = emqx:get_raw_config([gateway, coap]),
{ok, _} = emqx_gateway_conf:update_gateway(
coap,
OldConf#{<<"heartbeat">> => <<"800ms">>}
OldConf#{<<"heartbeat">> => <<"1s">>}
),
[
{old_conf, OldConf},
@ -216,8 +216,9 @@ t_heartbeat(Config) ->
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
timer:sleep(Heartbeat * 2),
%% The minimum timeout time is 1 second.
%% 1.5 * Heartbeat + 0.5 * Heartbeat(< 1s) = 1.5 * 1 + 1 = 2.5
timer:sleep(Heartbeat * 2 + 1000),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)

View File

@ -715,7 +715,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
StatVal = emqx_gateway_conn:keepalive_stats(recv),
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
Keepalive = emqx_keepalive:init(default, StatVal, Interval),
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
@ -746,7 +746,7 @@ interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}})
interval(force_close, _) ->
15000;
interval(keepalive, #channel{keepalive = Keepalive}) ->
emqx_keepalive:info(interval, Keepalive).
emqx_keepalive:info(check_interval, Keepalive).
%%--------------------------------------------------------------------
%% Dispatch

View File

@ -506,7 +506,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
RetxIntv.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_jt808, [
{description, "JT/T 808 Gateway"},
{vsn, "0.0.3"},
{vsn, "0.1.0"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -616,7 +616,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
RetxIntv.

View File

@ -430,7 +430,7 @@ ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(0, Channel) ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))),
Keepalive = emqx_keepalive:init(Interval),
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
%%--------------------------------------------------------------------
@ -2245,7 +2245,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_delivery, #channel{session = Session}) ->
emqx_mqttsn_session:info(retry_interval, Session);
interval(expire_awaiting_rel, #channel{session = Session}) ->

View File

@ -1109,7 +1109,7 @@ t_keepalive(_Config) ->
[Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
%% will reset to max keepalive if keepalive > max keepalive
#{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))),
?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))),
{ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
#{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]),

View File

@ -1,6 +1,6 @@
{application, emqx_redis, [
{description, "EMQX Redis Database Connector"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,

View File

@ -19,6 +19,8 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]).
@ -231,7 +233,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) ->
is_unrecoverable_error(_) ->
false.
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
case eredis_cluster:pool_exists(PoolName) of
true ->
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
@ -240,26 +242,51 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
case eredis_cluster_monitor:get_all_pools(PoolName) of
[] ->
disconnected;
?status_disconnected;
[_ | _] ->
Health = eredis_cluster:ping_all(PoolName),
status_result(Health)
do_cluster_status_check(PoolName, State)
end;
false ->
disconnected
?status_disconnected
end;
on_get_status(_InstId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
status_result(Health).
do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of
{ok, _} -> true;
_ -> false
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
HealthCheckResoults = emqx_resource_pool:health_check_workers(
PoolName,
fun ?MODULE:do_get_status/1,
emqx_resource_pool:health_check_timeout(),
#{return_values => true}
),
case HealthCheckResoults of
{ok, Results} ->
sum_worker_results(Results, State);
Error ->
{?status_disconnected, State, Error}
end.
status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting.
do_cluster_status_check(Pool, State) ->
Pongs = eredis_cluster:qa(Pool, [<<"PING">>]),
sum_worker_results(Pongs, State).
do_get_status(Conn) ->
eredis:q(Conn, ["PING"]).
sum_worker_results([], _State) ->
?status_connected;
sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) ->
?tp(emqx_redis_auth_required_error, #{}),
%% This requires user action to fix so we set the status to disconnected
{?status_disconnected, State, {unhealthy_target, Error}};
sum_worker_results([{ok, _} | Rest], State) ->
sum_worker_results(Rest, State);
sum_worker_results([Error | _Rest], State) ->
?SLOG(
warning,
#{
msg => "emqx_redis_check_status_error",
error => Error
}
),
{?status_connecting, State, Error}.
do_cmd(PoolName, cluster, {cmd, Command}) ->
eredis_cluster:q(PoolName, Command);

View File

@ -29,6 +29,9 @@
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
-define(TELEMETRY_PREFIX, emqx, resource).
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
{query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
).
-import(emqx_common_test_helpers, [on_exit/1]).
@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) ->
resume_interval => 300
}
),
do_t_expiration_retry().
do_t_expiration_retry(#{is_batch => false}).
t_expiration_retry_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) ->
resume_interval => 300
}
),
do_t_expiration_retry().
do_t_expiration_retry(#{is_batch => true}).
do_t_expiration_retry() ->
do_t_expiration_retry(Context) ->
IsBatch = maps:get(is_batch, Context),
ResumeInterval = 300,
?check_trace(
#{timetrap => 10_000},
begin
ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
TimeoutMS = 200,
%% the request that expires must be first, so it's the
%% head of the inflight table (and retriable).
{ok, SRef1} = snabbkaffe:subscribe(
@ -2542,6 +2542,8 @@ do_t_expiration_retry() ->
)
)
end),
%% This second message must be enqueued while the resource is blocked by the
%% previous message.
Pid1 =
spawn_link(fun() ->
receive
@ -2556,22 +2558,33 @@ do_t_expiration_retry() ->
)
)
end),
?tp("waiting for first message to be appended to the queue", #{}),
{ok, _} = snabbkaffe:receive_events(SRef1),
?tp("waiting for first message to expire during blocked retries", #{}),
{ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}),
%% Now we wait until the worker tries the second message at least once before
%% unblocking it.
Pid1 ! go,
{ok, _} = snabbkaffe:receive_events(SRef0),
?tp("waiting for second message to be retried and be nacked while blocked", #{}),
case IsBatch of
false ->
{ok, _} = ?block_until(#{
?snk_kind := buffer_worker_flush_nack,
batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _)
});
true ->
{ok, _} = ?block_until(#{
?snk_kind := buffer_worker_flush_nack,
batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _]
})
end,
{ok, _} =
?block_until(
#{?snk_kind := buffer_worker_retry_expired},
ResumeInterval * 10
),
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 5
),
%% Bypass the buffer worker and unblock the resource.
ok = emqx_resource:simple_sync_query(?ID, resume),
?tp("waiting for second message to be retried and be acked, unblocking", #{}),
{ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}),
ok
end,

View File

@ -65,6 +65,7 @@
flattermap/2,
tcp_keepalive_opts/4,
format/1,
format/2,
call_first_defined/1,
ntoa/1,
foldl_while/3,
@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
format(Term) ->
iolist_to_binary(io_lib:format("~0p", [Term])).
format(Fmt, Args) ->
unicode:characters_to_binary(io_lib:format(Fmt, Args)).
-spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
call_first_defined([{Module, Function, Args} | Rest]) ->
try

View File

@ -0,0 +1,5 @@
Fix the flags check and error handling related to the Will message in the `CONNECT` packet.
See also:
- MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
- MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
- MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]

View File

@ -0,0 +1,7 @@
Upgrade ekka lib to 0.19.5
ekka 0.19.5 uses mria 0.8.8 that improves auto-heal functionality.
Previously, the auto-heal worked only when all core nodes were reachable again.
This update allows to apply auto-heal once the majority of core nodes are alive.
[Mria PR](https://github.com/emqx/mria/pull/180)

View File

@ -0,0 +1,4 @@
When an S3 Bridge is improperly configured, error messages now contain more informative and easy to read details.
## Breaking changes
* S3 Bridge configuration with invalid aggregated upload key template will no longer work. Before this change, such configuration was considered valid but the bridge would never work anyway.

View File

@ -0,0 +1 @@
Improved error handling for Redis connectors. Previously, Redis connectors of type single or sentinel would always encounter a timeout error during the connector test in the dashboard if no username or password was provided. This update ensures that users now receive an informative error message in such scenarios. Additionally, more detailed error information has been added for all Redis connector types to enhance diagnostics and troubleshooting.

View File

@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.11.2", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true},
{:ekka, github: "emqx/ekka", tag: "0.19.3", override: true},
{:ekka, github: "emqx/ekka", tag: "0.19.5", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
{:minirest, github: "emqx/minirest", tag: "1.4.3", override: true},

View File

@ -83,7 +83,7 @@
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
{minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}},

View File

@ -855,6 +855,15 @@ The default value 1.5 is following the MQTT 5.0 specification. This multiplier i
mqtt_keepalive_multiplier.label:
"""Keep Alive Multiplier"""
mqtt_keepalive_check_interval.desc:
"""The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets.
If a certain amount of time passes without any packets being sent from the client, this time will be added up.
Once the accumulated time exceeds `keepalive-interval * keepalive-multiplier`, the connection will be terminated.
The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of `keepalive-interval / 2`."""
mqtt_keepalive_check_interval.label:
"""Keep Alive Check Interval"""
force_gc_bytes.desc:
"""GC the process after specified number of bytes have passed through."""