diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml index 7e9a3cf9d..99b225576 100644 --- a/.github/actions/package-macos/action.yaml +++ b/.github/actions/package-macos/action.yaml @@ -55,10 +55,18 @@ runs: fi git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH" cd "$OTP_SOURCE_PATH" + if [ "$(arch)" = arm64 ]; then + export LDFLAGS="-L$(brew --prefix unixodbc)/lib" + export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include" + fi ./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH" make -j$(nproc) rm -rf "$OTP_INSTALL_PATH" make install + if [ "$(arch)" = arm64 ]; then + unset LDFLAGS + unset CC + fi - name: build env: HOMEBREW_NO_AUTO_UPDATE: 1 diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index edf1701be..839d83b96 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.4.5"}, + {vsn, "4.4.6"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index cc9ca9af7..b0be62eb5 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.4", + [{"4.4.5",[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]}, + {"4.4.4", [{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]}, {"4.4.3", @@ -22,7 +23,8 @@ {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, {update,emqx_exhook_mngr,{advanced,["4.4.0"]}}]}, {<<".*">>,[]}], - [{"4.4.4", + [{"4.4.5",[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]}, + {"4.4.4", [{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}]}, {"4.4.3", diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 9996a2e46..8c1feb42f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.4.12"}, % strict semver, bump manually! + {vsn, "4.4.13"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]}, {applications, [kernel,stdlib,rulesql,getopt,jose]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 4b392eb01..c3273d701 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,13 +1,13 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.11",[ - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} - ]}, + [{"4.4.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {"4.4.11", + [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.10", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -213,13 +213,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.11",[ - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}}, - {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} - ]}, + [{"4.4.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {"4.4.11", + [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.10", [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f3e1651d4..6e8f00329 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -698,6 +698,7 @@ init_resource(Module, OnCreate, ResId, Config) -> ResParams = #resource_params{id = ResId, params = Params, status = #{is_alive => true}}, + maybe_resource_down(ResId, clear), emqx_rule_registry:add_resource_params(ResParams). init_resource_with_retrier(Module, OnCreate, ResId, Config) -> @@ -706,6 +707,7 @@ init_resource_with_retrier(Module, OnCreate, ResId, Config) -> ResParams = #resource_params{id = ResId, params = Params, status = #{is_alive => true}}, + maybe_resource_down(ResId, clear), emqx_rule_registry:add_resource_params(ResParams). init_action(Module, OnCreate, ActionInstId, Params) -> @@ -726,12 +728,10 @@ init_action(Module, OnCreate, ActionInstId, Params) -> end. clear_resource(_Module, undefined, ResId, Type) -> - Name = alarm_name_of_resource_down(Type, ResId), - _ = emqx_alarm:deactivate(Name), + clear_resource_down(ResId, Type), ok = emqx_rule_registry:remove_resource_params(ResId); clear_resource(Module, Destroy, ResId, Type) -> - Name = alarm_name_of_resource_down(Type, ResId), - _ = emqx_alarm:deactivate(Name), + clear_resource_down(ResId, Type), case emqx_rule_registry:find_resource_params(ResId) of {ok, #resource_params{params = Params}} -> ?RAISE(Module:Destroy(ResId, Params), @@ -779,14 +779,10 @@ fetch_resource_status(Module, OnStatus, ResId) -> case Module:OnStatus(ResId, Params) of #{is_alive := LastIsAlive} = Status -> Status; #{is_alive := true} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:deactivate(Name), + maybe_resource_down(ResId, clear), Status; #{is_alive := false} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:activate(Name, #{id => ResId, type => Type}), + maybe_resource_down(ResId, alarm), Status end catch _Error:Reason:STrace -> @@ -824,9 +820,23 @@ refresh_actions(Actions, Pred) -> end end, Actions). -find_type(ResId) -> - {ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId), - {ok, Type}. +maybe_resource_down(ResId, AlarmOrClear) -> + case emqx_rule_registry:find_resource(ResId) of + {ok, #resource{type = Type}} -> + _ = case AlarmOrClear of + alarm -> alarm_resource_down(ResId, Type); + clear -> clear_resource_down(ResId, Type) + end, + ok; + not_found -> + ok + end. + +alarm_resource_down(ResId, Type) -> + emqx_alarm:activate(alarm_name_of_resource_down(Type, ResId), + #{id => ResId, type => Type}). +clear_resource_down(ResId, Type) -> + emqx_alarm:deactivate(alarm_name_of_resource_down(Type, ResId)). alarm_name_of_resource_down(Type, ResId) -> unicode:characters_to_binary(io_lib:format("resource/~ts/~ts/down", [Type, ResId])). diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index fac76d235..b3a44ff34 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -89,13 +89,17 @@ end_per_testcase(_, Config) -> Config. common_init_per_testcase() -> + AlarmOpts = [{actions, [log, publish]}, {size_limit, 1000}, {validity_period, 86400}], + {ok, _} = emqx_alarm:start_link(AlarmOpts), {ok, _} = emqx_rule_monitor:start_link(). common_end_per_testcases() -> + ok = emqx_alarm:stop(), emqx_rule_monitor:erase_retry_interval(), emqx_rule_monitor:stop(). t_restart_resource(_) -> + ct:pal("emqx_alarm: ~p", [sys:get_state(whereis(emqx_alarm))]), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, diff --git a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl index ae3f72254..c924b1d71 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl @@ -21,7 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). --define(PORT, 9876). +-define(PORT, 29876). all() -> emqx_ct:all(?MODULE). diff --git a/build b/build index d3c9164eb..9cc68e531 100755 --- a/build +++ b/build @@ -69,6 +69,8 @@ make_rel() { relup_db() { case "$PROFILE" in + *edge*) + ;; *-ee*) ./scripts/relup-base-vsns.escript "$@" ./data/relup-paths-ee.eterm ;; diff --git a/changes/v4.4.12-en.md b/changes/v4.4.12-en.md index 3df067609..0c1e02ce2 100644 --- a/changes/v4.4.12-en.md +++ b/changes/v4.4.12-en.md @@ -25,3 +25,14 @@ - Fixed EMQX Helm Chart can not set JSON type value for EMQX Broker configuration items [#9504](https://github.com/emqx/emqx/pull/9504). - When resource creation is too slow, there may be some temporary probing connections left [#9539](https://github.com/emqx/emqx/pull/9539). + +- After a reconnect, the unacknowledged QoS1/QoS2 messages in non-clean session were not retransmitted periodically as before the reconnect [#9627](https://github.com/emqx/emqx/pull/9627). + The configuration `zone..retry_interval` specifies the retransmission interval of + unacknowledged QoS1/QoS2 messages (defaults to 30s). + Prior to this fix, unacknowledged messages buffered in the session are re-sent only once after session take-over, but not retried at configured interval. + +- The expired 'awaiting_rel' queue is not cleared after persistent session MQTT client disconnected [#9627](https://github.com/emqx/emqx/pull/9627). + Before this change, if the 'awaiting_rel' queue is full when the MQTT client reconnect + to the broker and publish a QoS2 message, the client will get disconnected by the broker + with reason code RC_RECEIVE_MAXIMUM_EXCEEDED(0x93), even if the packet IDs in the 'awaiting_rel' + queue have already expired. diff --git a/changes/v4.4.12-zh.md b/changes/v4.4.12-zh.md index d986aca93..89463faef 100644 --- a/changes/v4.4.12-zh.md +++ b/changes/v4.4.12-zh.md @@ -26,3 +26,11 @@ - 修复 EMQX Helm Chart 无法配置 value 为 JSON 类型的 EMQX Broker 配置项 [#9504](https://github.com/emqx/emqx/pull/9504)。 - 当创建资源过慢的情况下,有可能会残留一些用来探活的临时的连接 [#9539](https://github.com/emqx/emqx/pull/9539)。 + +- 持久会话的 MQTT 客户端重新连接 emqx 之后,未被确认过的 QoS1/QoS2 消息不再周期性重发 [#9627](https://github.com/emqx/emqx/pull/9627)。 + `zone..retry_interval` 配置指定了没有被确认过的 QoS1/QoS2 消息的重发间隔,(默认为 30s)。在这个修复之前, + 当持久会话的 MQTT 客户端重新连接 emqx 之后,emqx 会将队列中缓存的未被确认过的消息重发一次,但是不会按配置的时间间隔重试。 + +- 持久会话的 MQTT 客户端断连之后,已经过期的 'awaiting_rel' 队列没有清除 [#9627](https://github.com/emqx/emqx/pull/9627)。 + 在这个改动之前,在客户端重连并且发布 QoS2 消息的时候,如果 'awaiting_rel' 队列已满,此客户端会被服务器以 + RC_RECEIVE_MAXIMUM_EXCEEDED(0x93) 错误码断开连接,即使这时候 'awaiting_rel' 队列里面的报文 ID 已经过期了。 diff --git a/data/relup-paths.eterm b/data/relup-paths.eterm index a1f7e912c..e03f4d839 100644 --- a/data/relup-paths.eterm +++ b/data/relup-paths.eterm @@ -51,6 +51,12 @@ <<"4.4.5">>,<<"4.4.6">>,<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>, <<"4.4.10">>,<<"4.4.11">>], otp => <<"24.3.4.2-1">>}}. +{<<"4.4.13">>, + #{from_versions => + [<<"4.4.0">>,<<"4.4.1">>,<<"4.4.2">>,<<"4.4.3">>,<<"4.4.4">>, + <<"4.4.5">>,<<"4.4.6">>,<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>, + <<"4.4.10">>,<<"4.4.11">>,<<"4.4.12">>], + otp => <<"24.3.4.2-1">>}}. {<<"4.5.0">>, #{from_versions => [<<"4.4.8">>,<<"4.4.9">>,<<"4.4.10">>, <<"4.4.11">>], diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index a4f2e1a6d..523793eef 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -13,8 +13,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 4.4.12 +version: 4.4.13 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 4.4.12 +appVersion: 4.4.13 diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 929adf705..250c76e71 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.4.12-alpha.2"}). +-define(EMQX_RELEASE, {opensource, "4.4.13-alpha.1"}). -else. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src index c9c273344..38a5ee5a2 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard, [{description, "EMQX Web Dashboard"}, - {vsn, "4.4.11"}, % strict semver, bump manually! + {vsn, "4.4.12"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel,stdlib,mnesia,minirest]}, diff --git a/rebar.config b/rebar.config index 64e9f2dec..fbb49ce9d 100644 --- a/rebar.config +++ b/rebar.config @@ -60,9 +60,9 @@ , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.3"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} - , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.15"}}} + , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.17"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} ]}. diff --git a/scripts/macos-sign-binaries.sh b/scripts/macos-sign-binaries.sh index 135730694..b41c8088d 100755 --- a/scripts/macos-sign-binaries.sh +++ b/scripts/macos-sign-binaries.sh @@ -21,9 +21,16 @@ REL_DIR="${1}" PKSC12_FILE="$HOME/developer-id-application.p12" base64 --decode > "${PKSC12_FILE}" <<<"${APPLE_DEVELOPER_ID_BUNDLE}" -KEYCHAIN='emqx.keychain-db' +KEYCHAIN="emqx-$(date +%s).keychain-db" KEYCHAIN_PASSWORD="$(openssl rand -base64 32)" +trap cleanup EXIT + +function cleanup { + set +e + security delete-keychain "${KEYCHAIN}" 2>/dev/null +} + security create-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}" security set-keychain-settings -lut 21600 "${KEYCHAIN}" security unlock-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}" @@ -64,3 +71,5 @@ for f in \ ; do find "${REL_DIR}"/lib/ -name "$f" -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \; done + +cleanup diff --git a/scripts/relup-base-vsns.sh b/scripts/relup-base-vsns.sh index 1aa2b3288..ba19c7d05 100755 --- a/scripts/relup-base-vsns.sh +++ b/scripts/relup-base-vsns.sh @@ -46,6 +46,10 @@ case "${EDITION}" in GIT_TAG_PREFIX="e" RELUP_PATH_FILE="./data/relup-paths-ee.eterm" ;; + *edge*) + # no relup for emqx-edge + exit 0 + ;; *) GIT_TAG_PREFIX="v" RELUP_PATH_FILE="./data/relup-paths.eterm" diff --git a/src/emqx.app.src b/src/emqx.app.src index 47b9cd084..6d9aeb76f 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -6,7 +6,7 @@ %% the emqx `release' version, which in turn is comprised of several %% apps, one of which is this. See `emqx_release.hrl' for more %% info. - {vsn, "4.4.12"}, % strict semver, bump manually! + {vsn, "4.4.13"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [ kernel diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b41e751d6..4aada41b8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,7 +1,12 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.11", + [{"4.4.12", + [{load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {"4.4.11", [{add_module,emqx_cover}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -27,7 +32,8 @@ {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.9", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -52,7 +58,8 @@ {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.8", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -78,7 +85,8 @@ {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.7", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -104,7 +112,8 @@ {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.6", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -130,7 +139,8 @@ {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.5", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -158,7 +168,8 @@ {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.4", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -193,7 +204,8 @@ {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.3", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -235,7 +247,8 @@ {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.2", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -278,7 +291,8 @@ {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.1", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -326,7 +340,8 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {add_module,emqx_relup}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.0", [{add_module,emqx_cover}, {add_module,emqx_ocsp_cache}, @@ -376,9 +391,15 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, - {apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]}, + {apply,{application,set_env, + [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {<<".*">>,[]}], - [{"4.4.11", + [{"4.4.12", + [{load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {"4.4.11", [{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 791433b37..daaec16e1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -56,9 +56,12 @@ , clear_keepalive/1 ]). -%% Exports for CT -export([set_field/3]). +-ifdef(TEST). +-export([ensure_timer/3]). +-endif. + -import(emqx_misc, [ run_fold/3 , pipeline/3 @@ -622,20 +625,20 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) -> NChannel = ensure_quota(PubRes, Channel), handle_out(puback, {PacketId, RC}, NChannel); -do_publish(PacketId, Msg = #message{qos = ?QOS_2}, - Channel = #channel{session = Session}) -> +do_publish(PacketId, Msg = #message{qos = ?QOS_2}, Channel0) -> + #channel{session = Session} = NChannel = maybe_clean_expired_awaiting_rel(Channel0), case emqx_session:publish(PacketId, Msg, Session) of {ok, PubRes, NSession} -> RC = puback_reason_code(PubRes), - NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}), + NChannel1 = ensure_timer(await_timer, NChannel#channel{session = NSession}), NChannel2 = ensure_quota(PubRes, NChannel1), handle_out(pubrec, {PacketId, RC}, NChannel2); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = emqx_metrics:inc('packets.publish.inuse'), - handle_out(pubrec, {PacketId, RC}, Channel); + handle_out(pubrec, {PacketId, RC}, NChannel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> ok = emqx_metrics:inc('packets.publish.dropped'), - handle_out(disconnect, RC, Channel) + handle_out(disconnect, RC, NChannel) end. ensure_quota(_, Channel = #channel{quota = undefined}) -> @@ -841,7 +844,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps ), - return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), ensure_keepalive(NAckProps, Channel)); @@ -923,7 +925,7 @@ return_connack(AckPacket, Channel) -> }, {Packets, NChannel1} = do_deliver(Publishes, NChannel), Outgoing = [{outgoing, Packets} || length(Packets) > 0], - {ok, Replies ++ Outgoing, NChannel1} + {ok, Replies ++ Outgoing, ensure_timer(retry_timer, NChannel1)} end. %%-------------------------------------------------------------------- @@ -1127,17 +1129,8 @@ handle_timeout(_TRef, retry_delivery, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; -handle_timeout(_TRef, expire_awaiting_rel, - Channel = #channel{conn_state = disconnected}) -> - {ok, Channel}; -handle_timeout(_TRef, expire_awaiting_rel, - Channel = #channel{session = Session}) -> - case emqx_session:expire(awaiting_rel, Session) of - {ok, NSession} -> - {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; - {ok, Timeout, NSession} -> - {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} - end; +handle_timeout(_TRef, expire_awaiting_rel, Channel) -> + {ok, clean_expired_awaiting_rel(Channel)}; handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); @@ -1182,6 +1175,26 @@ reset_timer(Name, Time, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. +is_timer_alive(Name, #channel{timers = Timers}) -> + case maps:find(Name, Timers) of + error -> false; + {ok, _TRef} -> true + end. + +maybe_clean_expired_awaiting_rel(Channel) -> + case is_timer_alive(await_timer, Channel) of + true -> Channel; + false -> clean_expired_awaiting_rel(Channel) + end. + +clean_expired_awaiting_rel(Channel = #channel{session = Session}) -> + case emqx_session:expire(awaiting_rel, Session) of + {ok, NSession} -> + clean_timer(await_timer, Channel#channel{session = NSession}); + {ok, Timeout, NSession} -> + reset_timer(await_timer, Timeout, Channel#channel{session = NSession}) + end. + -spec interval(channel_timer(), channel()) -> timeout(). interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); @@ -1878,10 +1891,6 @@ is_disconnect_event_enabled(discarded) -> is_disconnect_event_enabled(takeovered) -> emqx:get_env(client_disconnect_takeovered, false). -%%-------------------------------------------------------------------- -%% For CT tests -%%-------------------------------------------------------------------- - set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos+1, Channel, Value). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index f40e803e8..7983c0be5 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -558,7 +558,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- clean_down({ChanPid, ClientId}) -> - do_unregister_channel({ClientId, ChanPid}). + do_unregister_channel({ClientId, ChanPid}), + ?tp(debug, emqx_cm_clean_down, #{client_id => ClientId}). stats_fun() -> lists:foreach(fun update_stats/1, ?CHAN_STATS). diff --git a/src/emqx_relup.erl b/src/emqx_relup.erl index 182a60c7b..a85079753 100644 --- a/src/emqx_relup.erl +++ b/src/emqx_relup.erl @@ -31,12 +31,14 @@ post_release_upgrade(FromRelVsn, _) -> {_, CurrRelVsn} = ?EMQX_RELEASE, ?INFO("emqx has been upgraded from ~s to ~s!", [FromRelVsn, CurrRelVsn]), + maybe_refresh_jwt_module(FromRelVsn), reload_components(). %% What to do after downgraded to an old release vsn. post_release_downgrade(ToRelVsn, _) -> {_, CurrRelVsn} = ?EMQX_RELEASE, ?INFO("emqx has been downgraded from ~s to ~s!", [CurrRelVsn, ToRelVsn]), + maybe_refresh_jwt_module(ToRelVsn), reload_components(). -ifdef(EMQX_ENTERPRISE). @@ -73,3 +75,21 @@ load_plugins() -> true -> emqx_plugins:force_load(); false -> emqx_plugins:load() end. + +-ifdef(EMQX_ENTERPRISE). +maybe_refresh_jwt_module(Release) when Release =:= "4.4.0" + orelse Release =:= "4.4.1" + orelse Release =:= "4.4.2" + orelse Release =:= "4.4.3" -> + _ = emqx:unhook('client.authenticate', fun emqx_auth_jwt:check/3), + _ = emqx:unhook('client.authenticate', fun emqx_auth_jwt:check_auth/3), + emqx_modules:refresh_module(jwt_authentication); +maybe_refresh_jwt_module(_) -> + ok. + +-else. + +maybe_refresh_jwt_module(_) -> + ok. + +-endif. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 6ccbcc16b..211edc2b1 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -205,7 +205,8 @@ t_handle_in_qos2_publish_with_error_return(_) -> ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), - Channel = channel(#{conn_state => connected, session => Session}), + Channel = emqx_channel:ensure_timer(await_timer, 2000, + channel(#{conn_state => connected, session => Session})), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = emqx_channel:handle_in(Publish1, Channel), diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index d45e7a8cf..83fe09e13 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -41,12 +41,24 @@ <<"TopicA/#">> ]). +-define(do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS), + receive + {tcp, SOCKET, PACKET} -> EXPRESS + after TIMEOUT -> + ct:fail({receive_timeout, TIMEOUT}) + end). + +-define(receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS), + fun() -> + ?do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS) + end()). all() -> [{group, mqttv3}, {group, mqttv4}, {group, mqttv5}, - {group, others} + {group, others}, + {group, bugfixes} ]. groups() -> @@ -73,6 +85,10 @@ groups() -> t_certcn_as_clientid_default_config_tls, t_certcn_as_clientid_tlsv1_3, t_certcn_as_clientid_tlsv1_2 + ]}, + {bugfixes, [non_parallel_tests], + [ t_qos2_no_pubrel_received + , t_retry_timer_after_session_taken_over ]} ]. @@ -84,6 +100,18 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +init_per_testcase(Func, Cfg) -> + maybe_run_fun(setup, Func, Cfg). + +end_per_testcase(Func, Cfg) -> + maybe_run_fun(teardown, Func, Cfg). + +maybe_run_fun(Tag, Func, Cfg) -> + try ?MODULE:Func(Tag, Cfg) + catch + error:undef -> Cfg + end. + set_special_confs(emqx) -> emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]); set_special_confs(_) -> @@ -297,12 +325,193 @@ t_certcn_as_clientid_tlsv1_3(_) -> t_certcn_as_clientid_tlsv1_2(_) -> tls_certcn_as_clientid('tlsv1.2'). +t_qos2_no_pubrel_received(setup, Cfg) -> + OldMaxAwaitRel = emqx_zone:get_env(external, max_awaiting_rel), + OldWaitTimeout = emqx_zone:get_env(external, await_rel_timeout), + emqx_zone:set_env(external, max_awaiting_rel, 2), + emqx_zone:set_env(external, await_rel_timeout, 1), + [{old_max_await_rel, OldMaxAwaitRel}, + {old_wait_timeout, OldWaitTimeout} | Cfg]; +t_qos2_no_pubrel_received(teardown, Cfg) -> + OldMaxAwaitRel = ?config(old_max_await_rel, Cfg), + OldWaitTimeout = ?config(old_wait_timeout, Cfg), + emqx_zone:set_env(external, max_awaiting_rel, OldMaxAwaitRel), + emqx_zone:set_env(external, await_rel_timeout, OldWaitTimeout). +t_qos2_no_pubrel_received(_) -> + %% [The Scenario]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client --- PUB: QoS2 ---> EMQX + %% Client --- PUB: QoS2 ---> EMQX + %% Client --- ... ---> EMQX + %% Client <---- PUBREC ---- EMQX + %% Client <---- PUBREC ---- EMQX + %% Client <---- ... ---- EMQX + %% Client --- PUBREL --X--> EMQX (PUBREL not received) + %% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX + %% + %% [A few hours later..]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client --- PUB: QoS2 ---> EMQX + %% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX (we should clear the awaiting_rel queue but it is still full). + ct:pal("1. reconnect after awaiting_rel is cleared"), + qos2_no_pubrel_received(fun + (1) -> timer:sleep(1500); %% reconnect 1.5s later, ensure the await_rel_timeout triggered + (2) -> ok + end), + ct:pal("2. reconnect before awaiting_rel is cleared"), + qos2_no_pubrel_received(fun + (1) -> ok; %% reconnect as fast as possiable, ensure the await_rel_timeout NOT triggered + (2) -> timer:sleep(1500) %% send msgs 1.5s later, ensure the await_rel_timeout triggered + end). +qos2_no_pubrel_received(Actions) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Topic = <<"t/foo">>, + {ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [], 3000), + gen_tcp:send(Sock, make_connect_packet(ClientId, false)), + timer:sleep(100), + ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 100, <<"foo">>)), + ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 101, <<"foo">>)), + %% Send the 3rd publish with id 103 will got disconnected: + %% - "Dropped the qos2 packet 103 due to awaiting_rel is full". + ?assertMatch(ok, gen_tcp:send(Sock, make_publish_packet(Topic, 2, 103, <<"foo">>))), + %% not the connections should be closed by the server due to ?RC_RECEIVE_MAXIMUM_EXCEEDED + receive + {tcp_closed, Sock} -> ok + after 500 -> + ct:fail({wait_tcp_close_timeout, 500}) + end, + + %% before reconnecting + action_point(Actions, 1), + + {ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, [], 3000), + gen_tcp:send(Sock1, make_connect_packet(ClientId, false)), + + %% after connected and before sending any msgs + action_point(Actions, 2), + + ok = gen_tcp:send(Sock1, make_publish_packet(Topic, 2, 104, <<"foo">>)), + + receive + {tcp_closed, Sock1} -> + %% this is the bug, the wait_rel queue should have expired but not cleared + ct:fail(unexpected_disconnect) + after 500 -> ok + end, + gen_tcp:close(Sock1), + ok. + +t_retry_timer_after_session_taken_over(setup, Cfg) -> + %emqx_logger:set_log_level(debug), + OldRetryInterval = emqx_zone:get_env(external, retry_interval), + emqx_zone:set_env(external, retry_interval, 1), + [{old_retry_interval, OldRetryInterval} | Cfg]; +t_retry_timer_after_session_taken_over(teardown, Cfg) -> + %emqx_logger:set_log_level(warning), + OldRetryInterval = ?config(old_retry_interval, Cfg), + emqx_zone:set_env(external, retry_interval, OldRetryInterval). +t_retry_timer_after_session_taken_over(_) -> + %% [The Scenario]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client <--- PUB: QoS1 --- EMQX + %% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK) + %% Client <---- PUB: QoS1 ---- EMQX (resend the PUBLISH msg) + %% Client --- DISCONN ---> EMQX + %% [A few seconds later..]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client <--- PUB: QoS1 --- EMQX (resume session and resend the inflight messages) + %% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK) + TcpOpts = [binary, {active, true}], + + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Topic = <<"t/foo">>, + Payload = <<"DO NOT REPLY!">>, + + %% CONNECT + {ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000), + ok = gen_tcp:send(Sock1, make_connect_packet(ClientId, false)), + ?receive_tcp_packet(1000, Sock1, _, ok), + + %% SUBSCRIBE + ok = gen_tcp:send(Sock1, make_subscribe_packet(Topic, 2, 1)), + ?receive_tcp_packet(200, Sock1, _, ok), + + emqx_broker:publish(emqx_message:make(<<"publisher">>, 1, Topic, Payload)), + %% note that here we don't reply the publish with puback + ?receive_tcp_packet(200, Sock1, PubPacket1, + begin + ?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _}, + emqx_frame:parse(PubPacket1)) + end), + + ok = gen_tcp:close(Sock1), + timer:sleep(100), + + %% CONNECT again + {ok, Sock2} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000), + ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)), + ConnAckRem = ?do_receive_tcp_packet(200, Sock2, ConnPack2, begin + ConnAck = iolist_to_binary(make_connack_packet(?CONNACK_ACCEPT, 1)), + ct:pal("--- connack: ~p, got: ~p", [ConnAck, ConnPack2]), + <> = ConnPack2, + Rem + end), + + %% emqx should resend the non-ACKed messages now + PubPacket2 = case ConnAckRem of + <<>> -> + ?receive_tcp_packet(200, Sock2, Packet, Packet); + _ -> + ConnAckRem + end, + ?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _}, + emqx_frame:parse(PubPacket2)), + + %% ... and emqx should resend the message 1s later, as we didn't ACK it. + ?receive_tcp_packet(1200, Sock2, Packet, + begin + ?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _}, + emqx_frame:parse(Packet)) + end), + + %% we ACK it now, then emqx should stop the resending + ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)), + receive + {tcp, Sock2, _PACKET} -> + {ok, MqttPacket, <<>>, _} = emqx_frame:parse(_PACKET), + ?assertNotMatch(?PUBLISH_PACKET(_), MqttPacket) + after 1200 -> ok + end, + ok = gen_tcp:close(Sock1). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- +action_point(Action, N) -> + Action(N). + +make_connect_packet(ClientId, CleanStart) -> + emqx_frame:serialize(?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_name = <<"MQTT">>, + clean_start = CleanStart, + keepalive = 0, + clientid = ClientId + } + )). +make_connack_packet(Code, SP) -> + emqx_frame:serialize(?CONNACK_PACKET(Code, SP)). + +make_publish_packet(Topic, QoS, PacketId, Payload) -> + emqx_frame:serialize( + ?PUBLISH_PACKET(QoS, Topic, PacketId, Payload)). + +make_subscribe_packet(TopicFileter, QoS, PacketId) -> + emqx_frame:serialize( + ?SUBSCRIBE_PACKET(PacketId, [{TopicFileter, #{rh => 0, rap => 0, nl => 0, qos => QoS}}])). + recv_msgs(Count) -> recv_msgs(Count, []).