Merge pull request #13346 from zmstone/0626-merge-release-572-to-release-57

0626 merge release 572 to release 57
Author: zmstone, 2024-06-27 11:07:44 +02:00 (committed by GitHub)
commit 852a135040
52 changed files with 715 additions and 195 deletions

View File

@@ -52,9 +52,13 @@ jobs:
 run: |
 CID=$(docker run -d --rm -P $_EMQX_DOCKER_IMAGE_TAG)
 HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID)
-./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT
+./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT || {
+docker logs $CID
+exit 1
+}
 docker stop $CID
 - name: export docker image
+if: always()
 run: |
 docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz
 - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1

View File

@@ -28,7 +28,7 @@
 {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
 {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
 {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
-{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
+{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
 {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
 {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}},
 {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},

View File

@@ -2,7 +2,7 @@
 {application, emqx, [
 {id, "emqx"},
 {description, "EMQX Core"},
-{vsn, "5.3.2"},
+{vsn, "5.3.3"},
 {modules, []},
 {registered, []},
 {applications, [

View File

@@ -544,8 +544,10 @@ handle_in(
 {error, ReasonCode} ->
 handle_out(disconnect, ReasonCode, Channel)
 end;
-handle_in(?PACKET(?PINGREQ), Channel) ->
-{ok, ?PACKET(?PINGRESP), Channel};
+handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
+{ok, NKeepalive} = emqx_keepalive:check(Keepalive),
+NChannel = Channel#channel{keepalive = NKeepalive},
+{ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)};
 handle_in(
 ?DISCONNECT_PACKET(ReasonCode, Properties),
 Channel = #channel{conninfo = ConnInfo}
@@ -1229,11 +1231,12 @@ handle_call(
 {keepalive, Interval},
 Channel = #channel{
 keepalive = KeepAlive,
-conninfo = ConnInfo
+conninfo = ConnInfo,
+clientinfo = #{zone := Zone}
 }
 ) ->
 ClientId = info(clientid, Channel),
-NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive),
+NKeepalive = emqx_keepalive:update(Zone, Interval, KeepAlive),
 NConnInfo = maps:put(keepalive, Interval, ConnInfo),
 NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
 SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
@@ -1333,22 +1336,22 @@ die_if_test_compiled() ->
 | {shutdown, Reason :: term(), channel()}.
 handle_timeout(
 _TRef,
-{keepalive, _StatVal},
+keepalive,
 Channel = #channel{keepalive = undefined}
 ) ->
 {ok, Channel};
 handle_timeout(
 _TRef,
-{keepalive, _StatVal},
+keepalive,
 Channel = #channel{conn_state = disconnected}
 ) ->
 {ok, Channel};
 handle_timeout(
 _TRef,
-{keepalive, StatVal},
+keepalive,
 Channel = #channel{keepalive = Keepalive}
 ) ->
-case emqx_keepalive:check(StatVal, Keepalive) of
+case emqx_keepalive:check(Keepalive) of
 {ok, NKeepalive} ->
 NChannel = Channel#channel{keepalive = NKeepalive},
 {ok, reset_timer(keepalive, NChannel)};
@@ -1459,10 +1462,16 @@ reset_timer(Name, Time, Channel) ->
 ensure_timer(Name, Time, clean_timer(Name, Channel)).
 clean_timer(Name, Channel = #channel{timers = Timers}) ->
-Channel#channel{timers = maps:remove(Name, Timers)}.
+case maps:take(Name, Timers) of
+error ->
+Channel;
+{TRef, NTimers} ->
+ok = emqx_utils:cancel_timer(TRef),
+Channel#channel{timers = NTimers}
+end.
 interval(keepalive, #channel{keepalive = KeepAlive}) ->
-emqx_keepalive:info(interval, KeepAlive);
+emqx_keepalive:info(check_interval, KeepAlive);
 interval(retry_delivery, #channel{session = Session}) ->
 emqx_session:info(retry_interval, Session);
 interval(expire_awaiting_rel, #channel{session = Session}) ->
@@ -2320,9 +2329,7 @@ ensure_keepalive_timer(0, Channel) ->
 ensure_keepalive_timer(disabled, Channel) ->
 Channel;
 ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
-Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
-RecvCnt = emqx_pd:get_counter(recv_pkt),
-Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
+Keepalive = emqx_keepalive:init(Zone, Interval),
 ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
 clear_keepalive(Channel = #channel{timers = Timers}) ->

View File

@@ -729,9 +729,7 @@ handle_timeout(
 disconnected ->
 {ok, State};
 _ ->
-%% recv_pkt: valid MQTT message
-RecvCnt = emqx_pd:get_counter(recv_pkt),
-handle_timeout(TRef, {keepalive, RecvCnt}, State)
+with_channel(handle_timeout, [TRef, keepalive], State)
 end;
 handle_timeout(TRef, Msg, State) ->
 with_channel(handle_timeout, [TRef, Msg], State).

View File

@@ -287,14 +287,25 @@ parse_connect(FrameBin, StrictMode) ->
 % Note: return malformed if reserved flag is not 0.
 parse_connect2(
 ProtoName,
-<<BridgeTag:4, ProtoVer:4, UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2, WillFlag:1,
-CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
+<<BridgeTag:4, ProtoVer:4, UsernameFlag:1, PasswordFlag:1, WillRetainB:1, WillQoS:2,
+WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
 StrictMode
 ) ->
 case Reserved of
 0 -> ok;
 1 -> ?PARSE_ERR(reserved_connect_flag)
 end,
+WillFlag = bool(WillFlagB),
+WillRetain = bool(WillRetainB),
+case WillFlag of
+%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
+false when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
+%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
+true when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
+%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
+false when WillRetain -> ?PARSE_ERR(invalid_will_retain);
+_ -> ok
+end,
 {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
 {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
 ConnPacket = #mqtt_packet_connect{
@@ -304,9 +315,9 @@ parse_connect2(
 %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
 is_bridge = (BridgeTag =:= 8),
 clean_start = bool(CleanStart),
-will_flag = bool(WillFlag),
+will_flag = WillFlag,
 will_qos = WillQoS,
-will_retain = bool(WillRetain),
+will_retain = WillRetain,
 keepalive = KeepAlive,
 properties = Properties,
 clientid = ClientId
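For orientation (not part of the diff), a short comment-form summary of the connect-flag combinations the added checks reject, using the flag names from the bit pattern above:

%% Flags parsed above: UsernameFlag:1, PasswordFlag:1, WillRetainB:1, WillQoS:2, WillFlagB:1, CleanStart:1, Reserved:1
%% Reserved = 1                  -> ?PARSE_ERR(reserved_connect_flag)  (unchanged behaviour)
%% WillFlag = false, WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos)       (MQTT-3.1.2-13 / MQTT-3.1.2-11)
%% WillFlag = true,  WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos)       (MQTT-3.1.2-14 / MQTT-3.1.2-12)
%% WillFlag = false, WillRetain  -> ?PARSE_ERR(invalid_will_retain)    (MQTT-3.1.2-15 / MQTT-3.1.2-13)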

View File

@@ -19,10 +19,12 @@
 -export([
 init/1,
 init/2,
+init/3,
 info/1,
 info/2,
+check/1,
 check/2,
-update/2
+update/3
 ]).
 -elvis([{elvis_style, no_if_expression, disable}]).
@@ -30,8 +32,12 @@
 -export_type([keepalive/0]).
 -record(keepalive, {
-interval :: pos_integer(),
-statval :: non_neg_integer()
+check_interval :: pos_integer(),
+%% the received packets since last keepalive check
+statval :: non_neg_integer(),
+%% The number of idle intervals allowed before disconnecting the client.
+idle_milliseconds = 0 :: non_neg_integer(),
+max_idle_millisecond :: pos_integer()
 }).
 -opaque keepalive() :: #keepalive{}.
@@ -39,7 +45,11 @@
 %% @doc Init keepalive.
 -spec init(Interval :: non_neg_integer()) -> keepalive().
-init(Interval) -> init(0, Interval).
+init(Interval) -> init(default, 0, Interval).
+init(Zone, Interval) ->
+RecvCnt = emqx_pd:get_counter(recv_pkt),
+init(Zone, RecvCnt, Interval).
 %% from mqtt-v3.1.1 specific
 %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
@@ -53,42 +63,88 @@ init(Interval) -> init(0, Interval).
 %% typically this is a few minutes.
 %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
 %% @doc Init keepalive.
--spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined.
-init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL ->
-#keepalive{interval = Interval, statval = StatVal};
-init(_, 0) ->
+-spec init(
+Zone :: atom(),
+StatVal :: non_neg_integer(),
+Second :: non_neg_integer()
+) -> keepalive() | undefined.
+init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL ->
+#{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} =
+emqx_config:get_zone_conf(Zone, [mqtt]),
+MilliSeconds = timer:seconds(Second),
+Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)),
+MaxIdleMs = ceil(MilliSeconds * Mul),
+#keepalive{
+check_interval = Interval,
+statval = StatVal,
+idle_milliseconds = 0,
+max_idle_millisecond = MaxIdleMs
+};
+init(_Zone, _, 0) ->
 undefined;
-init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL).
+init(Zone, StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(Zone, StatVal, ?MAX_INTERVAL).
 %% @doc Get Info of the keepalive.
 -spec info(keepalive()) -> emqx_types:infos().
 info(#keepalive{
-interval = Interval,
-statval = StatVal
+check_interval = Interval,
+statval = StatVal,
+idle_milliseconds = IdleIntervals,
+max_idle_millisecond = MaxMs
 }) ->
 #{
-interval => Interval,
-statval => StatVal
+check_interval => Interval,
+statval => StatVal,
+idle_milliseconds => IdleIntervals,
+max_idle_millisecond => MaxMs
 }.
--spec info(interval | statval, keepalive()) ->
+-spec info(check_interval | statval | idle_milliseconds, keepalive()) ->
 non_neg_integer().
-info(interval, #keepalive{interval = Interval}) ->
+info(check_interval, #keepalive{check_interval = Interval}) ->
 Interval;
 info(statval, #keepalive{statval = StatVal}) ->
 StatVal;
-info(interval, undefined) ->
+info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) ->
+Val;
+info(check_interval, undefined) ->
 0.
+check(Keepalive = #keepalive{}) ->
+RecvCnt = emqx_pd:get_counter(recv_pkt),
+check(RecvCnt, Keepalive);
+check(Keepalive) ->
+{ok, Keepalive}.
 %% @doc Check keepalive.
 -spec check(non_neg_integer(), keepalive()) ->
 {ok, keepalive()} | {error, timeout}.
-check(Val, #keepalive{statval = Val}) -> {error, timeout};
-check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}.
+check(
+NewVal,
+#keepalive{
+statval = NewVal,
+idle_milliseconds = IdleAcc,
+check_interval = Interval,
+max_idle_millisecond = Max
+}
+) when IdleAcc + Interval >= Max ->
+{error, timeout};
+check(
+NewVal,
+#keepalive{
+statval = NewVal,
+idle_milliseconds = IdleAcc,
+check_interval = Interval
+} = KeepAlive
+) ->
+{ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = IdleAcc + Interval}};
+check(NewVal, #keepalive{} = KeepAlive) ->
+{ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = 0}}.
 %% @doc Update keepalive.
 %% The statval of the previous keepalive will be used,
 %% and normal checks will begin from the next cycle.
--spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
-update(Interval, undefined) -> init(0, Interval);
-update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval).
+-spec update(atom(), non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
+update(Zone, Interval, undefined) -> init(Zone, 0, Interval);
+update(Zone, Interval, #keepalive{statval = StatVal}) -> init(Zone, StatVal, Interval).
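A minimal usage sketch of the reworked keepalive (not part of the diff; assumes a zone whose mqtt config carries the defaults introduced in this commit, keepalive_check_interval = 30s and keepalive_multiplier = 1.5, and a client keepalive of 10 seconds). Each check that sees an unchanged packet counter adds one check_interval to idle_milliseconds; the connection times out once the accumulated idle time reaches max_idle_millisecond, and any check that observes new packets resets the accumulator:

K0 = emqx_keepalive:init(default, 0, 10),
5000 = emqx_keepalive:info(check_interval, K0), %% clamp(30000, 1000, max(10000 div 2, 1000))
{ok, K1} = emqx_keepalive:check(0, K0), %% idle_milliseconds: 0 -> 5000
{ok, K2} = emqx_keepalive:check(0, K1), %% idle_milliseconds: 5000 -> 10000
{error, timeout} = emqx_keepalive:check(0, K2), %% 10000 + 5000 >= 15000, i.e. ceil(10000 * 1.5)
{ok, _K3} = emqx_keepalive:check(1, K1). %% statval changed: accumulator resets to 0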

View File

@@ -3491,6 +3491,7 @@ mqtt_general() ->
 )},
 {"max_clientid_len",
 sc(
+%% MQTT-v3.1.1-[MQTT-3.1.3-5], MQTT-v5.0-[MQTT-3.1.3-5]
 range(23, 65535),
 #{
 default => 65535,
@@ -3612,9 +3613,17 @@ mqtt_general() ->
 desc => ?DESC(mqtt_keepalive_multiplier)
 }
 )},
+{"keepalive_check_interval",
+sc(
+timeout_duration(),
+#{
+default => <<"30s">>,
+desc => ?DESC(mqtt_keepalive_check_interval)
+}
+)},
 {"retry_interval",
 sc(
-duration(),
+timeout_duration(),
 #{
 default => <<"30s">>,
 desc => ?DESC(mqtt_retry_interval)

View File

@@ -555,8 +555,7 @@ handle_info(Info, State) ->
 handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
 shutdown(idle_timeout, State);
 handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
-RecvOct = emqx_pd:get_counter(recv_oct),
-handle_timeout(TRef, {keepalive, RecvOct}, State);
+with_channel(handle_timeout, [TRef, keepalive], State);
 handle_timeout(
 TRef,
 emit_stats,

View File

@@ -428,6 +428,7 @@ zone_global_defaults() ->
 ignore_loop_deliver => false,
 keepalive_backoff => 0.75,
 keepalive_multiplier => 1.5,
+keepalive_check_interval => 30000,
 max_awaiting_rel => 100,
 max_clientid_len => 65535,
 max_inflight => 32,

View File

@@ -64,7 +64,10 @@ groups() ->
 t_malformed_connect_header,
 t_malformed_connect_data,
 t_reserved_connect_flag,
-t_invalid_clientid
+t_invalid_clientid,
+t_undefined_password,
+t_invalid_will_retain,
+t_invalid_will_qos
 ]},
 {connack, [parallel], [
 t_serialize_parse_connack,
@@ -738,6 +741,56 @@ t_undefined_password(_) ->
 ),
 ok.
+t_invalid_will_retain(_) ->
+ConnectFlags = <<2#01100000>>,
+ConnectBin =
+<<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55,
+122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84,
+54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
+?assertException(
+throw,
+{frame_parse_error, invalid_will_retain},
+emqx_frame:parse(ConnectBin)
+),
+ok.
+t_invalid_will_qos(_) ->
+Will_F_WillQoS0 = <<2#010:3, 2#00:2, 2#000:3>>,
+Will_F_WillQoS1 = <<2#010:3, 2#01:2, 2#000:3>>,
+Will_F_WillQoS2 = <<2#010:3, 2#10:2, 2#000:3>>,
+Will_F_WillQoS3 = <<2#010:3, 2#11:2, 2#000:3>>,
+Will_T_WillQoS3 = <<2#011:3, 2#11:2, 2#000:3>>,
+ConnectBinFun = fun(ConnectFlags) ->
+<<16, 51, 0, 4, 77, 81, 84, 84, 5, ConnectFlags/binary, 174, 157, 24, 38, 0, 14, 98, 55,
+122, 51, 83, 73, 89, 50, 54, 79, 77, 73, 65, 86, 0, 5, 66, 117, 53, 57, 66, 0, 6, 84,
+54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>
+end,
+?assertMatch(
+{ok, _, _, _},
+emqx_frame:parse(ConnectBinFun(Will_F_WillQoS0))
+),
+?assertException(
+throw,
+{frame_parse_error, invalid_will_qos},
+emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
+),
+?assertException(
+throw,
+{frame_parse_error, invalid_will_qos},
+emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
+),
+?assertException(
+throw,
+{frame_parse_error, invalid_will_qos},
+emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
+),
+?assertException(
+throw,
+{frame_parse_error, invalid_will_qos},
+emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
+),
+ok.
 parse_serialize(Packet) ->
 parse_serialize(Packet, #{strict_mode => true}).

View File

@@ -19,22 +19,180 @@
 -compile(export_all).
 -compile(nowarn_export_all).
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 all() -> emqx_common_test_helpers:all(?MODULE).
+init_per_suite(Config) ->
+Apps = emqx_cth_suite:start(
+[
+{emqx,
+"listeners {"
+"tcp.default.bind = 1883,"
+"ssl.default = marked_for_deletion,"
+"quic.default = marked_for_deletion,"
+"ws.default = marked_for_deletion,"
+"wss.default = marked_for_deletion"
+"}"}
+],
+#{work_dir => emqx_cth_suite:work_dir(Config)}
+),
+[{apps, Apps} | Config].
+end_per_suite(Config) ->
+emqx_cth_suite:stop(?config(apps, Config)).
+t_check_keepalive_default_timeout(_) ->
+emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000),
+erlang:process_flag(trap_exit, true),
+ClientID = <<"default">>,
+KeepaliveSec = 10,
+{ok, C} = emqtt:start_link([
+{keepalive, KeepaliveSec},
+{clientid, binary_to_list(ClientID)}
+]),
+{ok, _} = emqtt:connect(C),
+emqtt:pause(C),
+[ChannelPid] = emqx_cm:lookup_channels(ClientID),
+erlang:link(ChannelPid),
+CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
+?assertMatch(5000, CheckInterval),
+%% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5
+%% connect T0(packet = 1, idle_milliseconds = 0)
+%% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000)
+%% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000)
+%% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout
+Timeout = CheckInterval * 3,
+%% connector but not send a packet.
+?assertMatch(
+no_keepalive_timeout_received,
+receive_msg_in_time(ChannelPid, C, Timeout - 200),
+Timeout - 200
+),
+?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
+t_check_keepalive_other_timeout(_) ->
+emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 2000),
+erlang:process_flag(trap_exit, true),
+ClientID = <<"other">>,
+KeepaliveSec = 10,
+{ok, C} = emqtt:start_link([
+{keepalive, KeepaliveSec},
+{clientid, binary_to_list(ClientID)}
+]),
+{ok, _} = emqtt:connect(C),
+emqtt:pause(C),
+{ok, _, [0]} = emqtt:subscribe(C, <<"mytopic">>, []),
+[ChannelPid] = emqx_cm:lookup_channels(ClientID),
+erlang:link(ChannelPid),
+%%CheckInterval = ceil(keepalive_check_factor() * KeepaliveSec * 1000),
+CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
+?assertMatch(2000, CheckInterval),
+%% when keepalive_check_interval is 2s and keepalive_multiplier is 1.5
+%% connect T0(packet = 1, idle_milliseconds = 0)
+%% subscribe T1(packet = 2, idle_milliseconds = 0)
+%% check1 T2(packet = 2, idle_milliseconds = 1 * CheckInterval = 2000)
+%% check2 T3(packet = 2, idle_milliseconds = 2 * CheckInterval = 4000)
+%% check3 T4(packet = 2, idle_milliseconds = 3 * CheckInterval = 6000)
+%% check4 T5(packet = 2, idle_milliseconds = 4 * CheckInterval = 8000)
+%% check4 T6(packet = 2, idle_milliseconds = 5 * CheckInterval = 10000)
+%% check4 T7(packet = 2, idle_milliseconds = 6 * CheckInterval = 12000)
+%% check4 T8(packet = 2, idle_milliseconds = 7 * CheckInterval = 14000)
+%% check4 T9(packet = 2, idle_milliseconds = 8 * CheckInterval = 16000) > 15000 timeout
+Timeout = CheckInterval * 9,
+?assertMatch(
+no_keepalive_timeout_received,
+receive_msg_in_time(ChannelPid, C, Timeout - 200),
+Timeout - 200
+),
+?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200), Timeout).
+t_check_keepalive_ping_reset_timer(_) ->
+emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 100000),
+erlang:process_flag(trap_exit, true),
+ClientID = <<"ping_reset">>,
+KeepaliveSec = 10,
+{ok, C} = emqtt:start_link([
+{keepalive, KeepaliveSec},
+{clientid, binary_to_list(ClientID)}
+]),
+{ok, _} = emqtt:connect(C),
+emqtt:pause(C),
+ct:sleep(1000),
+emqtt:resume(C),
+pong = emqtt:ping(C),
+emqtt:pause(C),
+[ChannelPid] = emqx_cm:lookup_channels(ClientID),
+erlang:link(ChannelPid),
+CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
+?assertMatch(5000, CheckInterval),
+%% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5
+%% connect T0(packet = 1, idle_milliseconds = 0)
+%% sleep 1000ms
+%% ping (packet = 2, idle_milliseconds = 0) restart timer
+%% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000)
+%% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000)
+%% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout
+Timeout = CheckInterval * 3,
+?assertMatch(
+no_keepalive_timeout_received,
+receive_msg_in_time(ChannelPid, C, Timeout - 200),
+Timeout - 200
+),
+?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
 t_check(_) ->
+emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000),
 Keepalive = emqx_keepalive:init(60),
-?assertEqual(60, emqx_keepalive:info(interval, Keepalive)),
+?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive)),
 ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)),
 Info = emqx_keepalive:info(Keepalive),
 ?assertEqual(
 #{
-interval => 60,
-statval => 0
+check_interval => 30000,
+statval => 0,
+idle_milliseconds => 0,
+%% 60 * 1.5 * 1000
+max_idle_millisecond => 90000
 },
 Info
 ),
 {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive),
 ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)),
-?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)).
+{ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1),
+?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
+{ok, Keepalive3} = emqx_keepalive:check(1, Keepalive2),
+?assertEqual(1, emqx_keepalive:info(statval, Keepalive3)),
+?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive3)),
+Keepalive4 = emqx_keepalive:init(90),
+?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive4)),
+Keepalive5 = emqx_keepalive:init(1),
+?assertEqual(1000, emqx_keepalive:info(check_interval, Keepalive5)),
+ok.
+keepalive_multiplier() ->
+emqx_config:get_zone_conf(default, [mqtt, keepalive_multiplier]).
+keepalive_check_interval() ->
+emqx_config:get_zone_conf(default, [mqtt, keepalive_check_interval]).
+receive_msg_in_time(ChannelPid, C, Timeout) ->
+receive
+{'EXIT', ChannelPid, {shutdown, keepalive_timeout}} ->
+receive
+{'EXIT', C, {shutdown, tcp_closed}} ->
+ok
+after 500 ->
+throw(no_tcp_closed_from_mqtt_client)
+end
+after Timeout ->
+no_keepalive_timeout_received
+end.

View File

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
 {description, "EMQX bridges"},
-{vsn, "0.2.2"},
+{vsn, "0.2.3"},
 {registered, [emqx_bridge_sup]},
 {mod, {emqx_bridge_app, []}},
 {applications, [

View File

@@ -55,6 +55,8 @@
 %% only for testing/mocking
 -export([supported_versions/1]).
+-export([format_bridge_metrics/1, format_metrics/1]).
 -define(BPAPI_NAME, emqx_bridge).
 -define(BRIDGE_NOT_ENABLED,

View File

@@ -945,6 +945,7 @@ t_on_get_status(Config, Opts) ->
 ProxyHost = ?config(proxy_host, Config),
 ProxyName = ?config(proxy_name, Config),
 FailureStatus = maps:get(failure_status, Opts, disconnected),
+NormalStatus = maps:get(normal_status, Opts, connected),
 ?assertMatch({ok, _}, create_bridge(Config)),
 ResourceId = resource_id(Config),
 %% Since the connection process is async, we give it some time to
@@ -952,7 +953,7 @@ t_on_get_status(Config, Opts) ->
 ?retry(
 _Sleep = 1_000,
 _Attempts = 20,
-?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId))
 ),
 case ProxyHost of
 undefined ->
@@ -971,7 +972,7 @@ t_on_get_status(Config, Opts) ->
 ?retry(
 _Sleep = 1_000,
 _Attempts = 20,
-?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId))
 )
 end,
 ok.

View File

@@ -1448,7 +1448,10 @@ t_connection_down_before_starting(Config) ->
 ),
 {ok, _} = create_bridge(Config),
 {ok, _} = snabbkaffe:receive_events(SRef0),
-?assertMatch({ok, connecting}, health_check(Config)),
+?assertMatch(
+{ok, Status} when Status =:= connecting orelse Status =:= disconnected,
+health_check(Config)
+),
 emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
 ?retry(

View File

@@ -1,6 +1,6 @@
 {application, emqx_bridge_greptimedb, [
 {description, "EMQX GreptimeDB Bridge"},
-{vsn, "0.2.1"},
+{vsn, "0.2.2"},
 {registered, []},
 {applications, [
 kernel,

View File

@@ -363,7 +363,7 @@ do_start_client(
 {error, Reason}
 end.
-grpc_config() ->
+grpc_opts() ->
 #{
 sync_start => true,
 connect_timeout => ?CONNECT_TIMEOUT
@@ -382,7 +382,7 @@ client_config(
 {pool, InstId},
 {pool_type, random},
 {auto_reconnect, ?AUTO_RECONNECT_S},
-{gprc_options, grpc_config()}
+{grpc_opts, grpc_opts()}
 ] ++ protocol_config(Config).
 protocol_config(

View File

@@ -19,6 +19,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -define(BRIDGE_TYPE, redis).
 -define(BRIDGE_TYPE_BIN, <<"redis">>).
@@ -46,6 +47,7 @@ matrix_testcases() ->
 t_start_stop,
 t_create_via_http,
 t_on_get_status,
+t_on_get_status_no_username_pass,
 t_sync_query,
 t_map_to_redis_hset_args
 ].
@@ -325,6 +327,43 @@ t_on_get_status(Config) when is_list(Config) ->
 emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
 ok.
+t_on_get_status_no_username_pass(matrix) ->
+{on_get_status, [
+[single, tcp],
+[cluster, tcp],
+[sentinel, tcp]
+]};
+t_on_get_status_no_username_pass(Config0) when is_list(Config0) ->
+ConnectorConfig0 = ?config(connector_config, Config0),
+ConnectorConfig1 = emqx_utils_maps:deep_put(
+[<<"parameters">>, <<"password">>], ConnectorConfig0, <<"">>
+),
+ConnectorConfig2 = emqx_utils_maps:deep_put(
+[<<"parameters">>, <<"username">>], ConnectorConfig1, <<"">>
+),
+Config1 = proplists:delete(connector_config, Config0),
+Config2 = [{connector_config, ConnectorConfig2} | Config1],
+?check_trace(
+emqx_bridge_v2_testlib:t_on_get_status(
+Config2,
+#{
+failure_status => disconnected,
+normal_status => disconnected
+}
+),
+fun(ok, Trace) ->
+case ?config(redis_type, Config2) of
+single ->
+?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace));
+sentinel ->
+?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace));
+cluster ->
+ok
+end
+end
+),
+ok.
 t_sync_query(matrix) ->
 {sync_query, [
 [single, tcp],

View File

@@ -1,6 +1,6 @@
 {application, emqx_bridge_s3, [
 {description, "EMQX Enterprise S3 Bridge"},
-{vsn, "0.1.4"},
+{vsn, "0.1.5"},
 {registered, []},
 {applications, [
 kernel,

View File

@@ -146,29 +146,22 @@ on_stop(InstId, _State = #{pool_name := PoolName}) ->
 on_get_status(_InstId, State = #{client_config := Config}) ->
 case emqx_s3_client:aws_config(Config) of
 {error, Reason} ->
-{?status_disconnected, State, Reason};
+{?status_disconnected, State, map_error_details(Reason)};
 AWSConfig ->
 try erlcloud_s3:list_buckets(AWSConfig) of
 Props when is_list(Props) ->
 ?status_connected
 catch
-error:{aws_error, {http_error, _Code, _, Reason}} ->
-{?status_disconnected, State, Reason};
-error:{aws_error, {socket_error, Reason}} ->
-{?status_disconnected, State, Reason}
+error:Error ->
+{?status_disconnected, State, map_error_details(Error)}
 end
 end.
 -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
 {ok, state()} | {error, _Reason}.
 on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
-try
-ChannelState = start_channel(State, Config),
-{ok, State#{channels => Channels#{ChannelId => ChannelState}}}
-catch
-throw:Reason ->
-{error, Reason}
-end.
+ChannelState = start_channel(State, Config),
+{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
 -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
 {ok, state()}.
@@ -217,7 +210,8 @@ start_channel(State, #{
 max_records := MaxRecords
 },
 container := Container,
-bucket := Bucket
+bucket := Bucket,
+key := Key
 }
 }) ->
 AggregId = {Type, Name},
@@ -226,7 +220,7 @@ start_channel(State, #{
 max_records => MaxRecords,
 work_dir => work_dir(Type, Name)
 },
-Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)),
+Template = emqx_bridge_s3_upload:mk_key_template(Key),
 DeliveryOpts = #{
 bucket => Bucket,
 key => Template,
@@ -253,11 +247,6 @@ start_channel(State, #{
 on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
 }.
-ensure_ok({ok, V}) ->
-V;
-ensure_ok({error, Reason}) ->
-throw(Reason).
 upload_options(Parameters) ->
 #{acl => maps:get(acl, Parameters, undefined)}.
@@ -285,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S
 check_bucket_accessible(Bucket, #{client_config := Config}) ->
 case emqx_s3_client:aws_config(Config) of
 {error, Reason} ->
-throw({unhealthy_target, Reason});
+throw({unhealthy_target, map_error_details(Reason)});
 AWSConfig ->
 try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of
 Props when is_list(Props) ->
@@ -293,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
 catch
 error:{aws_error, {http_error, 404, _, _Reason}} ->
 throw({unhealthy_target, "Bucket does not exist"});
-error:{aws_error, {socket_error, Reason}} ->
-throw({unhealthy_target, emqx_utils:format(Reason)})
+error:Error ->
+throw({unhealthy_target, map_error_details(Error)})
 end
 end.
@@ -304,8 +293,7 @@ check_aggreg_upload_errors(AggregId) ->
 %% TODO
 %% This approach means that, for example, 3 upload failures will cause
 %% the channel to be marked as unhealthy for 3 consecutive health checks.
-ErrorMessage = emqx_utils:format(Error),
-throw({unhealthy_target, ErrorMessage});
+throw({unhealthy_target, map_error_details(Error)});
 [] ->
 ok
 end.
@@ -384,16 +372,38 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) ->
 ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
 ok;
 {error, Reason} ->
-{error, {unrecoverable_error, Reason}}
+{error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}}
 end.
-map_error({socket_error, _} = Reason) ->
-{recoverable_error, Reason};
-map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
+map_error(Error) ->
+{map_error_class(Error), map_error_details(Error)}.
+map_error_class({s3_error, _, _}) ->
+unrecoverable_error;
+map_error_class({aws_error, Error}) ->
+map_error_class(Error);
+map_error_class({socket_error, _}) ->
+recoverable_error;
+map_error_class({http_error, Status, _, _}) when Status >= 500 ->
 %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
-{recoverable_error, Reason};
-map_error(Reason) ->
-{unrecoverable_error, Reason}.
+recoverable_error;
+map_error_class(_Error) ->
+unrecoverable_error.
+map_error_details({s3_error, Code, Message}) ->
+emqx_utils:format("S3 error: ~s ~s", [Code, Message]);
+map_error_details({aws_error, Error}) ->
+map_error_details(Error);
+map_error_details({socket_error, Reason}) ->
+emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]);
+map_error_details({http_error, _, _, _} = Error) ->
+emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]);
+map_error_details({failed_to_obtain_credentials, Error}) ->
+emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]);
+map_error_details({upload_failed, Error}) ->
+map_error_details(Error);
+map_error_details(Error) ->
+Error.
 render_bucket(Template, Data) ->
 case emqx_template:render(Template, {emqx_jsonish, Data}) of
@@ -416,6 +426,32 @@ render_content(Template, Data) ->
 iolist_to_string(IOList) ->
 unicode:characters_to_list(IOList).
+%%
+-include_lib("xmerl/include/xmerl.hrl").
+-spec map_aws_error_details(_AWSError) ->
+unicode:chardata().
+map_aws_error_details({http_error, _Status, _, Body}) ->
+try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of
+{Error = #xmlElement{name = 'Error'}, _} ->
+map_aws_error_details(Error);
+_ ->
+Body
+catch
+exit:_ ->
+Body
+end;
+map_aws_error_details(#xmlElement{content = Content}) ->
+Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)),
+Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)),
+[Code, $:, $\s | Message].
+extract_xml_text(#xmlElement{content = Content}) ->
+[Fragment || #xmlText{value = Fragment} <- Content];
+extract_xml_text(false) ->
+[].
 %% `emqx_connector_aggreg_delivery` APIs
 -spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
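To illustrate the xmerl-based helper added above (hypothetical error body, not part of the diff):

%% Given Body = "<Error><Code>SignatureDoesNotMatch</Code><Message>Check your key and signing method.</Message></Error>",
%% map_aws_error_details({http_error, 403, Headers, Body}) returns roughly
%% [["SignatureDoesNotMatch"], $:, $\s | ["Check your key and signing method."]],
%% which map_error_details/1 formats as "AWS error: SignatureDoesNotMatch: ...", the same prefix
%% the connector test later in this commit asserts on in its <<"status_reason">> check.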

View File

@@ -29,7 +29,10 @@
 ]).
 %% Internal exports
--export([convert_actions/2]).
+-export([
+convert_actions/2,
+validate_key_template/1
+]).
 -define(DEFAULT_AGGREG_BATCH_SIZE, 100).
 -define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
@@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) ->
 )}
 ],
 emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
-{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
+{key, #{
+desc => ?DESC(s3_aggregated_upload_key),
+validator => fun ?MODULE:validate_key_template/1
+}}
 ]),
 emqx_s3_schema:fields(s3_uploader)
 ]);
@@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou
 Conf#{<<"resource_opts">> := NResourceOpts}
 end.
-%% Interpreting options
--spec mk_key_template(_Parameters :: map()) ->
-{ok, emqx_template:str()} | {error, _Reason}.
-mk_key_template(#{key := Key}) ->
-Template = emqx_template:parse(Key),
+validate_key_template(Conf) ->
+Template = emqx_template:parse(Conf),
 case validate_bindings(emqx_template:placeholders(Template)) of
-UsedBindings when is_list(UsedBindings) ->
-SuffixTemplate = mk_suffix_template(UsedBindings),
-case emqx_template:is_const(SuffixTemplate) of
-true ->
-{ok, Template};
-false ->
-{ok, Template ++ SuffixTemplate}
-end;
-Error = {error, _} ->
-Error
+Bindings when is_list(Bindings) ->
+ok;
+{error, {disallowed_placeholders, Disallowed}} ->
+{error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])}
 end.
 validate_bindings(Bindings) ->
@@ -276,7 +272,22 @@ validate_bindings(Bindings) ->
 [] ->
 Bindings;
 Disallowed ->
-{error, {invalid_key_template, {disallowed_placeholders, Disallowed}}}
+{error, {disallowed_placeholders, Disallowed}}
+end.
+%% Interpreting options
+-spec mk_key_template(unicode:chardata()) ->
+emqx_template:str().
+mk_key_template(Key) ->
+Template = emqx_template:parse(Key),
+UsedBindings = emqx_template:placeholders(Template),
+SuffixTemplate = mk_suffix_template(UsedBindings),
+case emqx_template:is_const(SuffixTemplate) of
+true ->
+Template;
+false ->
+Template ++ SuffixTemplate
 end.
 mk_suffix_template(UsedBindings) ->

View File

@@ -134,6 +134,22 @@ action_config(Name, ConnectorId) ->
 t_start_stop(Config) ->
 emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
+t_create_unavailable_credentials(Config) ->
+ConnectorName = ?config(connector_name, Config),
+ConnectorType = ?config(connector_type, Config),
+ConnectorConfig = maps:without(
+[<<"access_key_id">>, <<"secret_access_key">>],
+?config(connector_config, Config)
+),
+?assertMatch(
+{ok,
+{{_HTTP, 201, _}, _, #{
+<<"status_reason">> :=
+<<"Unable to obtain AWS credentials:", _/bytes>>
+}}},
+emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig)
+).
 t_ignore_batch_opts(Config) ->
 {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
 ?assertMatch(
@@ -159,6 +175,13 @@ t_start_broken_update_restart(Config) ->
 _Attempts = 20,
 ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
 ),
+?assertMatch(
+{ok,
+{{_HTTP, 200, _}, _, #{
+<<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>>
+}}},
+emqx_bridge_v2_testlib:get_connector_api(Type, Name)
+),
 ?assertMatch(
 {ok, {{_HTTP, 200, _}, _, _}},
 emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)

View File

@@ -177,6 +177,27 @@ t_create_invalid_config(Config) ->
 )
 ).
+t_create_invalid_config_key_template(Config) ->
+?assertMatch(
+{error,
+{_Status, _, #{
+<<"code">> := <<"BAD_REQUEST">>,
+<<"message">> := #{
+<<"kind">> := <<"validation_error">>,
+<<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>,
+<<"path">> := <<"root.parameters.key">>
+}
+}}},
+emqx_bridge_v2_testlib:create_bridge_api(
+Config,
+_Overrides = #{
+<<"parameters">> => #{
+<<"key">> => <<"${action}/${foo}:${bar.rfc3339}">>
+}
+}
+)
+).
 t_update_invalid_config(Config) ->
 ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
 ?assertMatch(

View File

@@ -1,6 +1,6 @@
 {application, emqx_conf, [
 {description, "EMQX configuration management"},
-{vsn, "0.2.2"},
+{vsn, "0.2.3"},
 {registered, []},
 {mod, {emqx_conf_app, []}},
 {applications, [kernel, stdlib]},

View File

@@ -163,8 +163,13 @@ dump_schema(Dir, SchemaModule) ->
 ),
 emqx_dashboard:save_dispatch_eterm(SchemaModule).
-load(emqx_enterprise_schema, emqx_telemetry) -> ignore;
-load(_, Lib) -> ok = application:load(Lib).
+load(emqx_enterprise_schema, emqx_telemetry) ->
+ignore;
+load(_, Lib) ->
+case application:load(Lib) of
+ok -> ok;
+{error, {already_loaded, _}} -> ok
+end.
 %% for scripts/spellcheck.
 gen_schema_json(Dir, SchemaModule, Lang) ->

View File

@@ -74,13 +74,14 @@ end_per_testcase(_Config) ->
 t_base_test(_Config) ->
 ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
 Pid = self(),
-MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
+Msg = ?FUNCTION_NAME,
+MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]},
 {ok, TnxId, ok} = multicall(M, F, A),
 {atomic, Query} = emqx_cluster_rpc:query(TnxId),
 ?assertEqual(MFA, maps:get(mfa, Query)),
 ?assertEqual(node(), maps:get(initiator, Query)),
 ?assert(maps:is_key(created_at, Query)),
-?assertEqual(ok, receive_msg(3, test)),
+?assertEqual(ok, receive_msg(3, Msg)),
 ?assertEqual({ok, 2, ok}, multicall(M, F, A)),
 {atomic, Status} = emqx_cluster_rpc:status(),
 case length(Status) =:= 3 of
@@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
 emqx_cluster_rpc:reset(),
 {atomic, []} = emqx_cluster_rpc:status(),
 Pid = self(),
-{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
+Msg = ?FUNCTION_NAME,
+{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
 {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
-?assertEqual(ok, receive_msg(3, test)),
+?assertEqual(ok, receive_msg(3, Msg)),
 {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
 {ok, _, ok} = multicall(M, F, A, 1, 1000),
@@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
 t_commit_concurrency(_Config) ->
 {atomic, []} = emqx_cluster_rpc:status(),
 Pid = self(),
-{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
-{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
-?assertEqual(ok, receive_msg(3, test)),
+Msg = ?FUNCTION_NAME,
+{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
+?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)),
+?assertEqual(ok, receive_msg(3, Msg)),
 %% call concurrently without stale tnx_id error
 Workers = lists:seq(1, 256),
@@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
 {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
 ?assertEqual([], L),
 ets:insert(test, {other_mfa_result, ok}),
-{ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
+{ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
 ct:sleep(1000),
 {atomic, NewStatus} = emqx_cluster_rpc:status(),
 ?assertEqual(3, length(NewStatus)),
 Pid = self(),
-MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
+Msg = ?FUNCTION_NAME,
+MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
 {ok, TnxId, ok} = multicall(M1, F1, A1),
 {atomic, Query} = emqx_cluster_rpc:query(TnxId),
 ?assertEqual(MFAEcho, maps:get(mfa, Query)),
 ?assertEqual(node(), maps:get(initiator, Query)),
 ?assert(maps:is_key(created_at, Query)),
-?assertEqual(ok, receive_msg(3, test)),
+?assertEqual(ok, receive_msg(3, Msg)),
 ok.
 t_del_stale_mfa(_Config) ->
 {atomic, []} = emqx_cluster_rpc:status(),
-MFA = {M, F, A} = {io, format, ["test"]},
+MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]},
 Keys = lists:seq(1, 50),
 Keys2 = lists:seq(51, 150),
 Ids =
@@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) ->
 t_skip_failed_commit(_Config) ->
 {atomic, []} = emqx_cluster_rpc:status(),
-{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
+{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
 ct:sleep(180),
 {atomic, List1} = emqx_cluster_rpc:status(),
 Node = node(),
@@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) ->
 t_fast_forward_commit(_Config) ->
 {atomic, []} = emqx_cluster_rpc:status(),
-{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
+{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
 ct:sleep(180),
 {atomic, List1} = emqx_cluster_rpc:status(),
 Node = node(),
@@ -356,7 +360,11 @@ tnx_ids(Status) ->
 start() ->
 {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
 {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
+ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
 ok = emqx_cluster_rpc:reset(),
+%% Ensure all processes are idle status.
+ok = gen_server:call(?NODE2, test),
+ok = gen_server:call(?NODE3, test),
 ok.
 stop() ->
@@ -366,6 +374,7 @@ stop() ->
 undefined ->
 ok;
 P ->
+erlang:unregister(N),
 erlang:unlink(P),
 erlang:exit(P, kill)
 end
@@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 ->
 receive
 Msg ->
 receive_msg(Count - 1, Msg)
-after 1000 ->
-timeout
+after 1300 ->
+Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])),
+{Msg, flush_msg([])}
 end.
 echo(Pid, Msg) ->
@@ -425,3 +435,11 @@ multicall(M, F, A, N, T) ->
 multicall(M, F, A) ->
 multicall(M, F, A, all, timer:minutes(2)).
+flush_msg(Acc) ->
+receive
+Msg ->
+flush_msg([Msg | Acc])
+after 10 ->
+Acc
+end.

View File

@@ -85,7 +85,7 @@
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
--define(DEF_IDLE_TIME, timer:seconds(30)).
+-define(DEF_IDLE_SECONDS, 30).
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
@@ -149,7 +149,7 @@ init(
 mountpoint => Mountpoint
 }
 ),
-Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME),
+Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_SECONDS),
 #channel{
 ctx = Ctx,
 conninfo = ConnInfo,
@@ -378,7 +378,7 @@ ensure_keepalive_timer(Channel) ->
 ensure_keepalive_timer(fun ensure_timer/4, Channel).
 ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
-Heartbeat = emqx_keepalive:info(interval, KeepAlive),
+Heartbeat = emqx_keepalive:info(check_interval, KeepAlive),
 Fun(keepalive, Heartbeat, keepalive, Channel).
 check_auth_state(Msg, #channel{connection_required = false} = Channel) ->
@@ -495,7 +495,7 @@ enrich_conninfo(
 ) ->
 case Queries of
 #{<<"clientid">> := ClientId} ->
-Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
+Interval = emqx_keepalive:info(check_interval, KeepAlive),
 NConnInfo = ConnInfo#{
 clientid => ClientId,
 proto_name => <<"CoAP">>,

View File

@@ -19,12 +19,6 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("typerefl/include/types.hrl").
--type duration() :: non_neg_integer().
--typerefl_from_string({duration/0, emqx_schema, to_duration}).
--reflect_type([duration/0]).
 %% config schema provides
 -export([namespace/0, fields/1, desc/1]).
@@ -34,7 +28,7 @@ fields(coap) ->
 [
 {heartbeat,
 sc(
-duration(),
+emqx_schema:duration_s(),
 #{
 default => <<"30s">>,
 desc => ?DESC(coap_heartbeat)

View File

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_coap, [
 {description, "CoAP Gateway"},
-{vsn, "0.1.8"},
+{vsn, "0.1.9"},
 {registered, []},
 {applications, [kernel, stdlib, emqx, emqx_gateway]},
 {env, []},

View File

@@ -100,7 +100,7 @@ init_per_testcase(t_heartbeat, Config) ->
 OldConf = emqx:get_raw_config([gateway, coap]),
 {ok, _} = emqx_gateway_conf:update_gateway(
 coap,
-OldConf#{<<"heartbeat">> => <<"800ms">>}
+OldConf#{<<"heartbeat">> => <<"1s">>}
 ),
 [
 {old_conf, OldConf},
@@ -216,8 +216,9 @@ t_heartbeat(Config) ->
 [],
 emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
 ),
-timer:sleep(Heartbeat * 2),
+%% The minimum timeout time is 1 second.
+%% 1.5 * Heartbeat + 0.5 * Heartbeat(< 1s) = 1.5 * 1 + 1 = 2.5
+timer:sleep(Heartbeat * 2 + 1000),
 ?assertEqual(
 [],
 emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)

View File

@ -715,7 +715,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
Channel; Channel;
ensure_keepalive_timer(Interval, Channel) -> ensure_keepalive_timer(Interval, Channel) ->
StatVal = emqx_gateway_conn:keepalive_stats(recv), StatVal = emqx_gateway_conn:keepalive_stats(recv),
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), Keepalive = emqx_keepalive:init(default, StatVal, Interval),
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
ensure_timer(Name, Channel = #channel{timers = Timers}) -> ensure_timer(Name, Channel = #channel{timers = Timers}) ->
@ -746,7 +746,7 @@ interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}})
interval(force_close, _) -> interval(force_close, _) ->
15000; 15000;
interval(keepalive, #channel{keepalive = Keepalive}) -> interval(keepalive, #channel{keepalive = Keepalive}) ->
emqx_keepalive:info(interval, Keepalive). emqx_keepalive:info(check_interval, Keepalive).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Dispatch %% Dispatch
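Taken together, the hunks above move the gateway channels to the richer keepalive API: the state is seeded with a keepalive name/type (`default` here), the current receive counter, and the interval passed without the former `timer:seconds/1` conversion (so presumably in seconds), and timers are armed from the new check interval. A minimal sketch of the calls as they appear above (variable names illustrative):

    %% Sketch only; mirrors the calls shown in the diff above.
    StatVal   = emqx_gateway_conn:keepalive_stats(recv),
    Keepalive = emqx_keepalive:init(default, StatVal, Interval),
    %% timers are now scheduled from the check interval rather than
    %% the full keepalive interval
    CheckInterval = emqx_keepalive:info(check_interval, Keepalive).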

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_exproto, [ {application, emqx_gateway_exproto, [
{description, "ExProto Gateway"}, {description, "ExProto Gateway"},
{vsn, "0.1.11"}, {vsn, "0.1.12"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_gbt32960, [ {application, emqx_gateway_gbt32960, [
{description, "GBT32960 Gateway"}, {description, "GBT32960 Gateway"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -506,7 +506,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_timer, #channel{retx_interval = RetxIntv}) -> interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
RetxIntv. RetxIntv.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_jt808, [ {application, emqx_gateway_jt808, [
{description, "JT/T 808 Gateway"}, {description, "JT/T 808 Gateway"},
{vsn, "0.0.3"}, {vsn, "0.1.0"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -616,7 +616,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_timer, #channel{retx_interval = RetxIntv}) -> interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
RetxIntv. RetxIntv.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_mqttsn, [ {application, emqx_gateway_mqttsn, [
{description, "MQTT-SN Gateway"}, {description, "MQTT-SN Gateway"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -430,7 +430,7 @@ ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(0, Channel) -> ensure_keepalive_timer(0, Channel) ->
Channel; Channel;
ensure_keepalive_timer(Interval, Channel) -> ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))), Keepalive = emqx_keepalive:init(Interval),
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -2245,7 +2245,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
interval(keepalive, #channel{keepalive = KeepAlive}) -> interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_delivery, #channel{session = Session}) -> interval(retry_delivery, #channel{session = Session}) ->
emqx_mqttsn_session:info(retry_interval, Session); emqx_mqttsn_session:info(retry_interval, Session);
interval(expire_awaiting_rel, #channel{session = Session}) -> interval(expire_awaiting_rel, #channel{session = Session}) ->

View File

@ -1109,7 +1109,7 @@ t_keepalive(_Config) ->
[Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
%% will reset to max keepalive if keepalive > max keepalive %% will reset to max keepalive if keepalive > max keepalive
#{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid), #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))), ?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))),
{ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
#{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]),

View File

@ -1,6 +1,6 @@
{application, emqx_redis, [ {application, emqx_redis, [
{description, "EMQX Redis Database Connector"}, {description, "EMQX Redis Database Connector"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -19,6 +19,8 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]). -export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]).
@ -231,7 +233,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) ->
is_unrecoverable_error(_) -> is_unrecoverable_error(_) ->
false. false.
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
case eredis_cluster:pool_exists(PoolName) of case eredis_cluster:pool_exists(PoolName) of
true -> true ->
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster. %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
@ -240,26 +242,51 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect. %% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
case eredis_cluster_monitor:get_all_pools(PoolName) of case eredis_cluster_monitor:get_all_pools(PoolName) of
[] -> [] ->
disconnected; ?status_disconnected;
[_ | _] -> [_ | _] ->
Health = eredis_cluster:ping_all(PoolName), do_cluster_status_check(PoolName, State)
status_result(Health)
end; end;
false -> false ->
disconnected ?status_disconnected
end; end;
on_get_status(_InstId, #{pool_name := PoolName}) -> on_get_status(_InstId, #{pool_name := PoolName} = State) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), HealthCheckResoults = emqx_resource_pool:health_check_workers(
status_result(Health). PoolName,
fun ?MODULE:do_get_status/1,
do_get_status(Conn) -> emqx_resource_pool:health_check_timeout(),
case eredis:q(Conn, ["PING"]) of #{return_values => true}
{ok, _} -> true; ),
_ -> false case HealthCheckResoults of
{ok, Results} ->
sum_worker_results(Results, State);
Error ->
{?status_disconnected, State, Error}
end. end.
status_result(_Status = true) -> connected; do_cluster_status_check(Pool, State) ->
status_result(_Status = false) -> connecting. Pongs = eredis_cluster:qa(Pool, [<<"PING">>]),
sum_worker_results(Pongs, State).
do_get_status(Conn) ->
eredis:q(Conn, ["PING"]).
sum_worker_results([], _State) ->
?status_connected;
sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) ->
?tp(emqx_redis_auth_required_error, #{}),
%% This requires user action to fix so we set the status to disconnected
{?status_disconnected, State, {unhealthy_target, Error}};
sum_worker_results([{ok, _} | Rest], State) ->
sum_worker_results(Rest, State);
sum_worker_results([Error | _Rest], State) ->
?SLOG(
warning,
#{
msg => "emqx_redis_check_status_error",
error => Error
}
),
{?status_connecting, State, Error}.
do_cmd(PoolName, cluster, {cmd, Command}) -> do_cmd(PoolName, cluster, {cmd, Command}) ->
eredis_cluster:q(PoolName, Command); eredis_cluster:q(PoolName, Command);
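As a usage illustration of the new result folding (values invented; this only compiles inside this module, which defines the `?status_*` macros):

    %% The connector state contents are irrelevant to the folding itself.
    State = #{},
    %% every worker answered PING -> connected
    ?status_connected =
        sum_worker_results([{ok, <<"PONG">>}, {ok, <<"PONG">>}], State),
    %% an authentication failure needs operator action -> disconnected
    {?status_disconnected, State, {unhealthy_target, _}} =
        sum_worker_results([{error, <<"NOAUTH Authentication required.">>}], State),
    %% any other error is treated as transient -> connecting
    {?status_connecting, State, _} =
        sum_worker_results([{error, timeout}], State).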

View File

@ -29,6 +29,9 @@
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
-define(TELEMETRY_PREFIX, emqx, resource). -define(TELEMETRY_PREFIX, emqx, resource).
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
{query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) ->
resume_interval => 300 resume_interval => 300
} }
), ),
do_t_expiration_retry(). do_t_expiration_retry(#{is_batch => false}).
t_expiration_retry_batch(_Config) -> t_expiration_retry_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync), emqx_connector_demo:set_callback_mode(always_sync),
@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) ->
resume_interval => 300 resume_interval => 300
} }
), ),
do_t_expiration_retry(). do_t_expiration_retry(#{is_batch => true}).
do_t_expiration_retry() -> do_t_expiration_retry(Context) ->
IsBatch = maps:get(is_batch, Context),
ResumeInterval = 300, ResumeInterval = 300,
?check_trace( ?check_trace(
#{timetrap => 10_000},
begin begin
ok = emqx_resource:simple_sync_query(?ID, block), ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef0} = snabbkaffe:subscribe( TimeoutMS = 200,
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
%% the request that expires must be first, so it's the %% the request that expires must be first, so it's the
%% head of the inflight table (and retriable). %% head of the inflight table (and retriable).
{ok, SRef1} = snabbkaffe:subscribe( {ok, SRef1} = snabbkaffe:subscribe(
@ -2542,6 +2542,8 @@ do_t_expiration_retry() ->
) )
) )
end), end),
%% This second message must be enqueued while the resource is blocked by the
%% previous message.
Pid1 = Pid1 =
spawn_link(fun() -> spawn_link(fun() ->
receive receive
@ -2556,22 +2558,33 @@ do_t_expiration_retry() ->
) )
) )
end), end),
?tp("waiting for first message to be appended to the queue", #{}),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?tp("waiting for first message to expire during blocked retries", #{}),
{ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}),
%% Now we wait until the worker tries the second message at least once before
%% unblocking it.
Pid1 ! go, Pid1 ! go,
{ok, _} = snabbkaffe:receive_events(SRef0), ?tp("waiting for second message to be retried and be nacked while blocked", #{}),
case IsBatch of
false ->
{ok, _} = ?block_until(#{
?snk_kind := buffer_worker_flush_nack,
batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _)
});
true ->
{ok, _} = ?block_until(#{
?snk_kind := buffer_worker_flush_nack,
batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _]
})
end,
{ok, _} = %% Bypass the buffer worker and unblock the resource.
?block_until( ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_expired}, ?tp("waiting for second message to be retried and be acked, unblocking", #{}),
ResumeInterval * 10 {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}),
),
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 5
),
ok ok
end, end,

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [ {application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"}, {description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.2.2"}, {vsn, "5.2.3"},
{modules, [ {modules, [
emqx_utils, emqx_utils,
emqx_utils_api, emqx_utils_api,

View File

@ -65,6 +65,7 @@
flattermap/2, flattermap/2,
tcp_keepalive_opts/4, tcp_keepalive_opts/4,
format/1, format/1,
format/2,
call_first_defined/1, call_first_defined/1,
ntoa/1, ntoa/1,
foldl_while/3, foldl_while/3,
@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
format(Term) -> format(Term) ->
iolist_to_binary(io_lib:format("~0p", [Term])). iolist_to_binary(io_lib:format("~0p", [Term])).
format(Fmt, Args) ->
unicode:characters_to_binary(io_lib:format(Fmt, Args)).
-spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return(). -spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
call_first_defined([{Module, Function, Args} | Rest]) -> call_first_defined([{Module, Function, Args} | Rest]) ->
try try
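The new `format/2` is a thin wrapper over `io_lib:format/2` that returns a UTF-8 binary instead of an iolist. An illustrative call (values invented):

    emqx_utils:format("client ~ts dropped after ~p ms", [<<"c1">>, 1500]).
    %% => <<"client c1 dropped after 1500 ms">>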

View File

@ -0,0 +1,5 @@
Fix the flags check and error handling related to the Will message in the `CONNECT` packet.
See also:
- MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
- MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
- MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
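For orientation only, the cited clauses boil down to checks of the following shape (an illustrative sketch, not the actual implementation in this change):

    %% - Will Flag = 0 requires Will QoS = 0 and Will Retain = 0
    %% - Will Flag = 1 allows Will QoS 0..2 only
    check_will_flags(#{will_flag := false, will_qos := QoS, will_retain := Retain}) when
        QoS =/= 0; Retain =/= false
    ->
        {error, protocol_error};
    check_will_flags(#{will_flag := true, will_qos := QoS}) when QoS > 2 ->
        {error, protocol_error};
    check_will_flags(_) ->
        ok.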

View File

@ -0,0 +1,7 @@
Upgrade the ekka library to 0.19.5.
ekka 0.19.5 uses mria 0.8.8, which improves the auto-heal functionality.
Previously, auto-heal kicked in only after all core nodes became reachable again.
With this update, auto-heal is applied as soon as a majority of the core nodes are alive.
[Mria PR](https://github.com/emqx/mria/pull/180)
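This behaviour only takes effect when cluster auto-heal is enabled, e.g. (illustrative snippet; the `cluster.autoheal` option name is assumed and not part of this change):

    cluster {
      autoheal = true
    }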

View File

@ -0,0 +1,4 @@
When an S3 Bridge is improperly configured, error messages now contain more informative and easier-to-read details.
## Breaking changes
* An S3 Bridge configuration with an invalid aggregated upload key template no longer works. Before this change, such a configuration was considered valid, but the bridge never worked anyway.

View File

@ -0,0 +1 @@
Improved error handling for Redis connectors. Previously, Redis connectors of type single or sentinel would always encounter a timeout error during the connector test in the dashboard if no username or password was provided. This update ensures that users now receive an informative error message in such scenarios. Additionally, more detailed error information has been added for all Redis connector types to enhance diagnostics and troubleshooting.

View File

@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true},
{:ekka, github: "emqx/ekka", tag: "0.19.3", override: true}, {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
{:minirest, github: "emqx/minirest", tag: "1.4.3", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.3", override: true},

View File

@ -83,7 +83,7 @@
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
{minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}},

View File

@ -855,6 +855,15 @@ The default value 1.5 is following the MQTT 5.0 specification. This multiplier i
mqtt_keepalive_multiplier.label: mqtt_keepalive_multiplier.label:
"""Keep Alive Multiplier""" """Keep Alive Multiplier"""
mqtt_keepalive_check_interval.desc:
"""The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets.
If a certain amount of time passes without any packets being sent from the client, this time will be added up.
Once the accumulated time exceeds `keepalive-interval * keepalive-multiplier`, the connection will be terminated.
The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of `keepalive-interval / 2`."""
mqtt_keepalive_check_interval.label:
"""Keep Alive Check Interval"""
force_gc_bytes.desc: force_gc_bytes.desc:
"""GC the process after specified number of bytes have passed through.""" """GC the process after specified number of bytes have passed through."""