Merge pull request #9639 from zmstone/1229-sync-release-v44-to-main-v4.4

1229 sync release v44 to main v4.4
This commit is contained in:
Zaiming (Stone) Shi 2022-12-29 15:31:38 +01:00 committed by GitHub
commit 23c786b18a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 404 additions and 79 deletions

View File

@ -55,10 +55,18 @@ runs:
fi fi
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH" git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH"
cd "$OTP_SOURCE_PATH" cd "$OTP_SOURCE_PATH"
if [ "$(arch)" = arm64 ]; then
export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include"
fi
./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH" ./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH"
make -j$(nproc) make -j$(nproc)
rm -rf "$OTP_INSTALL_PATH" rm -rf "$OTP_INSTALL_PATH"
make install make install
if [ "$(arch)" = arm64 ]; then
unset LDFLAGS
unset CC
fi
- name: build - name: build
env: env:
HOMEBREW_NO_AUTO_UPDATE: 1 HOMEBREW_NO_AUTO_UPDATE: 1

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_exhook, {application, emqx_exhook,
[{description, "EMQ X Extension for Hook"}, [{description, "EMQ X Extension for Hook"},
{vsn, "4.4.5"}, {vsn, "4.4.6"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{mod, {emqx_exhook_app, []}}, {mod, {emqx_exhook_app, []}},

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.4.4", [{"4.4.5",[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, [{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",
@ -22,7 +23,8 @@
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
{update,emqx_exhook_mngr,{advanced,["4.4.0"]}}]}, {update,emqx_exhook_mngr,{advanced,["4.4.0"]}}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.4", [{"4.4.5",[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, [{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine, {application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"}, [{description, "EMQ X Rule Engine"},
{vsn, "4.4.12"}, % strict semver, bump manually! {vsn, "4.4.13"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]}, {registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]},
{applications, [kernel,stdlib,rulesql,getopt,jose]}, {applications, [kernel,stdlib,rulesql,getopt,jose]},

View File

@ -1,13 +1,13 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.4.11",[ [{"4.4.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {"4.4.11",
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} {apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}},
]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.10", {"4.4.10",
[{add_module,emqx_rule_engine_jwt}, [{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_worker},
@ -213,13 +213,13 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.11",[ [{"4.4.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {"4.4.11",
{apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}}, [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.10", {"4.4.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}},

View File

@ -698,6 +698,7 @@ init_resource(Module, OnCreate, ResId, Config) ->
ResParams = #resource_params{id = ResId, ResParams = #resource_params{id = ResId,
params = Params, params = Params,
status = #{is_alive => true}}, status = #{is_alive => true}},
maybe_resource_down(ResId, clear),
emqx_rule_registry:add_resource_params(ResParams). emqx_rule_registry:add_resource_params(ResParams).
init_resource_with_retrier(Module, OnCreate, ResId, Config) -> init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
@ -706,6 +707,7 @@ init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
ResParams = #resource_params{id = ResId, ResParams = #resource_params{id = ResId,
params = Params, params = Params,
status = #{is_alive => true}}, status = #{is_alive => true}},
maybe_resource_down(ResId, clear),
emqx_rule_registry:add_resource_params(ResParams). emqx_rule_registry:add_resource_params(ResParams).
init_action(Module, OnCreate, ActionInstId, Params) -> init_action(Module, OnCreate, ActionInstId, Params) ->
@ -726,12 +728,10 @@ init_action(Module, OnCreate, ActionInstId, Params) ->
end. end.
clear_resource(_Module, undefined, ResId, Type) -> clear_resource(_Module, undefined, ResId, Type) ->
Name = alarm_name_of_resource_down(Type, ResId), clear_resource_down(ResId, Type),
_ = emqx_alarm:deactivate(Name),
ok = emqx_rule_registry:remove_resource_params(ResId); ok = emqx_rule_registry:remove_resource_params(ResId);
clear_resource(Module, Destroy, ResId, Type) -> clear_resource(Module, Destroy, ResId, Type) ->
Name = alarm_name_of_resource_down(Type, ResId), clear_resource_down(ResId, Type),
_ = emqx_alarm:deactivate(Name),
case emqx_rule_registry:find_resource_params(ResId) of case emqx_rule_registry:find_resource_params(ResId) of
{ok, #resource_params{params = Params}} -> {ok, #resource_params{params = Params}} ->
?RAISE(Module:Destroy(ResId, Params), ?RAISE(Module:Destroy(ResId, Params),
@ -779,14 +779,10 @@ fetch_resource_status(Module, OnStatus, ResId) ->
case Module:OnStatus(ResId, Params) of case Module:OnStatus(ResId, Params) of
#{is_alive := LastIsAlive} = Status -> Status; #{is_alive := LastIsAlive} = Status -> Status;
#{is_alive := true} = Status -> #{is_alive := true} = Status ->
{ok, Type} = find_type(ResId), maybe_resource_down(ResId, clear),
Name = alarm_name_of_resource_down(Type, ResId),
emqx_alarm:deactivate(Name),
Status; Status;
#{is_alive := false} = Status -> #{is_alive := false} = Status ->
{ok, Type} = find_type(ResId), maybe_resource_down(ResId, alarm),
Name = alarm_name_of_resource_down(Type, ResId),
emqx_alarm:activate(Name, #{id => ResId, type => Type}),
Status Status
end end
catch _Error:Reason:STrace -> catch _Error:Reason:STrace ->
@ -824,9 +820,23 @@ refresh_actions(Actions, Pred) ->
end end
end, Actions). end, Actions).
find_type(ResId) -> maybe_resource_down(ResId, AlarmOrClear) ->
{ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId), case emqx_rule_registry:find_resource(ResId) of
{ok, Type}. {ok, #resource{type = Type}} ->
_ = case AlarmOrClear of
alarm -> alarm_resource_down(ResId, Type);
clear -> clear_resource_down(ResId, Type)
end,
ok;
not_found ->
ok
end.
alarm_resource_down(ResId, Type) ->
emqx_alarm:activate(alarm_name_of_resource_down(Type, ResId),
#{id => ResId, type => Type}).
clear_resource_down(ResId, Type) ->
emqx_alarm:deactivate(alarm_name_of_resource_down(Type, ResId)).
alarm_name_of_resource_down(Type, ResId) -> alarm_name_of_resource_down(Type, ResId) ->
unicode:characters_to_binary(io_lib:format("resource/~ts/~ts/down", [Type, ResId])). unicode:characters_to_binary(io_lib:format("resource/~ts/~ts/down", [Type, ResId])).

View File

@ -89,13 +89,17 @@ end_per_testcase(_, Config) ->
Config. Config.
common_init_per_testcase() -> common_init_per_testcase() ->
AlarmOpts = [{actions, [log, publish]}, {size_limit, 1000}, {validity_period, 86400}],
{ok, _} = emqx_alarm:start_link(AlarmOpts),
{ok, _} = emqx_rule_monitor:start_link(). {ok, _} = emqx_rule_monitor:start_link().
common_end_per_testcases() -> common_end_per_testcases() ->
ok = emqx_alarm:stop(),
emqx_rule_monitor:erase_retry_interval(), emqx_rule_monitor:erase_retry_interval(),
emqx_rule_monitor:stop(). emqx_rule_monitor:stop().
t_restart_resource(_) -> t_restart_resource(_) ->
ct:pal("emqx_alarm: ~p", [sys:get_state(whereis(emqx_alarm))]),
ok = emqx_rule_registry:register_resource_types( ok = emqx_rule_registry:register_resource_types(
[#resource_type{ [#resource_type{
name = test_res_1, name = test_res_1,

View File

@ -21,7 +21,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(PORT, 9876). -define(PORT, 29876).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).

2
build
View File

@ -69,6 +69,8 @@ make_rel() {
relup_db() { relup_db() {
case "$PROFILE" in case "$PROFILE" in
*edge*)
;;
*-ee*) *-ee*)
./scripts/relup-base-vsns.escript "$@" ./data/relup-paths-ee.eterm ./scripts/relup-base-vsns.escript "$@" ./data/relup-paths-ee.eterm
;; ;;

View File

@ -25,3 +25,14 @@
- Fixed EMQX Helm Chart can not set JSON type value for EMQX Broker configuration items [#9504](https://github.com/emqx/emqx/pull/9504). - Fixed EMQX Helm Chart can not set JSON type value for EMQX Broker configuration items [#9504](https://github.com/emqx/emqx/pull/9504).
- When resource creation is too slow, there may be some temporary probing connections left [#9539](https://github.com/emqx/emqx/pull/9539). - When resource creation is too slow, there may be some temporary probing connections left [#9539](https://github.com/emqx/emqx/pull/9539).
- After a reconnect, the unacknowledged QoS1/QoS2 messages in non-clean session were not retransmitted periodically as before the reconnect [#9627](https://github.com/emqx/emqx/pull/9627).
The configuration `zone.<zone-name>.retry_interval` specifies the retransmission interval of
unacknowledged QoS1/QoS2 messages (defaults to 30s).
Prior to this fix, unacknowledged messages buffered in the session are re-sent only once after session take-over, but not retried at configured interval.
- The expired 'awaiting_rel' queue is not cleared after persistent session MQTT client disconnected [#9627](https://github.com/emqx/emqx/pull/9627).
Before this change, if the 'awaiting_rel' queue is full when the MQTT client reconnect
to the broker and publish a QoS2 message, the client will get disconnected by the broker
with reason code RC_RECEIVE_MAXIMUM_EXCEEDED(0x93), even if the packet IDs in the 'awaiting_rel'
queue have already expired.

View File

@ -26,3 +26,11 @@
- 修复 EMQX Helm Chart 无法配置 value 为 JSON 类型的 EMQX Broker 配置项 [#9504](https://github.com/emqx/emqx/pull/9504)。 - 修复 EMQX Helm Chart 无法配置 value 为 JSON 类型的 EMQX Broker 配置项 [#9504](https://github.com/emqx/emqx/pull/9504)。
- 当创建资源过慢的情况下,有可能会残留一些用来探活的临时的连接 [#9539](https://github.com/emqx/emqx/pull/9539)。 - 当创建资源过慢的情况下,有可能会残留一些用来探活的临时的连接 [#9539](https://github.com/emqx/emqx/pull/9539)。
- 持久会话的 MQTT 客户端重新连接 emqx 之后,未被确认过的 QoS1/QoS2 消息不再周期性重发 [#9627](https://github.com/emqx/emqx/pull/9627)。
`zone.<zone-name>.retry_interval` 配置指定了没有被确认过的 QoS1/QoS2 消息的重发间隔,(默认为 30s)。在这个修复之前,
当持久会话的 MQTT 客户端重新连接 emqx 之后emqx 会将队列中缓存的未被确认过的消息重发一次,但是不会按配置的时间间隔重试。
- 持久会话的 MQTT 客户端断连之后,已经过期的 'awaiting_rel' 队列没有清除 [#9627](https://github.com/emqx/emqx/pull/9627)。
在这个改动之前,在客户端重连并且发布 QoS2 消息的时候,如果 'awaiting_rel' 队列已满,此客户端会被服务器以
RC_RECEIVE_MAXIMUM_EXCEEDED(0x93) 错误码断开连接,即使这时候 'awaiting_rel' 队列里面的报文 ID 已经过期了。

View File

@ -51,6 +51,12 @@
<<"4.4.5">>,<<"4.4.6">>,<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>, <<"4.4.5">>,<<"4.4.6">>,<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>,
<<"4.4.10">>,<<"4.4.11">>], <<"4.4.10">>,<<"4.4.11">>],
otp => <<"24.3.4.2-1">>}}. otp => <<"24.3.4.2-1">>}}.
{<<"4.4.13">>,
#{from_versions =>
[<<"4.4.0">>,<<"4.4.1">>,<<"4.4.2">>,<<"4.4.3">>,<<"4.4.4">>,
<<"4.4.5">>,<<"4.4.6">>,<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>,
<<"4.4.10">>,<<"4.4.11">>,<<"4.4.12">>],
otp => <<"24.3.4.2-1">>}}.
{<<"4.5.0">>, {<<"4.5.0">>,
#{from_versions => [<<"4.4.8">>,<<"4.4.9">>,<<"4.4.10">>, #{from_versions => [<<"4.4.8">>,<<"4.4.9">>,<<"4.4.10">>,
<<"4.4.11">>], <<"4.4.11">>],

View File

@ -13,8 +13,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes # This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version. # to the chart and its templates, including the app version.
version: 4.4.12 version: 4.4.13
# This is the version number of the application being deployed. This version number should be # This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. # incremented each time you make changes to the application.
appVersion: 4.4.12 appVersion: 4.4.13

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE). -ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.12-alpha.2"}). -define(EMQX_RELEASE, {opensource, "4.4.13-alpha.1"}).
-else. -else.

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard, {application, emqx_dashboard,
[{description, "EMQX Web Dashboard"}, [{description, "EMQX Web Dashboard"},
{vsn, "4.4.11"}, % strict semver, bump manually! {vsn, "4.4.12"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_dashboard_sup]}, {registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]}, {applications, [kernel,stdlib,mnesia,minirest]},

View File

@ -60,9 +60,9 @@
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"} , {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.3"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.15"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.17"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
]}. ]}.

View File

@ -21,9 +21,16 @@ REL_DIR="${1}"
PKSC12_FILE="$HOME/developer-id-application.p12" PKSC12_FILE="$HOME/developer-id-application.p12"
base64 --decode > "${PKSC12_FILE}" <<<"${APPLE_DEVELOPER_ID_BUNDLE}" base64 --decode > "${PKSC12_FILE}" <<<"${APPLE_DEVELOPER_ID_BUNDLE}"
KEYCHAIN='emqx.keychain-db' KEYCHAIN="emqx-$(date +%s).keychain-db"
KEYCHAIN_PASSWORD="$(openssl rand -base64 32)" KEYCHAIN_PASSWORD="$(openssl rand -base64 32)"
trap cleanup EXIT
function cleanup {
set +e
security delete-keychain "${KEYCHAIN}" 2>/dev/null
}
security create-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}" security create-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}"
security set-keychain-settings -lut 21600 "${KEYCHAIN}" security set-keychain-settings -lut 21600 "${KEYCHAIN}"
security unlock-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}" security unlock-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}"
@ -64,3 +71,5 @@ for f in \
; do ; do
find "${REL_DIR}"/lib/ -name "$f" -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \; find "${REL_DIR}"/lib/ -name "$f" -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \;
done done
cleanup

View File

@ -46,6 +46,10 @@ case "${EDITION}" in
GIT_TAG_PREFIX="e" GIT_TAG_PREFIX="e"
RELUP_PATH_FILE="./data/relup-paths-ee.eterm" RELUP_PATH_FILE="./data/relup-paths-ee.eterm"
;; ;;
*edge*)
# no relup for emqx-edge
exit 0
;;
*) *)
GIT_TAG_PREFIX="v" GIT_TAG_PREFIX="v"
RELUP_PATH_FILE="./data/relup-paths.eterm" RELUP_PATH_FILE="./data/relup-paths.eterm"

View File

@ -6,7 +6,7 @@
%% the emqx `release' version, which in turn is comprised of several %% the emqx `release' version, which in turn is comprised of several
%% apps, one of which is this. See `emqx_release.hrl' for more %% apps, one of which is this. See `emqx_release.hrl' for more
%% info. %% info.
{vsn, "4.4.12"}, % strict semver, bump manually! {vsn, "4.4.13"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [ kernel {applications, [ kernel

View File

@ -1,7 +1,12 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.4.11", [{"4.4.12",
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
@ -27,7 +32,8 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.9", {"4.4.9",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -52,7 +58,8 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.8", {"4.4.8",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -78,7 +85,8 @@
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.7", {"4.4.7",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -104,7 +112,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.6", {"4.4.6",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -130,7 +139,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.5", {"4.4.5",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -158,7 +168,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.4", {"4.4.4",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -193,7 +204,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.3", {"4.4.3",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -235,7 +247,8 @@
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}, {load_module,emqx_relup},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.2", {"4.4.2",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -278,7 +291,8 @@
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}, {load_module,emqx_relup},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.1", {"4.4.1",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -326,7 +340,8 @@
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{add_module,emqx_relup}, {add_module,emqx_relup},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.0", {"4.4.0",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
{add_module,emqx_ocsp_cache}, {add_module,emqx_ocsp_cache},
@ -376,9 +391,15 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.11", [{"4.4.12",
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{load_module,emqx_relup,brutal_purge,soft_purge,[]}, [{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},

View File

@ -56,9 +56,12 @@
, clear_keepalive/1 , clear_keepalive/1
]). ]).
%% Exports for CT
-export([set_field/3]). -export([set_field/3]).
-ifdef(TEST).
-export([ensure_timer/3]).
-endif.
-import(emqx_misc, -import(emqx_misc,
[ run_fold/3 [ run_fold/3
, pipeline/3 , pipeline/3
@ -622,20 +625,20 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
NChannel = ensure_quota(PubRes, Channel), NChannel = ensure_quota(PubRes, Channel),
handle_out(puback, {PacketId, RC}, NChannel); handle_out(puback, {PacketId, RC}, NChannel);
do_publish(PacketId, Msg = #message{qos = ?QOS_2}, do_publish(PacketId, Msg = #message{qos = ?QOS_2}, Channel0) ->
Channel = #channel{session = Session}) -> #channel{session = Session} = NChannel = maybe_clean_expired_awaiting_rel(Channel0),
case emqx_session:publish(PacketId, Msg, Session) of case emqx_session:publish(PacketId, Msg, Session) of
{ok, PubRes, NSession} -> {ok, PubRes, NSession} ->
RC = puback_reason_code(PubRes), RC = puback_reason_code(PubRes),
NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}), NChannel1 = ensure_timer(await_timer, NChannel#channel{session = NSession}),
NChannel2 = ensure_quota(PubRes, NChannel1), NChannel2 = ensure_quota(PubRes, NChannel1),
handle_out(pubrec, {PacketId, RC}, NChannel2); handle_out(pubrec, {PacketId, RC}, NChannel2);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = emqx_metrics:inc('packets.publish.inuse'), ok = emqx_metrics:inc('packets.publish.inuse'),
handle_out(pubrec, {PacketId, RC}, Channel); handle_out(pubrec, {PacketId, RC}, NChannel);
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
ok = emqx_metrics:inc('packets.publish.dropped'), ok = emqx_metrics:inc('packets.publish.dropped'),
handle_out(disconnect, RC, Channel) handle_out(disconnect, RC, NChannel)
end. end.
ensure_quota(_, Channel = #channel{quota = undefined}) -> ensure_quota(_, Channel = #channel{quota = undefined}) ->
@ -841,7 +844,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn
[ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)],
AckProps AckProps
), ),
return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
ensure_keepalive(NAckProps, Channel)); ensure_keepalive(NAckProps, Channel));
@ -923,7 +925,7 @@ return_connack(AckPacket, Channel) ->
}, },
{Packets, NChannel1} = do_deliver(Publishes, NChannel), {Packets, NChannel1} = do_deliver(Publishes, NChannel),
Outgoing = [{outgoing, Packets} || length(Packets) > 0], Outgoing = [{outgoing, Packets} || length(Packets) > 0],
{ok, Replies ++ Outgoing, NChannel1} {ok, Replies ++ Outgoing, ensure_timer(retry_timer, NChannel1)}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1127,17 +1129,8 @@ handle_timeout(_TRef, retry_delivery,
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
end; end;
handle_timeout(_TRef, expire_awaiting_rel, handle_timeout(_TRef, expire_awaiting_rel, Channel) ->
Channel = #channel{conn_state = disconnected}) -> {ok, clean_expired_awaiting_rel(Channel)};
{ok, Channel};
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{session = Session}) ->
case emqx_session:expire(awaiting_rel, Session) of
{ok, NSession} ->
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
{ok, Timeout, NSession} ->
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
end;
handle_timeout(_TRef, expire_session, Channel) -> handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel); shutdown(expired, Channel);
@ -1182,6 +1175,26 @@ reset_timer(Name, Time, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) -> clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
is_timer_alive(Name, #channel{timers = Timers}) ->
case maps:find(Name, Timers) of
error -> false;
{ok, _TRef} -> true
end.
maybe_clean_expired_awaiting_rel(Channel) ->
case is_timer_alive(await_timer, Channel) of
true -> Channel;
false -> clean_expired_awaiting_rel(Channel)
end.
clean_expired_awaiting_rel(Channel = #channel{session = Session}) ->
case emqx_session:expire(awaiting_rel, Session) of
{ok, NSession} ->
clean_timer(await_timer, Channel#channel{session = NSession});
{ok, Timeout, NSession} ->
reset_timer(await_timer, Timeout, Channel#channel{session = NSession})
end.
-spec interval(channel_timer(), channel()) -> timeout(). -spec interval(channel_timer(), channel()) -> timeout().
interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(interval, KeepAlive);
@ -1878,10 +1891,6 @@ is_disconnect_event_enabled(discarded) ->
is_disconnect_event_enabled(takeovered) -> is_disconnect_event_enabled(takeovered) ->
emqx:get_env(client_disconnect_takeovered, false). emqx:get_env(client_disconnect_takeovered, false).
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
set_field(Name, Value, Channel) -> set_field(Name, Value, Channel) ->
Pos = emqx_misc:index_of(Name, record_info(fields, channel)), Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
setelement(Pos+1, Channel, Value). setelement(Pos+1, Channel, Value).

View File

@ -558,7 +558,8 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
clean_down({ChanPid, ClientId}) -> clean_down({ChanPid, ClientId}) ->
do_unregister_channel({ClientId, ChanPid}). do_unregister_channel({ClientId, ChanPid}),
?tp(debug, emqx_cm_clean_down, #{client_id => ClientId}).
stats_fun() -> stats_fun() ->
lists:foreach(fun update_stats/1, ?CHAN_STATS). lists:foreach(fun update_stats/1, ?CHAN_STATS).

View File

@ -31,12 +31,14 @@
post_release_upgrade(FromRelVsn, _) -> post_release_upgrade(FromRelVsn, _) ->
{_, CurrRelVsn} = ?EMQX_RELEASE, {_, CurrRelVsn} = ?EMQX_RELEASE,
?INFO("emqx has been upgraded from ~s to ~s!", [FromRelVsn, CurrRelVsn]), ?INFO("emqx has been upgraded from ~s to ~s!", [FromRelVsn, CurrRelVsn]),
maybe_refresh_jwt_module(FromRelVsn),
reload_components(). reload_components().
%% What to do after downgraded to an old release vsn. %% What to do after downgraded to an old release vsn.
post_release_downgrade(ToRelVsn, _) -> post_release_downgrade(ToRelVsn, _) ->
{_, CurrRelVsn} = ?EMQX_RELEASE, {_, CurrRelVsn} = ?EMQX_RELEASE,
?INFO("emqx has been downgraded from ~s to ~s!", [CurrRelVsn, ToRelVsn]), ?INFO("emqx has been downgraded from ~s to ~s!", [CurrRelVsn, ToRelVsn]),
maybe_refresh_jwt_module(ToRelVsn),
reload_components(). reload_components().
-ifdef(EMQX_ENTERPRISE). -ifdef(EMQX_ENTERPRISE).
@ -73,3 +75,21 @@ load_plugins() ->
true -> emqx_plugins:force_load(); true -> emqx_plugins:force_load();
false -> emqx_plugins:load() false -> emqx_plugins:load()
end. end.
-ifdef(EMQX_ENTERPRISE).
maybe_refresh_jwt_module(Release) when Release =:= "4.4.0"
orelse Release =:= "4.4.1"
orelse Release =:= "4.4.2"
orelse Release =:= "4.4.3" ->
_ = emqx:unhook('client.authenticate', fun emqx_auth_jwt:check/3),
_ = emqx:unhook('client.authenticate', fun emqx_auth_jwt:check_auth/3),
emqx_modules:refresh_module(jwt_authentication);
maybe_refresh_jwt_module(_) ->
ok.
-else.
maybe_refresh_jwt_module(_) ->
ok.
-endif.

View File

@ -205,7 +205,8 @@ t_handle_in_qos2_publish_with_error_return(_) ->
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
Channel = channel(#{conn_state => connected, session => Session}), Channel = emqx_channel:ensure_timer(await_timer, 2000,
channel(#{conn_state => connected, session => Session})),
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
emqx_channel:handle_in(Publish1, Channel), emqx_channel:handle_in(Publish1, Channel),

View File

@ -41,12 +41,24 @@
<<"TopicA/#">> <<"TopicA/#">>
]). ]).
-define(do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
receive
{tcp, SOCKET, PACKET} -> EXPRESS
after TIMEOUT ->
ct:fail({receive_timeout, TIMEOUT})
end).
-define(receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
fun() ->
?do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS)
end()).
all() -> all() ->
[{group, mqttv3}, [{group, mqttv3},
{group, mqttv4}, {group, mqttv4},
{group, mqttv5}, {group, mqttv5},
{group, others} {group, others},
{group, bugfixes}
]. ].
groups() -> groups() ->
@ -73,6 +85,10 @@ groups() ->
t_certcn_as_clientid_default_config_tls, t_certcn_as_clientid_default_config_tls,
t_certcn_as_clientid_tlsv1_3, t_certcn_as_clientid_tlsv1_3,
t_certcn_as_clientid_tlsv1_2 t_certcn_as_clientid_tlsv1_2
]},
{bugfixes, [non_parallel_tests],
[ t_qos2_no_pubrel_received
, t_retry_timer_after_session_taken_over
]} ]}
]. ].
@ -84,6 +100,18 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_testcase(Func, Cfg) ->
maybe_run_fun(setup, Func, Cfg).
end_per_testcase(Func, Cfg) ->
maybe_run_fun(teardown, Func, Cfg).
maybe_run_fun(Tag, Func, Cfg) ->
try ?MODULE:Func(Tag, Cfg)
catch
error:undef -> Cfg
end.
set_special_confs(emqx) -> set_special_confs(emqx) ->
emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]); emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]);
set_special_confs(_) -> set_special_confs(_) ->
@ -297,12 +325,193 @@ t_certcn_as_clientid_tlsv1_3(_) ->
t_certcn_as_clientid_tlsv1_2(_) -> t_certcn_as_clientid_tlsv1_2(_) ->
tls_certcn_as_clientid('tlsv1.2'). tls_certcn_as_clientid('tlsv1.2').
t_qos2_no_pubrel_received(setup, Cfg) ->
OldMaxAwaitRel = emqx_zone:get_env(external, max_awaiting_rel),
OldWaitTimeout = emqx_zone:get_env(external, await_rel_timeout),
emqx_zone:set_env(external, max_awaiting_rel, 2),
emqx_zone:set_env(external, await_rel_timeout, 1),
[{old_max_await_rel, OldMaxAwaitRel},
{old_wait_timeout, OldWaitTimeout} | Cfg];
t_qos2_no_pubrel_received(teardown, Cfg) ->
OldMaxAwaitRel = ?config(old_max_await_rel, Cfg),
OldWaitTimeout = ?config(old_wait_timeout, Cfg),
emqx_zone:set_env(external, max_awaiting_rel, OldMaxAwaitRel),
emqx_zone:set_env(external, await_rel_timeout, OldWaitTimeout).
t_qos2_no_pubrel_received(_) ->
%% [The Scenario]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client --- PUB: QoS2 ---> EMQX
%% Client --- PUB: QoS2 ---> EMQX
%% Client --- ... ---> EMQX
%% Client <---- PUBREC ---- EMQX
%% Client <---- PUBREC ---- EMQX
%% Client <---- ... ---- EMQX
%% Client --- PUBREL --X--> EMQX (PUBREL not received)
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX
%%
%% [A few hours later..]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client --- PUB: QoS2 ---> EMQX
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX (we should clear the awaiting_rel queue but it is still full).
ct:pal("1. reconnect after awaiting_rel is cleared"),
qos2_no_pubrel_received(fun
(1) -> timer:sleep(1500); %% reconnect 1.5s later, ensure the await_rel_timeout triggered
(2) -> ok
end),
ct:pal("2. reconnect before awaiting_rel is cleared"),
qos2_no_pubrel_received(fun
(1) -> ok; %% reconnect as fast as possiable, ensure the await_rel_timeout NOT triggered
(2) -> timer:sleep(1500) %% send msgs 1.5s later, ensure the await_rel_timeout triggered
end).
qos2_no_pubrel_received(Actions) ->
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Topic = <<"t/foo">>,
{ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
gen_tcp:send(Sock, make_connect_packet(ClientId, false)),
timer:sleep(100),
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 100, <<"foo">>)),
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 101, <<"foo">>)),
%% Send the 3rd publish with id 103 will got disconnected:
%% - "Dropped the qos2 packet 103 due to awaiting_rel is full".
?assertMatch(ok, gen_tcp:send(Sock, make_publish_packet(Topic, 2, 103, <<"foo">>))),
%% not the connections should be closed by the server due to ?RC_RECEIVE_MAXIMUM_EXCEEDED
receive
{tcp_closed, Sock} -> ok
after 500 ->
ct:fail({wait_tcp_close_timeout, 500})
end,
%% before reconnecting
action_point(Actions, 1),
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
%% after connected and before sending any msgs
action_point(Actions, 2),
ok = gen_tcp:send(Sock1, make_publish_packet(Topic, 2, 104, <<"foo">>)),
receive
{tcp_closed, Sock1} ->
%% this is the bug, the wait_rel queue should have expired but not cleared
ct:fail(unexpected_disconnect)
after 500 -> ok
end,
gen_tcp:close(Sock1),
ok.
t_retry_timer_after_session_taken_over(setup, Cfg) ->
%emqx_logger:set_log_level(debug),
OldRetryInterval = emqx_zone:get_env(external, retry_interval),
emqx_zone:set_env(external, retry_interval, 1),
[{old_retry_interval, OldRetryInterval} | Cfg];
t_retry_timer_after_session_taken_over(teardown, Cfg) ->
%emqx_logger:set_log_level(warning),
OldRetryInterval = ?config(old_retry_interval, Cfg),
emqx_zone:set_env(external, retry_interval, OldRetryInterval).
t_retry_timer_after_session_taken_over(_) ->
%% [The Scenario]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client <--- PUB: QoS1 --- EMQX
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
%% Client <---- PUB: QoS1 ---- EMQX (resend the PUBLISH msg)
%% Client --- DISCONN ---> EMQX
%% [A few seconds later..]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client <--- PUB: QoS1 --- EMQX (resume session and resend the inflight messages)
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
TcpOpts = [binary, {active, true}],
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Topic = <<"t/foo">>,
Payload = <<"DO NOT REPLY!">>,
%% CONNECT
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
ok = gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
?receive_tcp_packet(1000, Sock1, _, ok),
%% SUBSCRIBE
ok = gen_tcp:send(Sock1, make_subscribe_packet(Topic, 2, 1)),
?receive_tcp_packet(200, Sock1, _, ok),
emqx_broker:publish(emqx_message:make(<<"publisher">>, 1, Topic, Payload)),
%% note that here we don't reply the publish with puback
?receive_tcp_packet(200, Sock1, PubPacket1,
begin
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
emqx_frame:parse(PubPacket1))
end),
ok = gen_tcp:close(Sock1),
timer:sleep(100),
%% CONNECT again
{ok, Sock2} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
ConnAckRem = ?do_receive_tcp_packet(200, Sock2, ConnPack2, begin
ConnAck = iolist_to_binary(make_connack_packet(?CONNACK_ACCEPT, 1)),
ct:pal("--- connack: ~p, got: ~p", [ConnAck, ConnPack2]),
<<ConnAck:4/binary, Rem/binary>> = ConnPack2,
Rem
end),
%% emqx should resend the non-ACKed messages now
PubPacket2 = case ConnAckRem of
<<>> ->
?receive_tcp_packet(200, Sock2, Packet, Packet);
_ ->
ConnAckRem
end,
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
emqx_frame:parse(PubPacket2)),
%% ... and emqx should resend the message 1s later, as we didn't ACK it.
?receive_tcp_packet(1200, Sock2, Packet,
begin
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
emqx_frame:parse(Packet))
end),
%% we ACK it now, then emqx should stop the resending
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
receive
{tcp, Sock2, _PACKET} ->
{ok, MqttPacket, <<>>, _} = emqx_frame:parse(_PACKET),
?assertNotMatch(?PUBLISH_PACKET(_), MqttPacket)
after 1200 -> ok
end,
ok = gen_tcp:close(Sock1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
action_point(Action, N) ->
Action(N).
make_connect_packet(ClientId, CleanStart) ->
emqx_frame:serialize(?CONNECT_PACKET(
#mqtt_packet_connect{
proto_name = <<"MQTT">>,
clean_start = CleanStart,
keepalive = 0,
clientid = ClientId
}
)).
make_connack_packet(Code, SP) ->
emqx_frame:serialize(?CONNACK_PACKET(Code, SP)).
make_publish_packet(Topic, QoS, PacketId, Payload) ->
emqx_frame:serialize(
?PUBLISH_PACKET(QoS, Topic, PacketId, Payload)).
make_subscribe_packet(TopicFileter, QoS, PacketId) ->
emqx_frame:serialize(
?SUBSCRIBE_PACKET(PacketId, [{TopicFileter, #{rh => 0, rap => 0, nl => 0, qos => QoS}}])).
recv_msgs(Count) -> recv_msgs(Count) ->
recv_msgs(Count, []). recv_msgs(Count, []).