diff --git a/.ci/build_packages/Dockerfile b/.ci/build_packages/Dockerfile index 792acddf5..6f9a12159 100644 --- a/.ci/build_packages/Dockerfile +++ b/.ci/build_packages/Dockerfile @@ -7,6 +7,8 @@ COPY . /emqx WORKDIR /emqx +RUN rm -rf _build/${EMQX_NAME}/lib _build/${EMQX_NAME}-pkg/lib + RUN make ${EMQX_NAME}-zip || cat rebar3.crashdump RUN make ${EMQX_NAME}-pkg || cat rebar3.crashdump diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index bbc107d37..d5feda5f2 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -3,10 +3,23 @@ set -x -e -u export CODE_PATH=${CODE_PATH:-"/emqx"} export EMQX_NAME=${EMQX_NAME:-"emqx"} export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}" -export RELUP_PACKAGE_PATH="${CODE_PATH}/relup_packages/${EMQX_NAME}" +export RELUP_PACKAGE_PATH="${CODE_PATH}/_upgrade_base" # export EMQX_NODE_NAME="emqx-on-$(uname -m)@127.0.0.1" # export EMQX_NODE_COOKIE=$(date +%s%N) +case "$(uname -m)" in + x86_64) + ARCH='amd64' + ;; + aarch64) + ARCH='arm64' + ;; + arm*) + ARCH=arm + ;; +esac +export ARCH + emqx_prepare(){ mkdir -p "${PACKAGE_PATH}" @@ -136,19 +149,19 @@ running_test(){ } relup_test(){ - TARGET_VERSION="$1" + TARGET_VERSION="$("$CODE_PATH"/pkg-vsn.sh)" if [ -d "${RELUP_PACKAGE_PATH}" ];then - cd "${RELUP_PACKAGE_PATH }" + cd "${RELUP_PACKAGE_PATH}" - for var in "${EMQX_NAME}"-*-"$(uname -m)".zip;do + for var in "${EMQX_NAME}"-*-"${ARCH}".zip;do packagename=$(basename "${var}") unzip "$packagename" ./emqx/bin/emqx start || ( tail emqx/log/emqx.log.1 && exit 1 ) ./emqx/bin/emqx_ctl status ./emqx/bin/emqx versions - cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-$(uname -m)".zip ./emqx/releases + cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-${ARCH}".zip ./emqx/releases ./emqx/bin/emqx install "${TARGET_VERSION}" - [ "$(./emqx/bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]")" = "${TARGET_VERSION}" ] || exit 1 + [ "$(./emqx/bin/emqx versions |grep permanent | awk '{print $2}')" = "${TARGET_VERSION}" ] || exit 1 ./emqx/bin/emqx_ctl status ./emqx/bin/emqx stop rm -rf emqx @@ -158,4 +171,4 @@ relup_test(){ emqx_prepare emqx_test -# relup_test +relup_test diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index f5fe3fe5c..ebae56b2b 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -3,10 +3,6 @@ name: Cross build packages on: schedule: - cron: '0 */6 * * *' - push: - tags: - - v* - - e* release: types: - published @@ -19,25 +15,34 @@ jobs: outputs: profiles: ${{ steps.set_profile.outputs.profiles}} + old_vsns: ${{ steps.set_profile.outputs.old_vsns}} steps: - uses: actions/checkout@v2 with: path: source + fetch-depth: 0 - name: set profile id: set_profile shell: bash run: | - if make -C source emqx-ee --dry-run > /dev/null 2>&1; then + cd source + vsn="$(./pkg-vsn.sh)" + pre_vsn="$(echo $vsn | grep -oE '^[0-9]+.[0-9]')" + if make emqx-ee --dry-run > /dev/null 2>&1; then + old_vsns="$(git tag -l "e$pre_vsn.[0-9]" | xargs echo -n | sed "s/e$vsn//")" + echo "::set-output name=old_vsns::$old_vsns" echo "::set-output name=profiles::[\"emqx-ee\"]" else + old_vsns="$(git tag -l "v$pre_vsn.[0-9]" | xargs echo -n | sed "s/v$vsn//")" + echo "::set-output name=old_vsns::$old_vsns" echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]" fi - name: get_all_deps if: endsWith(github.repository, 'emqx') run: | make -C source deps-all - zip -ryq source.zip source + zip -ryq source.zip source/* source/.[^.]* - name: get_all_deps if: endsWith(github.repository, 'enterprise') run: | @@ -45,7 +50,7 @@ jobs: git config --global credential.helper store echo "${{ secrets.CI_GIT_TOKEN }}" >> source/scripts/git-token make -C source deps-all - zip -ryq source.zip source + zip -ryq source.zip source/* source/.[^.]* - uses: actions/upload-artifact@v2 with: name: source @@ -251,35 +256,32 @@ jobs: path: . - name: unzip source code run: unzip -q source.zip - - name: downloads emqx zip packages + - name: downloads old emqx zip packages env: PROFILE: ${{ matrix.profile }} ARCH: ${{ matrix.arch }} SYSTEM: ${{ matrix.os }} + OLD_VSNS: ${{ needs.prepare.outputs.old_vsns }} run: | - set -e -u -x - cd source - if [ $PROFILE = "emqx" ];then broker="emqx-ce"; else broker="$PROFILE"; fi - if [ $PROFILE = "emqx-ee" ];then edition='enterprise'; else edition='opensource'; fi - - vsn="$(./pkg-vsn.sh)" - pre_vsn="$(echo $vsn | grep -oE '^[0-9]+.[0-9]')" - if [ $PROFILE = "emqx-ee" ]; then - old_vsns=($(git tag -l "e$pre_vsn.[0-9]" | sed "s/e$vsn//")) - else - old_vsns=($(git tag -l "v$pre_vsn.[0-9]" | sed "s/v$vsn//")) + set -e -x -u + broker=$PROFILE + if [ $PROFILE = "emqx" ];then + broker="emqx-ce" + fi + if [[ "$SYSTEM" =~ "raspbian*" ]];then + export ARCH="arm" fi - mkdir -p _upgrade_base - cd _upgrade_base - for tag in ${old_vsns[@]};do - if [ ! -z "$(echo $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-${{ secrets.AWS_DEFAULT_REGION }}.amazonaws.com/${{ secrets.AWS_S3_BUCKET }}/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then + mkdir -p source/_upgrade_base + cd source/_upgrade_base + old_vsns=($(echo $OLD_VSNS | tr ' ' ' ')) + for tag in ${old_vsns[@]}; do + if [ ! -z "$(echo $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip.sha256 echo "$(cat $PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip.sha256) $PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip" | sha256sum -c || exit 1 fi done - cd - - name: build emqx packages env: ERL_OTP: erl23.2.7.2-emqx-2 diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index c8c642af7..89d9df7de 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -5,9 +5,6 @@ on: tags: - v* - e* - release: - types: - - published pull_request: jobs: diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index 12f9da95c..0bf57dc39 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -5,9 +5,6 @@ on: tags: - v* - e* - release: - types: - - published pull_request: jobs: diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index d5a4a021e..3fbe88c40 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -5,9 +5,6 @@ on: tags: - v* - e* - release: - types: - - published pull_request: jobs: diff --git a/Makefile b/Makefile index 01fc40435..cc8cdb0db 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ $(shell $(CURDIR)/scripts/git-hooks-init.sh) -REBAR_VERSION = 3.14.3-emqx-7 +REBAR_VERSION = 3.14.3-emqx-8 REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts diff --git a/README-CN.md b/README-CN.md index f6c5faf1e..7d2888327 100644 --- a/README-CN.md +++ b/README-CN.md @@ -7,6 +7,7 @@ [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech) [![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow)](https://askemq.com) +[![YouTube](https://img.shields.io/badge/Subscribe-EMQ%20中文-FF0000?logo=youtube)](https://www.youtube.com/channel/UCir_r04HIsLjf2qqyZ4A8Cg) [![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://careers.emqx.cn/) diff --git a/README-JP.md b/README-JP.md index 0219eb25e..b7afe8195 100644 --- a/README-JP.md +++ b/README-JP.md @@ -6,6 +6,7 @@ [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx) [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech) +[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q) [![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) diff --git a/README-RU.md b/README-RU.md index 2dc5a6287..cddaba4a5 100644 --- a/README-RU.md +++ b/README-RU.md @@ -7,6 +7,7 @@ [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Follow-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech) [![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions) +[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q) [![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) diff --git a/README.md b/README.md index 34366c2ea..8d8ed8731 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx) [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Follow-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech) -[![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions) +[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q) [![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src index bf0ba78aa..7d784e3b2 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_jwt, [{description, "EMQ X Authentication with JWT"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_jwt_sup]}, {applications, [kernel,stdlib,jose]}, diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src b/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src new file mode 100644 index 000000000..b9831bb6f --- /dev/null +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src @@ -0,0 +1,15 @@ +%% -*-: erlang -*- +{VSN, + [ + {"4.3.0", [ + {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.0", [ + {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl b/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl index 2d00903e3..b9d19bf57 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl @@ -140,7 +140,7 @@ handle_verify(JwsCompacted, State = #state{static = Static, remote = Remote}) -> try Jwks = case emqx_json:decode(jose_jws:peek_protected(JwsCompacted), [return_maps]) of - #{<<"kid">> := Kid} -> + #{<<"kid">> := Kid} when Remote /= undefined -> [J || J <- Remote, maps:get(<<"kid">>, J#jose_jwk.fields, undefined) =:= Kid]; _ -> Static end, @@ -150,7 +150,9 @@ handle_verify(JwsCompacted, {reply, do_verify(JwsCompacted, Jwks), State} end catch - _:_ -> + Class : Reason : Stk -> + ?LOG(error, "Handle JWK crashed: ~p, ~p, stacktrace: ~p~n", + [Class, Reason, Stk]), {reply, {error, invalid_signature}, State} end. @@ -186,8 +188,8 @@ do_verify(JwsCompacted, [Jwk|More]) -> {true, Payload, _Jws} -> Claims = emqx_json:decode(Payload, [return_maps]), case check_claims(Claims) of - false -> - {error, invalid_signature}; + {false, <<"exp">>} -> + {error, {invalid_signature, expired}}; NClaims -> {ok, NClaims} end; @@ -217,6 +219,6 @@ do_check_claim([{K, F}|More], Claims) -> {V, NClaims} -> case F(V) of true -> do_check_claim(More, NClaims); - _ -> false + _ -> {false, K} end end. diff --git a/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl index d0a4a34a0..d4f562b6f 100644 --- a/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl @@ -33,6 +33,7 @@ groups() -> , t_check_claims , t_check_claims_clientid , t_check_claims_username + , t_check_claims_kid_in_header ]} ]. @@ -61,6 +62,12 @@ set_special_configs(emqx_auth_jwt) -> set_special_configs(_) -> ok. +sign(Payload, Header, Key) when is_map(Header) -> + Jwk = jose_jwk:from_oct(Key), + Jwt = emqx_json:encode(Payload), + {_, Token} = jose_jws:compact(jose_jwt:sign(Jwk, Header, Jwt)), + Token; + sign(Payload, Alg, Key) -> Jwk = jose_jwk:from_oct(Key), Jwt = emqx_json:encode(Payload), @@ -145,3 +152,15 @@ t_check_claims_username(_) -> Result3 = emqx_access_control:authenticate(Plain#{password => Jwt_Error}), ct:pal("Auth result for the invalid jwt: ~p~n", [Result3]), ?assertEqual({error, invalid_signature}, Result3). + +t_check_claims_kid_in_header(_) -> + application:set_env(emqx_auth_jwt, verify_claims, []), + Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external}, + Jwt = sign([{clientid, <<"client23">>}, + {username, <<"plain">>}, + {exp, os:system_time(seconds) + 3}], + #{<<"alg">> => <<"HS256">>, + <<"kid">> => <<"a_kid_str">>}, <<"emqxsecret">>), + Result0 = emqx_access_control:authenticate(Plain#{password => Jwt}), + ct:pal("Auth result: ~p~n", [Result0]), + ?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result0). diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 9454d25b4..06945afad 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,23 +1,12 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- {VSN, - [ {"4.3.2", - [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, - [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} - , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} - , {load_module, emqx_mgmt, brutal_purge, soft_purge, []} + [ {<<"4.3.[0-2]">>, + [ {restart_application, emqx_management} ]}, {<<".*">>, []} ], - [ - {"4.3.2", - [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, - [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} - , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} - , {load_module, emqx_mgmt, brutal_purge, soft_purge, []} + [ {<<"4.3.[0-2]">>, + [ {restart_application, emqx_management} ]}, {<<".*">>, []} ] diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 5dedc91be..e068c5384 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -239,22 +239,30 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) -> end end. -qs(<<"_gte_", Key/binary>>, Value, Type) -> - {binary_to_existing_atom(Key, utf8), '>=', to_type(Value, Type)}; -qs(<<"_lte_", Key/binary>>, Value, Type) -> - {binary_to_existing_atom(Key, utf8), '=<', to_type(Value, Type)}; -qs(<<"_like_", Key/binary>>, Value, Type) -> - {binary_to_existing_atom(Key, utf8), like, to_type(Value, Type)}; -qs(<<"_match_", Key/binary>>, Value, Type) -> - {binary_to_existing_atom(Key, utf8), match, to_type(Value, Type)}; -qs(Key, Value, Type) -> - {binary_to_existing_atom(Key, utf8), '=:=', to_type(Value, Type)}. - qs(K1, V1, K2, V2, Type) -> {Key, Op1, NV1} = qs(K1, V1, Type), {Key, Op2, NV2} = qs(K2, V2, Type), {Key, Op1, NV1, Op2, NV2}. +qs(K, Value0, Type) -> + try + qs(K, to_type(Value0, Type)) + catch + throw : bad_value_type -> + throw({bad_value_type, {K, Type, Value0}}) + end. + +qs(<<"_gte_", Key/binary>>, Value) -> + {binary_to_existing_atom(Key, utf8), '>=', Value}; +qs(<<"_lte_", Key/binary>>, Value) -> + {binary_to_existing_atom(Key, utf8), '=<', Value}; +qs(<<"_like_", Key/binary>>, Value) -> + {binary_to_existing_atom(Key, utf8), like, Value}; +qs(<<"_match_", Key/binary>>, Value) -> + {binary_to_existing_atom(Key, utf8), match, Value}; +qs(Key, Value) -> + {binary_to_existing_atom(Key, utf8), '=:=', Value}. + is_fuzzy_key(<<"_like_", _/binary>>) -> true; is_fuzzy_key(<<"_match_", _/binary>>) -> @@ -265,11 +273,19 @@ is_fuzzy_key(_) -> %%-------------------------------------------------------------------- %% Types -to_type(V, atom) -> to_atom(V); -to_type(V, integer) -> to_integer(V); -to_type(V, timestamp) -> to_timestamp(V); -to_type(V, ip) -> aton(V); -to_type(V, _) -> V. +to_type(V, TargetType) -> + try + to_type_(V, TargetType) + catch + _ : _ -> + throw(bad_value_type) + end. + +to_type_(V, atom) -> to_atom(V); +to_type_(V, integer) -> to_integer(V); +to_type_(V, timestamp) -> to_timestamp(V); +to_type_(V, ip) -> aton(V); +to_type_(V, _) -> V. to_atom(A) when is_atom(A) -> A; diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 90f0d3466..2fe6a5ccb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -140,10 +140,14 @@ -define(format_fun, {?MODULE, format_channel_info}). list(Bindings, Params) when map_size(Bindings) == 0 -> - minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)}); + fence(fun() -> + emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun) + end); list(#{node := Node}, Params) when Node =:= node() -> - minirest:return({ok, emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun)}); + fence(fun() -> + emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun) + end); list(Bindings = #{node := Node}, Params) -> case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of @@ -151,6 +155,19 @@ list(Bindings = #{node := Node}, Params) -> Res -> Res end. +%% @private +fence(Func) -> + try + minirest:return({ok, Func()}) + catch + throw : {bad_value_type, {_Key, Type, Value}} -> + Reason = iolist_to_binary( + io_lib:format("Can't convert ~p to ~p type", + [Value, Type]) + ), + minirest:return({error, ?ERROR8, Reason}) + end. + lookup(#{node := Node, clientid := ClientId}, _Params) -> minirest:return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)}); diff --git a/apps/emqx_management/src/emqx_mgmt_api_data.erl b/apps/emqx_management/src/emqx_mgmt_api_data.erl index 83b8d0e28..e389a1313 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data.erl @@ -110,7 +110,8 @@ get_list_exported() -> import(_Bindings, Params) -> case proplists:get_value(<<"filename">>, Params) of undefined -> - minirest:return({error, missing_required_params}); + Result = import_content(Params), + minirest:return(Result); Filename -> case proplists:get_value(<<"node">>, Params) of undefined -> @@ -127,11 +128,11 @@ import(_Bindings, Params) -> end. do_import(Filename) -> - FullFilename = filename:join([emqx:get_env(data_dir), Filename]), + FullFilename = fullname(Filename), emqx_mgmt_data_backup:import(FullFilename, "{}"). download(#{filename := Filename}, _Params) -> - FullFilename = filename:join([emqx:get_env(data_dir), Filename]), + FullFilename = fullname(Filename), case file:read_file(FullFilename) of {ok, Bin} -> {ok, #{filename => list_to_binary(Filename), @@ -145,7 +146,7 @@ upload(Bindings, Params) -> do_upload(_Bindings, #{<<"filename">> := Filename, <<"file">> := Bin}) -> - FullFilename = filename:join([emqx:get_env(data_dir), Filename]), + FullFilename = fullname(Filename), case file:write_file(FullFilename, Bin) of ok -> minirest:return({ok, [{node, node()}]}); @@ -153,18 +154,33 @@ do_upload(_Bindings, #{<<"filename">> := Filename, minirest:return({error, Reason}) end; do_upload(Bindings, Params = #{<<"file">> := _}) -> - Seconds = erlang:system_time(second), - {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), - Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), - do_upload(Bindings, Params#{<<"filename">> => Filename}); + do_upload(Bindings, Params#{<<"filename">> => tmp_filename()}); do_upload(_Bindings, _Params) -> minirest:return({error, missing_required_params}). delete(#{filename := Filename}, _Params) -> - FullFilename = filename:join([emqx:get_env(data_dir), Filename]), + FullFilename = fullname(Filename), case file:delete(FullFilename) of ok -> minirest:return(); {error, Reason} -> minirest:return({error, Reason}) end. + +import_content(Content) -> + File = dump_to_tmp_file(Content), + do_import(File). + +dump_to_tmp_file(Content) -> + Bin = emqx_json:encode(Content), + Filename = tmp_filename(), + ok = file:write_file(fullname(Filename), Bin), + Filename. + +fullname(Name) -> + filename:join(emqx:get_env(data_dir), Name). + +tmp_filename() -> + Seconds = erlang:system_time(second), + {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), + io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]). diff --git a/apps/emqx_management/src/emqx_mgmt_http.erl b/apps/emqx_management/src/emqx_mgmt_http.erl index a037da37f..da5a75027 100644 --- a/apps/emqx_management/src/emqx_mgmt_http.erl +++ b/apps/emqx_management/src/emqx_mgmt_http.erl @@ -119,12 +119,18 @@ authorize_appid(Req) -> _ -> false end. +-ifdef(EMQX_ENTERPRISE). +filter(_) -> + true. +-else. filter(#{app := emqx_modules}) -> true; filter(#{app := App}) -> case emqx_plugins:find_plugin(App) of false -> false; Plugin -> Plugin#plugin.active end. +-endif. + format(Port) when is_integer(Port) -> io_lib:format("0.0.0.0:~w", [Port]); diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index bf3727ff5..d4d284e69 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -553,6 +553,23 @@ t_data(_) -> application:stop(emqx_dahboard), ok. +t_data_import_content(_) -> + ok = emqx_rule_registry:mnesia(boot), + ok = emqx_dashboard_admin:mnesia(boot), + application:ensure_all_started(emqx_rule_engine), + application:ensure_all_started(emqx_dashboard), + {ok, Data} = request_api(post, api_path(["data","export"]), [], auth_header_(), [#{}]), + #{<<"filename">> := Filename} = emqx_ct_http:get_http_data(Data), + Dir = emqx:get_env(data_dir), + {ok, Bin} = file:read_file(filename:join(Dir, Filename)), + Content = emqx_json:decode(Bin), + %% TODO: enable when 5.0 if we are still using data export/import + %?assertMatch({ok, "{\"code\":0}"}, request_api(post, api_path(["data","import"]), [], auth_header_(), Content)), + ?assertMatch({ok, "{\"message\":\"5.0\",\"code\":\"unsupported_version\"}"}, + request_api(post, api_path(["data","import"]), [], auth_header_(), Content)), + application:stop(emqx_rule_engine), + application:stop(emqx_dahboard). + request_api(Method, Url, Auth) -> request_api(Method, Url, [], Auth, []). diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index 3d7db1b02..0e4e53dc8 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 499664fe1..a6a1a6c84 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,17 +1,13 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.0", [ - {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} - ]}, - {<<".*">>, []} + {<<"4.3.[0-1]">>, [ + {restart_application, emqx_sn} + ]} ], [ - {"4.3.0", [ - {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} - ]}, - {<<".*">>, []} + {<<"4.3.[0-1]">>, [ + {restart_application, emqx_sn} + ]} ] }. diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index d39e8b34f..9575523f8 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -43,7 +43,8 @@ start(_Type, _Args) -> Addr = application:get_env(emqx_sn, port, 1884), GwId = application:get_env(emqx_sn, gateway_id, 1), - {ok, Sup} = emqx_sn_sup:start_link(Addr, GwId), + PredefTopics = application:get_env(emqx_sn, predefined, []), + {ok, Sup} = emqx_sn_sup:start_link(Addr, GwId, PredefTopics), start_listeners(), {ok, Sup}. @@ -57,13 +58,7 @@ stop(_State) -> -spec start_listeners() -> ok. start_listeners() -> - PredefTopics = application:get_env(emqx_sn, predefined, []), - ListenCfs = [begin - TabName = tabname(Proto, ListenOn), - {ok, RegistryPid} = emqx_sn_sup:start_registry_proc(emqx_sn_sup, TabName, PredefTopics), - {Proto, ListenOn, [{registry, {TabName, RegistryPid}} | Options]} - end || {Proto, ListenOn, Options} <- listeners_confs()], - lists:foreach(fun start_listener/1, ListenCfs). + lists:foreach(fun start_listener/1, listeners_confs()). -spec start_listener(listener()) -> ok. start_listener({Proto, ListenOn, Options}) -> @@ -151,7 +146,3 @@ format({Addr, Port}) when is_list(Addr) -> io_lib:format("~s:~w", [Addr, Port]); format({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). - -tabname(Proto, ListenOn) -> - list_to_atom(lists:flatten(["emqx_sn_registry__", atom_to_list(Proto), "_", format(ListenOn)])). - diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 96f849974..2339961cf 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -82,7 +82,6 @@ sockname :: {inet:ip_address(), inet:port()}, peername :: {inet:ip_address(), inet:port()}, channel :: maybe(emqx_channel:channel()), - registry :: emqx_sn_registry:registry(), clientid :: maybe(binary()), username :: maybe(binary()), password :: maybe(binary()), @@ -147,7 +146,6 @@ kick(GwPid) -> init([{_, SockPid, Sock}, Peername, Options]) -> GwId = proplists:get_value(gateway_id, Options), - Registry = proplists:get_value(registry, Options), Username = proplists:get_value(username, Options, undefined), Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), @@ -165,7 +163,6 @@ init([{_, SockPid, Sock}, Peername, Options]) -> sockname = Sockname, peername = Peername, channel = Channel, - registry = Registry, asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, @@ -205,9 +202,9 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType }, TopicId, _MsgId, Data)}, - State = #state{clientid = ClientId, registry = Registry}) -> + State = #state{clientid = ClientId}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> emqx_sn_registry:lookup_topic(Registry, self(), TopicId); + false -> emqx_sn_registry:lookup_topic(ClientId, TopicId); true -> <> end, _ = case TopicName =/= undefined of @@ -292,9 +289,9 @@ wait_for_will_msg(EventType, EventContent, State) -> handle_event(EventType, EventContent, wait_for_will_msg, State). connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, - State = #state{clientid = ClientId, registry = Registry}) -> + State = #state{clientid = ClientId}) -> State0 = - case emqx_sn_registry:register_topic(Registry, self(), TopicName) of + case emqx_sn_registry:register_topic(ClientId, TopicName) of TopicId when is_integer(TopicId) -> ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); @@ -580,13 +577,16 @@ handle_event(EventType, EventContent, StateName, State) -> [StateName, {EventType, EventContent}]), {keep_state, State}. -terminate(Reason, _StateName, #state{channel = Channel, - registry = Registry}) -> - emqx_sn_registry:unregister_topic(Registry, self()), - case Channel =:= undefined of - true -> ok; - false -> emqx_channel:terminate(Reason, Channel) - end. +terminate(Reason, _StateName, #state{channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case Reason of + {shutdown, takeovered} -> + ok; + _ -> + emqx_sn_registry:unregister_topic(ClientId) + end, + emqx_channel:terminate(Reason, Channel), + ok. code_change(_Vsn, StateName, State, _Extra) -> {ok, StateName, State}. @@ -719,11 +719,12 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) -> +mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> NewPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, - {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, self(), Topic) of + ClientId = emqx_channel:info(clientid, Channel), + {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; TopicId when is_integer(TopicId) -> @@ -844,14 +845,13 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, peername = Peername, - registry = Registry, channel = Channel}) -> emqx_logger:set_metadata_clientid(ClientId), #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags, NChannel = case CleanStart of true -> emqx_channel:terminate(normal, Channel), - emqx_sn_registry:unregister_topic(Registry, self()), + emqx_sn_registry:unregister_topic(ClientId), emqx_channel:init(#{socktype => udp, sockname => Sockname, peername => Peername, @@ -864,8 +864,9 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, do_connect(ClientId, CleanStart, Will, Duration, NState). handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, - State=#state{registry = Registry}) -> - case emqx_sn_registry:register_topic(Registry, self(), TopicName) of + State=#state{channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:register_topic(ClientId, TopicName) of {error, too_large} -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, ?SN_INVALID_TOPIC_ID, @@ -879,8 +880,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, end; handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, - State = #state{registry = Registry}) -> - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + State = #state{channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, TopicId, @@ -909,8 +911,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> proto_unsubscribe(TopicId, MsgId, State); handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, - State = #state{registry = Registry}) -> - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + State = #state{channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}; PredefinedTopic -> @@ -932,10 +935,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, - State=#state{registry = Registry}) -> + State=#state{channel = Channel}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID, State)}; @@ -946,7 +950,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), - <> = STopicName , + <> = STopicName, case emqx_topic:wildcard(STopicName) of true -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_NOT_SUPPORTED, @@ -974,12 +978,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> ok. do_puback(TopicId, MsgId, ReturnCode, StateName, - State=#state{registry = Registry}) -> + State=#state{channel = Channel}) -> case ReturnCode of ?SN_RC_ACCEPTED -> handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, @@ -1068,9 +1073,10 @@ handle_outgoing(Packets, State) when is_list(Packets) -> end, State, Packets); handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), - State = #state{registry = Registry}) -> + State = #state{channel = Channel}) -> ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]), - TopicId = emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName), + ClientId = emqx_channel:info(clientid, Channel), + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of true -> register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) @@ -1094,10 +1100,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, - State = #state{registry = Registry, pending_topic_ids = Pendings}) -> + State = #state{pending_topic_ids = Pendings, channel = Channel}) -> MsgId = message_id(PacketId), #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, - TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName), + ClientId = emqx_channel:info(clientid, Channel), + TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index fa17ebc27..4a3b22585 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -23,16 +23,16 @@ -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). --export([ start_link/2 - , stop/1 +-export([ start_link/1 + , stop/0 ]). --export([ register_topic/3 - , unregister_topic/2 +-export([ register_topic/2 + , unregister_topic/1 ]). --export([ lookup_topic/3 - , lookup_topic_id/3 +-export([ lookup_topic/2 + , lookup_topic_id/2 ]). %% gen_server callbacks @@ -46,25 +46,45 @@ -define(TAB, ?MODULE). --record(state, {tab, max_predef_topic_id = 0}). +-record(state, {max_predef_topic_id = 0}). --type(registry() :: {ets:tab(), pid()}). +-record(emqx_sn_registry, {key, value}). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + + +%% @doc Create or replicate tables. +-spec(mnesia(boot | copy) -> ok). +mnesia(boot) -> + %% Optimize storage + StoreProps = [{ets, [{read_concurrency, true}]}], + ok = ekka_mnesia:create_table(?MODULE, [ + {attributes, record_info(fields, emqx_sn_registry)}, + {ram_copies, [node()]}, + {storage_properties, StoreProps}]); + +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?MODULE, ram_copies). %%----------------------------------------------------------------------------- --spec(start_link(atom(), list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). -start_link(Tab, PredefTopics) -> - gen_server:start_link(?MODULE, [Tab, PredefTopics], []). +-spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). +start_link(PredefTopics) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). --spec(stop(registry()) -> ok). -stop({_Tab, Pid}) -> - gen_server:stop(Pid, normal, infinity). +-spec(stop() -> ok). +stop() -> + gen_server:stop(?MODULE, normal, infinity). --spec(register_topic(registry(), pid(), binary()) -> integer() | {error, term()}). -register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) -> +-spec(register_topic(binary(), binary()) -> integer() | {error, term()}). +register_topic(ClientId, TopicName) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(Pid, {register, ClientPid, TopicName}); + gen_server:call(?MODULE, {register, ClientId, TopicName}); %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -72,22 +92,22 @@ register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) -> true -> {error, wildcard_topic} end. --spec(lookup_topic(registry(), pid(), pos_integer()) -> undefined | binary()). -lookup_topic({Tab, _Pid}, ClientPid, TopicId) when is_integer(TopicId) -> - case lookup_element(Tab, {predef, TopicId}, 2) of +-spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()). +lookup_topic(ClientId, TopicId) when is_integer(TopicId) -> + case lookup_element(?TAB, {predef, TopicId}, 3) of undefined -> - lookup_element(Tab, {ClientPid, TopicId}, 2); + lookup_element(?TAB, {ClientId, TopicId}, 3); Topic -> Topic end. --spec(lookup_topic_id(registry(), pid(), binary()) +-spec(lookup_topic_id(binary(), binary()) -> undefined | pos_integer() | {predef, integer()}). -lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) -> - case lookup_element(Tab, {predef, TopicName}, 2) of +lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> + case lookup_element(?TAB, {predef, TopicName}, 3) of undefined -> - lookup_element(Tab, {ClientPid, TopicName}, 2); + lookup_element(?TAB, {ClientId, TopicName}, 3); TopicId -> {predef, TopicId} end. @@ -96,47 +116,59 @@ lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) -> lookup_element(Tab, Key, Pos) -> try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. --spec(unregister_topic(registry(), pid()) -> ok). -unregister_topic({_Tab, Pid}, ClientPid) -> - gen_server:call(Pid, {unregister, ClientPid}). +-spec(unregister_topic(binary()) -> ok). +unregister_topic(ClientId) -> + gen_server:call(?MODULE, {unregister, ClientId}). %%----------------------------------------------------------------------------- -init([Tab, PredefTopics]) -> +init([PredefTopics]) -> %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId - %% {ClientPid, TopicId} -> TopicName - %% {ClientPid, TopicName} -> TopicId - _ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]), + %% {ClientId, TopicId} -> TopicName + %% {ClientId, TopicName} -> TopicId MaxPredefId = lists:foldl( fun({TopicId, TopicName}, AccId) -> - _ = ets:insert(Tab, {{predef, TopicId}, TopicName}), - _ = ets:insert(Tab, {{predef, TopicName}, TopicId}), + mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, + value = TopicName}), + mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, + value = TopicId}), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), - {ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}. + {ok, #state{max_predef_topic_id = MaxPredefId}}. -handle_call({register, ClientPid, TopicName}, _From, - State = #state{tab = Tab, max_predef_topic_id = PredefId}) -> - case lookup_topic_id({Tab, self()}, ClientPid, TopicName) of +handle_call({register, ClientId, TopicName}, _From, + State = #state{max_predef_topic_id = PredefId}) -> + case lookup_topic_id(ClientId, TopicName) of {predef, PredefTopicId} when is_integer(PredefTopicId) -> {reply, PredefTopicId, State}; TopicId when is_integer(TopicId) -> {reply, TopicId, State}; undefined -> - case next_topic_id(Tab, PredefId, ClientPid) of + case next_topic_id(?TAB, PredefId, ClientId) of TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> - _ = ets:insert(Tab, {{ClientPid, next_topic_id}, TopicId + 1}), - _ = ets:insert(Tab, {{ClientPid, TopicName}, TopicId}), - _ = ets:insert(Tab, {{ClientPid, TopicId}, TopicName}), - {reply, TopicId, State} + Fun = fun() -> + mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id}, + value = TopicId + 1}), + mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName}, + value = TopicId}), + mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, + value = TopicName}) + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + {reply, TopicId, State}; + {aborted, Error} -> + {reply, {error, Error}, State} + end end end; -handle_call({unregister, ClientPid}, _From, State = #state{tab = Tab}) -> - ets:match_delete(Tab, {{ClientPid, '_'}, '_'}), +handle_call({unregister, ClientId}, _From, State) -> + Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), + lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -159,8 +191,8 @@ code_change(_OldVsn, State, _Extra) -> %%----------------------------------------------------------------------------- -next_topic_id(Tab, PredefId, ClientPid) -> - case ets:lookup(Tab, {ClientPid, next_topic_id}) of - [{_, Id}] -> Id; +next_topic_id(Tab, PredefId, ClientId) -> + case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of + [#emqx_sn_registry{value = Id}] -> Id; [] -> PredefId + 1 end. diff --git a/apps/emqx_sn/src/emqx_sn_sup.erl b/apps/emqx_sn/src/emqx_sn_sup.erl index 817aa4d06..3d4fe602f 100644 --- a/apps/emqx_sn/src/emqx_sn_sup.erl +++ b/apps/emqx_sn/src/emqx_sn_sup.erl @@ -18,32 +18,26 @@ -behaviour(supervisor). --export([ start_link/2 - , start_registry_proc/3 +-export([ start_link/3 , init/1 ]). -start_registry_proc(Sup, TabName, PredefTopics) -> - Registry = #{id => TabName, - start => {emqx_sn_registry, start_link, [TabName, PredefTopics]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sn_registry]}, - handle_ret(supervisor:start_child(Sup, Registry)). +start_link(Addr, GwId, PredefTopics) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Addr, GwId, PredefTopics]). -start_link(Addr, GwId) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [Addr, GwId]). - -init([{_Ip, Port}, GwId]) -> +init([{_Ip, Port}, GwId, PredefTopics]) -> Broadcast = #{id => emqx_sn_broadcast, start => {emqx_sn_broadcast, start_link, [GwId, Port]}, restart => permanent, shutdown => brutal_kill, type => worker, modules => [emqx_sn_broadcast]}, - {ok, {{one_for_one, 10, 3600}, [Broadcast]}}. + Registry = #{id => emqx_sn_registry, + start => {emqx_sn_registry, start_link, [PredefTopics]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [emqx_sn_registry]}, + {ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}. -handle_ret({ok, Pid, _Info}) -> {ok, Pid}; -handle_ret(Ret) -> Ret. diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 2972571be..fcc2f8b38 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -1084,7 +1084,7 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> {ok, C} = emqtt:start_link(), {ok, _} = emqtt:connect(C), {ok, _} = emqtt:publish(C, TopicName1, Payload1, QoS), - timer:sleep(500), + timer:sleep(100), ok = emqtt:disconnect(C), timer:sleep(50), @@ -1278,6 +1278,7 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_register_msg(Socket, TopicName_tom, MsgId1), + timer:sleep(50), TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl index 8cce4592a..8d320d8ed 100644 --- a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl @@ -16,12 +16,9 @@ -module(emqx_sn_registry_SUITE). --import(proplists, [get_value/2]). - -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_sn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(REGISTRY, emqx_sn_registry). @@ -44,84 +41,81 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> + ekka_mnesia:start(), + emqx_sn_registry:mnesia(boot), + mnesia:clear_table(emqx_sn_registry), PredefTopics = application:get_env(emqx_sn, predefined, []), - TabName = emqx_sn_registry, - {ok, Pid} = ?REGISTRY:start_link(TabName, PredefTopics), - [{registray, {TabName, Pid}} | Config]. + {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), + Config. end_per_testcase(_TestCase, Config) -> - ?REGISTRY:stop(get_value(registray, Config)), + ?REGISTRY:stop(), Config. %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- -t_register(Config) -> - Registry = get_value(registray, Config), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)), - emqx_sn_registry:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)). +t_register(_Config) -> + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), + emqx_sn_registry:unregister_topic(<<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). -t_register_case2(Config) -> - Registry = get_value(registray, Config), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic3">>)), - ?REGISTRY:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)). +t_register_case2(_Config) -> + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic3">>)), + ?REGISTRY:unregister_topic(<<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). -t_reach_maximum(Config) -> - Registry = get_value(registray, Config), - register_a_lot(Registry, ?MAX_PREDEF_ID+1, 16#ffff), - ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicABC">>)), +t_reach_maximum(_Config) -> + register_a_lot(?MAX_PREDEF_ID+1, 16#ffff), + ?assertEqual({error, too_large}, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicABC">>)), Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])), Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic1)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic2)), - ?REGISTRY:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic2)). + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)), + ?REGISTRY:unregister_topic(<<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)). -t_register_case4(Config) -> - Registry = get_value(registray, Config), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicA">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicB">>)), - ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicC">>)), - ?REGISTRY:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicD">>)). +t_register_case4(_Config) -> + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicA">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicB">>)), + ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicC">>)), + ?REGISTRY:unregister_topic(<<"ClientId">>), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicD">>)). -t_deny_wildcard_topic(Config) -> - Registry = get_value(registray, Config), - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"/TopicA/#">>)), - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"/+/TopicB">>)). +t_deny_wildcard_topic(_Config) -> + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)). %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- -register_a_lot(_, Max, Max) -> +register_a_lot(Max, Max) -> ok; -register_a_lot(Registry, N, Max) when N < Max -> +register_a_lot(N, Max) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), - ?assertEqual(N, ?REGISTRY:register_topic(Registry, <<"ClientId">>, Topic)), - register_a_lot(Registry, N+1, Max). + ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), + register_a_lot(N+1, Max). diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index b68816579..97624a900 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_web_hook, [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index 0c7b8ebf3..b92284b64 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -2,9 +2,15 @@ {VSN, [ + {"4.3.0", [ + {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ], [ + {"4.3.0", [ + {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ] }. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index c88f8f39b..f8e4d8f08 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -353,7 +353,7 @@ pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). get_ssl_opts(Opts, ResId) -> - [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Opts, "rules", ResId)}]. + emqx_plugin_libs_ssl:save_files_return_opts(Opts, "rules", ResId). test_http_connect(Conf) -> Url = fun() -> maps:get(<<"url">>, Conf) end, diff --git a/bin/nodetool b/bin/nodetool index 0e89ac278..c3bfe5b3c 100755 --- a/bin/nodetool +++ b/bin/nodetool @@ -63,7 +63,6 @@ main(Args) -> %% a "pong" io:format("pong\n"); ["stop"] -> - rpc:call(TargetNode, emqx_plugins, unload, [], 60000), io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]); ["restart", "-config", ConfigFile | _RestArgs1] -> io:format("~p\n", [rpc:call(TargetNode, emqx, restart, [ConfigFile], 60000)]); diff --git a/deploy/charts/emqx/README.md b/deploy/charts/emqx/README.md index 1ba1e3266..535ccb8e1 100644 --- a/deploy/charts/emqx/README.md +++ b/deploy/charts/emqx/README.md @@ -37,6 +37,7 @@ The following table lists the configurable parameters of the emqx chart and thei | `image.repository` | EMQ X Image name |emqx/emqx| | `image.pullPolicy` | The image pull policy |IfNotPresent| | `image.pullSecrets ` | The image pull secrets |`[]` (does not add image pull secrets to deployed pods)| +| `recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | false | | `persistence.enabled` | Enable EMQX persistence using PVC |false| | `persistence.storageClass` | Storage class of backing PVC |`nil` (uses alpha storage class annotation)| | `persistence.existingClaim` | EMQ X data Persistent Volume existing claim name, evaluated as a template |""| diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index 3a385732a..4cad21569 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -47,6 +47,10 @@ spec: version: {{ .Chart.AppVersion }} app.kubernetes.io/name: {{ include "emqx.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} + {{- if .Values.recreatePods }} + annotations: + checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum | quote }} + {{- end }} spec: volumes: - name: emqx-loaded-plugins diff --git a/deploy/charts/emqx/templates/rbac.yaml b/deploy/charts/emqx/templates/rbac.yaml index f1536836b..87cd18178 100644 --- a/deploy/charts/emqx/templates/rbac.yaml +++ b/deploy/charts/emqx/templates/rbac.yaml @@ -5,7 +5,11 @@ metadata: name: {{ include "emqx.fullname" . }} --- kind: Role +{{- if semverCompare ">=1.17-0" .Capabilities.KubeVersion.GitVersion }} +apiVersion: rbac.authorization.k8s.io/v1 +{{- else }} apiVersion: rbac.authorization.k8s.io/v1beta1 +{{- end }} metadata: namespace: {{ .Release.Namespace }} name: {{ include "emqx.fullname" . }} @@ -20,7 +24,11 @@ rules: - list --- kind: RoleBinding +{{- if semverCompare ">=1.17-0" .Capabilities.KubeVersion.GitVersion }} +apiVersion: rbac.authorization.k8s.io/v1 +{{- else }} apiVersion: rbac.authorization.k8s.io/v1beta1 +{{- end }} metadata: namespace: {{ .Release.Namespace }} name: {{ include "emqx.fullname" . }} diff --git a/deploy/charts/emqx/values.yaml b/deploy/charts/emqx/values.yaml index 34302e09a..0e6f24a66 100644 --- a/deploy/charts/emqx/values.yaml +++ b/deploy/charts/emqx/values.yaml @@ -14,6 +14,9 @@ image: # pullSecrets: # - myRegistryKeySecretName +## Forces the recreation of pods during helm upgrades. This can be useful to update configuration values even if the container image did not change. +recreatePods: false + persistence: enabled: false size: 20Mi diff --git a/etc/emqx.conf b/etc/emqx.conf index c7c044c99..f6627bf1c 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -453,6 +453,13 @@ log.file = emqx.log ## Default: No Limit #log.chars_limit = 8192 +## Maximum depth for Erlang term log formatting +## and Erlang process message queue inspection. +## +## Value: Integer or 'unlimited' (without quotes) +## Default: 20 +#log.max_depth = 20 + ## Log formatter ## Value: text | json #log.formatter = text diff --git a/rebar.config b/rebar.config index f8baec832..6c1e3c593 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript new file mode 100755 index 000000000..6a78012d2 --- /dev/null +++ b/scripts/update_appup.escript @@ -0,0 +1,75 @@ +#!/usr/bin/env -S escript -c +%% A script that adds changed modules to the corresponding appup files + +main(_Args) -> + ChangedFiles = string:lexemes(os:cmd("git diff --name-only origin/master..HEAD"), "\n"), + AppModules0 = lists:filtermap(fun filter_erlang_modules/1, ChangedFiles), + %% emqx_app must always be included as we bump version number in emqx_release.hrl for each release + AppModules1 = [{emqx, emqx_app} | AppModules0], + AppModules = group_modules(AppModules1), + io:format("Changed modules: ~p~n", [AppModules]), + _ = maps:map(fun process_app/2, AppModules), + ok. + +process_app(App, Modules) -> + AppupFiles = filelib:wildcard(lists:concat(["{src,apps,lib-*}/**/", App, ".appup.src"])), + case AppupFiles of + [AppupFile] -> + update_appup(AppupFile, Modules); + [] -> + io:format("~nWARNING: Please create an stub appup src file for ~p~n", [App]) + end. + +filter_erlang_modules(Filename) -> + case lists:reverse(filename:split(Filename)) of + [Module, "src"] -> + erl_basename("emqx", Module); + [Module, "src", App|_] -> + erl_basename(App, Module); + [Module, _, "src", App|_] -> + erl_basename(App, Module); + _ -> + false + end. + +erl_basename(App, Name) -> + case filename:basename(Name, ".erl") of + Name -> false; + Module -> {true, {list_to_atom(App), list_to_atom(Module)}} + end. + +group_modules(L) -> + lists:foldl(fun({App, Mod}, Acc) -> + maps:update_with(App, fun(Tl) -> [Mod|Tl] end, [Mod], Acc) + end, #{}, L). + +update_appup(File, Modules) -> + io:format("~nUpdating appup: ~p~n", [File]), + {_, Upgrade0, Downgrade0} = read_appup(File), + Upgrade = update_actions(Modules, Upgrade0), + Downgrade = update_actions(Modules, Downgrade0), + IOList = io_lib:format("%% -*- mode: erlang -*- +{VSN,~n ~p,~n ~p}.~n", [Upgrade, Downgrade]), + ok = file:write_file(File, IOList). + +update_actions(Modules, Versions) -> + lists:map(fun(L) -> do_update_actions(Modules, L) end, Versions). + +do_update_actions(_, Ret = {<<".*">>, _}) -> + Ret; +do_update_actions(Modules, {Vsn, Actions}) -> + {Vsn, add_modules(Modules, Actions)}. + +add_modules(NewModules, OldActions) -> + OldModules = lists:map(fun(It) -> element(2, It) end, OldActions), + Modules = NewModules -- OldModules, + OldActions ++ [{load_module, M, brutal_purge, soft_purge, []} || M <- Modules]. + +read_appup(File) -> + {ok, Bin0} = file:read_file(File), + %% Hack: + Bin1 = re:replace(Bin0, "VSN", "\"VSN\""), + TmpFile = filename:join("/tmp", filename:basename(File)), + ok = file:write_file(TmpFile, Bin1), + {ok, [Terms]} = file:consult(TmpFile), + Terms. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 98fa9ece2..b51a7f3b7 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,76 +1,69 @@ %% -*- mode: erlang -*- {VSN, - [ - {"4.3.2", [ - {load_module, emqx_http_lib, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", [ - {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_frame, brutal_purge, soft_purge, []}, - {load_module, emqx_cm, brutal_purge, soft_purge, []}, - {load_module, emqx_congestion, brutal_purge, soft_purge, []}, - {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_plugins, brutal_purge, soft_purge, []}, - {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, - {load_module, emqx_http_lib, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, - {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_congestion, brutal_purge, soft_purge, []}, - {load_module, emqx_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_frame, brutal_purge, soft_purge, []}, - {load_module, emqx_trie, brutal_purge, soft_purge, []}, - {load_module, emqx_cm, brutal_purge, soft_purge, []}, - {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_plugins, brutal_purge, soft_purge, []}, - {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}, - {load_module, emqx_http_lib, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ], - [ - {"4.3.2", [ - {load_module, emqx_http_lib, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", [ - {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_frame, brutal_purge, soft_purge, []}, - {load_module, emqx_cm, brutal_purge, soft_purge, []}, - {load_module, emqx_congestion, brutal_purge, soft_purge, []}, - {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_plugins, brutal_purge, soft_purge, []}, - {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, - {load_module, emqx_http_lib, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, - {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_connection, brutal_purge, soft_purge, []}, - {load_module, emqx_congestion, brutal_purge, soft_purge, []}, - {load_module, emqx_frame, brutal_purge, soft_purge, []}, - {load_module, emqx_trie, brutal_purge, soft_purge, []}, - {load_module, emqx_cm, brutal_purge, soft_purge, []}, - {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_plugins, brutal_purge, soft_purge, []}, - {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, - %% Just load the module. We don't need to change the 'messages.retained' - %% and 'messages.retained' counter type. - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_http_lib, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ] -}. + [{"4.3.2", + [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + {"4.3.1", + [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_congestion,brutal_purge,soft_purge,[]}, + {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, + {"4.3.0", + [{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_congestion,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_trie,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}], + [{"4.3.2", + [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + {"4.3.1", + [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_congestion,brutal_purge,soft_purge,[]}, + {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, + {"4.3.0", + [{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_congestion,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_trie,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}]}. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 3a818b9a7..e3cbff692 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -407,7 +407,8 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), case emqx_packet:check(Packet) of ok -> TopicFilters0 = parse_topic_filters(TopicFilters), - TupleTopicFilters0 = check_sub_acls(TopicFilters0, Channel), + TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0), + TupleTopicFilters0 = check_sub_acls(TopicFilters1, Channel), case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso lists:any(fun({_TopicFilter, ReasonCode}) -> ReasonCode =:= ?RC_NOT_AUTHORIZED @@ -419,8 +420,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More); _Fun(TupleList, []) -> TupleList end, - TopicFilters1 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], - TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1), + TopicFilters2 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], TopicFilters3 = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters2), diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 0e897f0b3..ab91c02b4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -349,6 +349,7 @@ ensure_stats_timer(_Timeout, State) -> State. -compile({inline, [cancel_stats_timer/1]}). cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -> + ?tp(debug, cancel_stats_timer, #{}), ok = emqx_misc:cancel_timer(TRef), State#state{stats_timer = undefined}; cancel_stats_timer(State) -> State. diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index c2de5fe57..6ef3d2d21 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -34,6 +34,8 @@ , apply_configs/1 ]). +-export([funlog/2]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -267,8 +269,7 @@ do_generate_configs(App) -> true -> Schema = cuttlefish_schema:files([SchemaFile]), Conf = cuttlefish_conf:file(ConfFile), - LogFun = fun(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]) end, - cuttlefish_generator:map(Schema, Conf, undefined, LogFun); + cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2); false -> error({schema_not_found, SchemaFile}) end. @@ -411,3 +412,7 @@ plugin_type(protocol) -> protocol; plugin_type(backend) -> backend; plugin_type(bridge) -> bridge; plugin_type(_) -> feature. + + +funlog(Key, Value) -> + ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]). diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/emqx_mqtt_protocol_v5_SUITE.erl similarity index 96% rename from test/mqtt_protocol_v5_SUITE.erl rename to test/emqx_mqtt_protocol_v5_SUITE.erl index 0956a07fc..8ce35b50c 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(mqtt_protocol_v5_SUITE). +-module(emqx_mqtt_protocol_v5_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import(lists, [nth/2]). @@ -37,6 +38,7 @@ init_per_suite(Config) -> %% Meck emqtt ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]), %% Start Apps + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. @@ -44,6 +46,19 @@ end_per_suite(_Config) -> ok = meck:unload(emqtt), emqx_ct_helpers:stop_apps([]). +init_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase(init, Config); + _ -> Config + end. + +end_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase('end', Config); + false -> ok + end, + Config. + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- @@ -273,21 +288,33 @@ t_connect_limit_timeout(_) -> emqx_zone:set_env(external, publish_limit, undefined), meck:unload(proplists). -t_connect_emit_stats_timeout(_) -> - IdleTimeout = 2000, - emqx_zone:set_env(external, idle_timeout, IdleTimeout), +t_connect_emit_stats_timeout(init, Config) -> + NewIdleTimeout = 1000, + OldIdleTimeout = emqx_zone:get_env(external, idle_timeout), + emqx_zone:set_env(external, idle_timeout, NewIdleTimeout), + ok = snabbkaffe:start_trace(), + [{idle_timeout, NewIdleTimeout}, {old_idle_timeout, OldIdleTimeout} | Config]; +t_connect_emit_stats_timeout('end', Config) -> + snabbkaffe:stop(), + {_, OldIdleTimeout} = lists:keyfind(old_idle_timeout, 1, Config), + emqx_zone:set_env(external, idle_timeout, OldIdleTimeout), + ok. +t_connect_emit_stats_timeout(Config) -> + {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config), {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, _} = emqtt:connect(Client), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), - ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))), - timer:sleep(IdleTimeout), + ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0), ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))), ok = emqtt:disconnect(Client). %% [MQTT-3.1.2-22] t_connect_keepalive_timeout(_) -> + %% Prevent the emqtt client bringing us down on the disconnect. + process_flag(trap_exit, true), + Keepalive = 2, {ok, Client} = emqtt:start_link([{proto_ver, v5},