diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 33b4d1e7b..befa37912 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -468,7 +468,7 @@ jobs: -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \ -H "Accept: application/vnd.github.v3+json" \ -X POST \ - -d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \ + -d "{\"ref\":\"v1.0.3\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \ "https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches" - name: update repo.emqx.io if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx' @@ -477,7 +477,7 @@ jobs: -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \ -H "Accept: application/vnd.github.v3+json" \ -X POST \ - -d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \ + -d "{\"ref\":\"v1.0.3\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \ "https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches" - name: update homebrew packages if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx' @@ -487,7 +487,7 @@ jobs: -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \ -H "Accept: application/vnd.github.v3+json" \ -X POST \ - -d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \ + -d "{\"ref\":\"v1.0.3\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \ "https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches" fi - uses: geekyeggo/delete-artifact@v1 diff --git a/.github/workflows/run_automate_tests.yaml b/.github/workflows/run_automate_tests.yaml index 70a5f08a1..e654e87c2 100644 --- a/.github/workflows/run_automate_tests.yaml +++ 
b/.github/workflows/run_automate_tests.yaml @@ -1,5 +1,5 @@ name: Integration Test Suites - + on: push: tags: @@ -12,18 +12,30 @@ jobs: build: runs-on: ubuntu-latest outputs: + imgname: ${{ steps.build_docker.outputs.imgname}} version: ${{ steps.build_docker.outputs.version}} steps: - uses: actions/checkout@v2 - name: build docker id: build_docker run: | + if [ -f EMQX_ENTERPRISE ]; then + echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials + git config --global credential.helper store + echo "${{ secrets.CI_GIT_TOKEN }}" >> scripts/git-token + make deps-emqx-ee + fi make docker echo "::set-output name=version::$(./pkg-vsn.sh)" + if [ -f EMQX_ENTERPRISE ]; then + echo "::set-output name=imgname::emqx-ee" + else + echo "::set-output name=imgname::emqx" + fi - uses: actions/upload-artifact@v2 with: name: emqx-docker-image-zip - path: _packages/emqx/emqx-docker-${{ steps.build_docker.outputs.version }}.zip + path: _packages/${{ steps.build_docker.outputs.imgname }}/${{ steps.build_docker.outputs.imgname }}-docker-${{ steps.build_docker.outputs.version }}.zip webhook: runs-on: ubuntu-latest @@ -43,14 +55,15 @@ jobs: path: /tmp - name: load docker image env: + imgname: ${{ needs.build.outputs.imgname}} version: ${{ needs.build.outputs.version }} run: | - unzip -q /tmp/emqx-docker-${version}.zip -d /tmp - docker load < /tmp/emqx-docker-${version} + unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp + docker load < /tmp/${imgname}-docker-${version} - name: docker compose up timeout-minutes: 5 env: - TARGET: emqx/emqx + TARGET: emqx/${{ needs.build.outputs.imgname }} EMQX_TAG: ${{ needs.build.outputs.version }} run: | docker-compose \ @@ -142,14 +155,15 @@ jobs: path: /tmp - name: load docker image env: + imgname: ${{ needs.build.outputs.imgname }} version: ${{ needs.build.outputs.version }} run: | - unzip -q /tmp/emqx-docker-${version}.zip -d /tmp - docker load < /tmp/emqx-docker-${version} + unzip -q 
/tmp/${imgname}-docker-${version}.zip -d /tmp + docker load < /tmp/${imgname}-docker-${version} - name: docker compose up timeout-minutes: 5 env: - TARGET: emqx/emqx + TARGET: emqx/${{ needs.build.outputs.imgname }} EMQX_TAG: ${{ needs.build.outputs.version }} MYSQL_TAG: ${{ matrix.mysql_tag }} run: | @@ -248,14 +262,15 @@ jobs: path: /tmp - name: load docker image env: + imgname: ${{ needs.build.outputs.imgname }} version: ${{ needs.build.outputs.version }} run: | - unzip -q /tmp/emqx-docker-${version}.zip -d /tmp - docker load < /tmp/emqx-docker-${version} + unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp + docker load < /tmp/${imgname}-docker-${version} - name: docker compose up timeout-minutes: 5 env: - TARGET: emqx/emqx + TARGET: emqx/${{ needs.build.outputs.imgname }} EMQX_TAG: ${{ needs.build.outputs.version }} PGSQL_TAG: ${{ matrix.pgsql_tag }} run: | @@ -343,14 +358,15 @@ jobs: path: /tmp - name: load docker image env: + imgname: ${{ needs.build.outputs.imgname }} version: ${{ needs.build.outputs.version }} run: | - unzip -q /tmp/emqx-docker-${version}.zip -d /tmp - docker load < /tmp/emqx-docker-${version} + unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp + docker load < /tmp/${imgname}-docker-${version} - name: docker compose up timeout-minutes: 5 env: - TARGET: emqx/emqx + TARGET: emqx/${{ needs.build.outputs.imgname }} EMQX_TAG: ${{ needs.build.outputs.version }} MYSQL_TAG: 8 run: | diff --git a/apps/emqx_auth_mongo/rebar.config b/apps/emqx_auth_mongo/rebar.config index f44e69543..78442c00b 100644 --- a/apps/emqx_auth_mongo/rebar.config +++ b/apps/emqx_auth_mongo/rebar.config @@ -1,6 +1,6 @@ {deps, %% NOTE: mind poolboy version when updating mongodb-erlang version - [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.7"}}}, + [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}}, %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git %% (which has overflow_ttl feature added). 
%% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07). diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 1463334b4..e50724d6d 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,13 +1,13 @@ %% -*- mode: erlang -*- {VSN, - [ {<<"4\\.3\\.[0-9]+">>, + [ {<<"4\\.3\\.[0-7]+">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} ]}, {<<".*">>, []} ], - [ {<<"4\\.3\\.[0-9]+">>, + [ {<<"4\\.3\\.[0-7]+">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index db5dd47d0..95f5121cd 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -191,10 +191,8 @@ clients(["show", ClientId]) -> if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> - case emqx_cm:kick_session(bin(ClientId)) of - ok -> emqx_ctl:print("ok~n"); - _ -> emqx_ctl:print("Not Found.~n") - end; + ok = emqx_cm:kick_session(bin(ClientId)), + emqx_ctl:print("ok~n"); clients(_) -> emqx_ctl:usage([{"clients list", "List all clients"}, diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 6bac9b4c7..77d46b744 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -158,9 +158,9 @@ t_clients_cmd(_) -> timer:sleep(300), emqx_mgmt_cli:clients(["list"]), ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "client12")), - ?assertEqual((emqx_mgmt_cli:clients(["kick", "client12"])), "ok~n"), + ?assertEqual("ok~n", emqx_mgmt_cli:clients(["kick", "client12"])), 
timer:sleep(500), - ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "Not Found")), + ?assertEqual("ok~n", emqx_mgmt_cli:clients(["kick", "client12"])), receive {'EXIT', T, _} -> ok diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index a739fa3c7..e45acfd42 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -223,8 +223,8 @@ t_clients(_) -> timer:sleep(300), - {ok, NotFound0} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()), - ?assertEqual(?ERROR12, get(<<"code">>, NotFound0)), + {ok, Ok1} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()), + ?assertEqual(?SUCCESS, get(<<"code">>, Ok1)), {ok, Clients6} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()), ?assertEqual(1, maps:get(<<"count">>, get(<<"meta">>, Clients6))), diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 5ef13587a..79aefdb85 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -57,7 +57,7 @@ type => string, default => <<"5s">>, title => #{en => <<"Request Timeout">>, - zh => <<"请求超时时间时间"/utf8>>}, + zh => <<"请求超时时间"/utf8>>}, description => #{en => <<"Request Timeout In Seconds">>, zh => <<"请求超时时间"/utf8>>}}, pool_size => #{order => 4, diff --git a/bin/emqx b/bin/emqx index ff12afcac..0662d4a46 100755 --- a/bin/emqx +++ b/bin/emqx @@ -20,6 +20,41 @@ mkdir -p "$RUNNER_LOG_DIR" # Make sure data directory exists mkdir -p "$RUNNER_DATA_DIR" +export ROOTDIR="$RUNNER_ROOT_DIR" +export ERTS_DIR="$ROOTDIR/erts-$ERTS_VSN" +export BINDIR="$ERTS_DIR/bin" +export EMU="beam" +export PROGNAME="erl" +DYNLIBS_DIR="$RUNNER_ROOT_DIR/dynlibs" +ERTS_LIB_DIR="$ERTS_DIR/../lib" +MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME" + 
+# Echo to stderr on errors +echoerr() { echo "$*" 1>&2; } + +check_eralng_start() { + "$BINDIR/$PROGNAME" -noshell -boot "$REL_DIR/start_clean" -s crypto start -s init stop +} + +if ! check_eralng_start >/dev/null 2>&1; then + BUILT_ON="$(head -1 "${REL_DIR}/BUILT_ON")" + ## failed to start, might be due to missing libs, try to be portable + export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH" + if ! check_eralng_start; then + ## it's hopeless + echoerr "FATAL: Unable to start Erlang (with libcrypto)." + echoerr "Please make sure it's running on the correct platform with all required dependencies." + echoerr "This EMQ X release is built for $BUILT_ON" + exit 1 + fi + echoerr "WARNING: There seem to be missing dynamic libs from the OS. Using libs from ${DYNLIBS_DIR}" +fi + +## backward compatible +if [ -d "$ERTS_DIR/lib" ]; then + export LD_LIBRARY_PATH="$ERTS_DIR/lib:$LD_LIBRARY_PATH" +fi + # cuttlefish try to read environment variables starting with "EMQX_" export CUTTLEFISH_ENV_OVERRIDE_PREFIX='EMQX_' @@ -120,9 +155,6 @@ if [ "$ULIMIT_F" -lt 1024 ]; then echo "!!!!" 
fi -# Echo to stderr on errors -echoerr() { echo "$@" 1>&2; } - # By default, use cuttlefish to generate app.config and vm.args CUTTLEFISH="${USE_CUTTLEFISH:-yes}" @@ -364,15 +396,6 @@ else PROTO_DIST_ARG="-proto_dist $PROTO_DIST" fi -export ROOTDIR="$RUNNER_ROOT_DIR" -export ERTS_DIR="$ROOTDIR/erts-$ERTS_VSN" -export BINDIR="$ERTS_DIR/bin" -export EMU="beam" -export PROGNAME="erl" -export LD_LIBRARY_PATH="$ERTS_DIR/lib:$LD_LIBRARY_PATH" -ERTS_LIB_DIR="$ERTS_DIR/../lib" -MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME" - cd "$ROOTDIR" # User can specify an sname without @hostname diff --git a/build b/build index be7813e66..be4f88672 100755 --- a/build +++ b/build @@ -15,18 +15,7 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}" export PKG_VSN -if [ "$(uname -s)" = 'Darwin' ]; then - SYSTEM=macos -elif [ "$(uname -s)" = 'Linux' ]; then - if grep -q -i 'centos' /etc/*-release; then - DIST='centos' - VERSION_ID="$(rpm --eval '%{centos_ver}')" - else - DIST="$(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g')" - VERSION_ID="$(sed -n '/^VERSION_ID=/p' /etc/os-release | sed -r 's/VERSION_ID=(.*)/\1/g' | sed 's/"//g')" - fi - SYSTEM="$(echo "${DIST}${VERSION_ID}" | sed -r 's/([a-zA-Z]*)-.*/\1/g')" -fi +SYSTEM="$(./scripts/get-distro.sh)" ARCH="$(uname -m)" case "$ARCH" in @@ -46,8 +35,8 @@ export ARCH ## Support RPM and Debian based linux systems ## if [ "$(uname -s)" = 'Linux' ]; then - case "${DIST:-}" in - ubuntu|debian|raspbian) + case "${SYSTEM:-}" in + ubuntu*|debian*|raspbian*) PKGERDIR='deb' ;; *) @@ -98,6 +87,18 @@ make_relup() { ./rebar3 as "$PROFILE" relup --relname emqx --relvsn "${PKG_VSN}" } +cp_dyn_libs() { + local rel_dir="$1" + local target_dir="${rel_dir}/dynlibs" + if ! 
[ "$(uname -s)" = 'Linux' ]; then + return 0; + fi + mkdir -p "$target_dir" + while read -r so_file; do + cp -L "$so_file" "$target_dir/" + done < <(find "$rel_dir" -type f \( -name "*.so*" -o -name "beam.smp" \) -print0 | xargs -0 ldd | grep -E '^\s+.*=>\s(/lib|/usr)' | awk '{print $3}') +} + ## make_zip turns .tar.gz into a .zip with a slightly different name. ## It assumes the .tar.gz has been built -- relies on Makefile dependency make_zip() { @@ -117,6 +118,9 @@ make_zip() { local zipball zipball="${pkgpath}/${PROFILE}-${SYSTEM}-${PKG_VSN}-${ARCH}.zip" tar zxf "${tarball}" -C "${tard}/emqx" + ## try to be portable for zip packages. + ## for DEB and RPM packages the dependencies are resolved by yum and apt + cp_dyn_libs "${tard}/emqx" (cd "${tard}" && zip -qr - emqx) > "${zipball}" } diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index 9e0ba9d5b..e362c6b73 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -41,7 +41,7 @@ LABEL org.label-schema.docker.dockerfile="Dockerfile" \ org.label-schema.url="https://emqx.io" \ org.label-schema.vcs-type="Git" \ org.label-schema.vcs-url="https://github.com/emqx/emqx" \ - maintainer="Raymond M Mouthaan , Huang Rui , EMQ X Team " + maintainer="EMQ X Team " ARG QEMU_ARCH=x86_64 ARG EMQX_NAME=emqx diff --git a/docker.mk b/docker.mk index e2fe61d36..22f2b6e4f 100644 --- a/docker.mk +++ b/docker.mk @@ -1,8 +1,10 @@ #!/usr/bin/make -f # -*- makefile -*- -## default globals -TARGET ?= emqx/emqx +## default globals.
+## when built with `make docker` command the default profile is either emqx or emqx-ee (for enterprise) +## or the TARGET variable can be set beforehand to force a different name +TARGET ?= emqx/$(PROFILE) QEMU_ARCH ?= x86_64 ARCH ?= amd64 QEMU_VERSION ?= v5.0.0-2 @@ -37,7 +39,7 @@ docker-prepare: # enable experimental to use docker manifest command @echo '{ "experimental": "enabled" }' | tee $$HOME/.docker/config.json # enable experimental - @echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50 }' | tee /etc/docker/daemon.json + @echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50 }' | tee /etc/docker/daemon.json @service docker restart .PHONY: docker-build @@ -85,7 +87,7 @@ docker-tag: .PHONY: docker-save docker-save: - @echo "DOCKER SAVE: Save Docker image." + @echo "DOCKER SAVE: Save Docker image." @mkdir -p _packages/$(EMQX_NAME) @@ -94,7 +96,7 @@ docker-save: zip -r -m $(EMQX_NAME)-docker-$(PKG_VSN).zip $(EMQX_NAME)-docker-$(PKG_VSN); \ mv ./$(EMQX_NAME)-docker-$(PKG_VSN).zip _packages/$(EMQX_NAME)/$(EMQX_NAME)-docker-$(PKG_VSN).zip; \ fi - + @for arch in $(ARCH_LIST); do \ if [ -n "$$(docker images -q $(TARGET):$(PKG_VSN)-$(OS)-$${arch})" ]; then \ docker save $(TARGET):$(PKG_VSN)-$(OS)-$${arch} > $(EMQX_NAME)-docker-$(PKG_VSN)-$(OS)-$${arch}; \ @@ -105,8 +107,8 @@ docker-save: .PHONY: docker-push docker-push: - @echo "DOCKER PUSH: Push Docker image."; - @echo "DOCKER PUSH: pushing - $(TARGET):$(PKG_VSN)."; + @echo "DOCKER PUSH: Push Docker image."; + @echo "DOCKER PUSH: pushing - $(TARGET):$(PKG_VSN)."; @if [ -n "$$(docker images -q $(TARGET):$(PKG_VSN))" ]; then \ docker push $(TARGET):$(PKG_VSN); \ @@ -131,7 +133,7 @@ docker-manifest-list: fi; \ done; \ eval $$version; \ - eval $$latest; + eval $$latest; for arch in $(ARCH_LIST); do \ case $${arch} in \ @@ -166,10 +168,10 @@ docker-manifest-list: fi; \ ;; \ esac; \ -
done; + done; docker manifest inspect $(TARGET):$(PKG_VSN) - docker manifest push $(TARGET):$(PKG_VSN); + docker manifest push $(TARGET):$(PKG_VSN); docker manifest inspect $(TARGET):latest docker manifest push $(TARGET):latest; diff --git a/etc/BUILT_ON b/etc/BUILT_ON index 2997223fa..43a77ec87 100644 --- a/etc/BUILT_ON +++ b/etc/BUILT_ON @@ -1 +1 @@ -{{built_on_arch}} +{{built_on_platform}} diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 833451de1..c89dde010 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.3.8"}). +-define(EMQX_RELEASE, {opensource, "4.3.9"}). -else. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src index 4dc02511c..902585ffb 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src @@ -1,20 +1,18 @@ %% -*- mode: erlang -*- {VSN, - [ {<<"4.3.[0-9]">>, + [ {<<".*">>, %% load all plugins %% NOTE: this depends on the fact that emqx_dashboard is always %% the last application gets upgraded [ {apply, {emqx_rule_engine, load_providers, []}} , {restart_application, emqx_dashboard} , {apply, {emqx_plugins, load, []}} - ]}, - {<<".*">>, []} + ]} ], - [ {<<"4.3.[0-9]">>, + [ {<<".*">>, [ {apply, {emqx_rule_engine, load_providers, []}} , {restart_application, emqx_dashboard} , {apply, {emqx_plugins, load, []}} - ]}, - {<<".*">>, []} + ]} ] }. 
diff --git a/rebar.config.erl b/rebar.config.erl index 901027d2d..1000a2c92 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -173,11 +173,24 @@ relx(Vsn, RelType, PkgType) -> , {vm_args,false} , {release, {emqx, Vsn}, relx_apps(RelType)} , {overlay, relx_overlay(RelType)} - , {overlay_vars, [ {built_on_arch, rebar_utils:get_arch()} + , {overlay_vars, [ {built_on_platform, built_on()} , {emqx_description, emqx_description(RelType, IsEnterprise)} | overlay_vars(RelType, PkgType, IsEnterprise)]} ]. +built_on() -> + On = rebar_utils:get_arch(), + case distro() of + false -> On; + Distro -> On ++ "-" ++ Distro + end. + +distro() -> + case os:type() of + {unix, _} -> string:strip(os:cmd("scripts/get-distro.sh"), both, $\n); + _ -> false + end. + emqx_description(cloud, true) -> "EMQ X Enterprise"; emqx_description(cloud, false) -> "EMQ X Broker"; emqx_description(edge, _) -> "EMQ X Edge". diff --git a/scripts/get-distro.sh b/scripts/get-distro.sh new file mode 100755 index 000000000..ae52abba3 --- /dev/null +++ b/scripts/get-distro.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +## This script prints Linux distro name and its version number +## e.g. 
macos, centos8, ubuntu20.04 + +set -euo pipefail + +if [ "$(uname -s)" = 'Darwin' ]; then + echo 'macos' +elif [ "$(uname -s)" = 'Linux' ]; then + if grep -q -i 'centos' /etc/*-release; then + DIST='centos' + VERSION_ID="$(rpm --eval '%{centos_ver}')" + else + DIST="$(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g')" + VERSION_ID="$(sed -n '/^VERSION_ID=/p' /etc/os-release | sed -r 's/VERSION_ID=(.*)/\1/g' | sed 's/"//g')" + fi + echo "${DIST}${VERSION_ID}" | sed -r 's/([a-zA-Z]*)-.*/\1/g' +fi diff --git a/src/emqx.appup.src b/src/emqx.appup.src index d4f9d43ce..326c0aaf0 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,21 +1,27 @@ %% -*- mode: erlang -*- {VSN, [{"4.3.9", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, 
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -24,7 +30,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, @@ -34,7 +42,8 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -46,7 +55,8 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -59,7 +69,8 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, 
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, @@ -137,21 +148,27 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.9", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -160,7 +177,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + 
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -170,7 +189,8 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -182,7 +202,8 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -195,7 +216,8 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index e3cbff692..7bfef472d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -977,8 +977,11 @@ handle_info({sock_closed, Reason}, Channel = Shutdown -> Shutdown end; -handle_info({sock_closed, 
Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected sock_closed: ~p", [Reason]), +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> + %% Since sock_closed messages can be generated multiple times, + %% we can simply ignore errors of this type in the disconnected state. + %% e.g. when the socket send function returns an error, there is already + %% a tcp_closed delivered to the process mailbox {ok, Channel}; handle_info(clean_acl_cache, Channel) -> diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 61982f569..23f078568 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -72,7 +72,7 @@ ]). %% Internal export --export([stats_fun/0]). +-export([stats_fun/0, clean_down/1]). -type(chan_pid() :: pid()). @@ -93,7 +93,9 @@ %% Server name -define(CM, ?MODULE). --define(T_TAKEOVER, 15000). +-define(T_KICK, 5_000). +-define(T_GET_INFO, 5_000). +-define(T_TAKEOVER, 15_000). %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). @@ -164,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). @@ -189,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). 
@@ -257,7 +259,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -269,77 +271,113 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> + %% TODO: if takeover times out, maybe kill the old? Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; - takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]). + rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -do_discard_session(ClientId, Pid) -> +%% @private Kick a local stale session to force it to step down. +%% If failed to kick (e.g. timeout) force a kill. +%% Keeping the stale pid around, or returning error or raising an exception +%% benefits nobody. +-spec kick_or_kill(kick | discard, module(), pid()) -> ok. +kick_or_kill(Action, ConnMod, Pid) -> try - discard_session(ClientId, Pid) + %% this is essentially a gen_server:call implemented in emqx_connection + %% and emqx_ws_connection.
+ %% the handle_call is implemented in emqx_channel + ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) catch _ : noproc -> % emqx_ws_connection: call - ?tp(debug, "session_already_gone", #{pid => Pid}), - ok; + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); _ : {noproc, _} -> % emqx_connection: gen_server:call - ?tp(debug, "session_already_gone", #{pid => Pid}), - ok; - _ : {'EXIT', {noproc, _}} -> % rpc_call/3 - ?tp(debug, "session_already_gone", #{pid => Pid}), - ok; + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + _ : {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {{shutdown, _}, _} -> - ?tp(debug, "session_already_shutdown", #{pid => Pid}), - ok; + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "session_kick_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid); _ : Error : St -> - ?tp(error, "failed_to_discard_session", - #{pid => Pid, reason => Error, stacktrace=>St}) + ?tp(error, "session_kick_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid) end. -discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chann_conn_mod(ClientId, ChanPid) of - undefined -> ok; - ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; +force_kill(Pid) -> + exit(Pid, kill), + ok. + +stale_channel_info(Pid) -> + process_info(Pid, [status, message_queue_len, current_stacktrace]). discard_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]). + kick_session(discard, ClientId, ChanPid). + +kick_session(ClientId, ChanPid) -> + kick_session(kick, ClientId, ChanPid). 
+ +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = kick_or_kill(Action, ConnMod, ChanPid) + end; +kick_session(Action, ClientId, ChanPid) -> + %% call remote node on the old APIs because we do not know if they have upgraded + %% to have kick_session/3 + Function = case Action of + discard -> discard_session; + kick -> kick_session + end, + try + rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + catch + Error : Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p", + [node(ChanPid), Action, Error, Reason]) + end. kick_session(ClientId) -> case lookup_channels(ClientId) of - [] -> {error, not_found}; - [ChanPid] -> - kick_session(ClientId, ChanPid); + [] -> + ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]), + ok; ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), - lists:foreach(fun(StalePid) -> - catch discard_session(ClientId, StalePid) - end, StalePids), - kick_session(ClientId, ChanPid) + case length(ChanPids) > 1 of + true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]); + false -> ok + end, + lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) end. 
-kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick, ?T_TAKEOVER); - undefined -> - {error, not_found} - end; - -kick_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]). - %% @doc Is clean start? % is_clean_start(#{clean_start := false}) -> false; % is_clean_start(_Attrs) -> true. @@ -375,10 +413,16 @@ lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of - {badrpc, Reason} -> error(Reason); - Res -> Res +rpc_call(Node, Fun, Args, Timeout) -> + case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of + {badrpc, Reason} -> + %% since emqx app 4.3.10, the 'kick' and 'discard' call handlers + %% should catch all exceptions and always return 'ok'. + %% This leaves 'badrpc' only possible when there is a problem + %% calling the remote node. + error({badrpc, Reason}); + Res -> + Res end. %% @private @@ -411,7 +455,7 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> @@ -447,5 +491,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 3c891240a..acafeb36f 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -32,6 +32,12 @@ conn_mod => emqx_connection, receive_maximum => 100}}). +-define(WAIT(PATTERN, TIMEOUT, RET), + fun() -> + receive PATTERN -> RET + after TIMEOUT -> error({timeout, ?LINE}) end + end()). + %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- @@ -180,25 +186,95 @@ t_open_session_race_condition(_) -> ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync ?assertEqual([], emqx_cm:lookup_channels(ClientId)). -t_discard_session(_) -> +t_kick_session_discard_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_discard_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_discard_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_discard_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_discard_noproc(_) -> + test_kick_session(discard, noproc). + +t_kick_session_kick_normal(_) -> + test_kick_session(kick, normal). + +t_kick_session_kick_shutdown(_) -> + test_kick_session(kick, shutdown). + +t_kick_session_kick_shutdown_with_reason(_) -> + test_kick_session(kick, {shutdown, discard}). + +t_kick_session_kick_timeout(_) -> + test_kick_session(kick, timeout). + +t_kick_session_kick_noproc(_) -> + test_kick_session(kick, noproc).
+ +test_kick_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, - ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), + FakeSessionFun = + fun Loop() -> + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard -> + case Reason of + normal -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end + end, + {Pid1, _} = spawn_monitor(FakeSessionFun), + {Pid2, _} = spawn_monitor(FakeSessionFun), + ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), + ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), + case Reason of + noproc -> exit(Pid1, kill), exit(Pid2, kill); + _ -> ok + end, + ok = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId) + end, + case Reason =:= timeout orelse Reason =:= noproc of + true -> + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)); + false -> + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) + end, + ok = flush_emqx_pool(), + ?assertEqual([], emqx_cm:lookup_channels(ClientId)). 
- ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), - ok = emqx_cm:discard_session(ClientId), - ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), - ok = emqx_cm:discard_session(ClientId), - ok = emqx_cm:unregister_channel(ClientId), - ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), - ok = emqx_cm:discard_session(ClientId), - ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), - ok = emqx_cm:discard_session(ClientId), - ok = emqx_cm:unregister_channel(ClientId), - ok = meck:unload(emqx_connection). +%% Channel deregistration is delegated to emqx_pool as async tasks. +%% The emqx_pool is a pool of workers, and there is no way to know +%% which worker was picked for the last deregistration task. +%% This helper function creates a large enough number of async tasks +%% to sync with the pool workers. +%% The number of tasks should be large enough to ensure all workers have +%% the chance to work on at least one of the tasks. +flush_emqx_pool() -> + Self = self(), + L = lists:seq(1, 1000), + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). t_discard_session_race(_) -> ClientId = rand_client_id(), @@ -231,27 +307,6 @@ t_takeover_session(_) -> {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>).
-t_kick_session(_) -> - Info = #{conninfo := ConnInfo} = ?ChanInfo, - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end), - {error, not_found} = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), - test = emqx_cm:kick_session(<<"clientid">>), - erlang:spawn_link( - fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), - - timer:sleep(1000) - end), - ct:sleep(100), - test = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). - t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())).