Merge remote-tracking branch 'origin/release-v43' into release-v44

This commit is contained in:
Zaiming (Stone) Shi 2022-11-03 21:58:25 +01:00
commit 279046c51e
13 changed files with 255 additions and 83 deletions

View File

@ -1,8 +1,4 @@
name: 'Detect profiles'
inputs:
ci_git_token:
required: true
type: string
outputs:
profiles:
description: 'Detected profiles'
@ -14,7 +10,6 @@ runs:
- id: detect-profiles
shell: bash
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
if [ -d source ]; then
## source code downloaded
cd source
@ -24,11 +19,9 @@ runs:
exit 1
fi
if [ -f 'EMQX_ENTERPRISE' ]; then
echo "::set-output name=profiles::[\"emqx-ee\"]"
echo "https://ci%40emqx.io:${{ inputs.ci_git_token }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
echo "profiles=[\"emqx-ee\"]" >> $GITHUB_OUTPUT
echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV
else
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
echo "profiles=[\"emqx\", \"emqx-edge\"]" >> $GITHUB_OUTPUT
echo "EMQX_NAME=emqx" >> $GITHUB_ENV
fi

View File

@ -27,27 +27,23 @@ runs:
shell: bash
run: |
brew update
brew install curl zip unzip gnu-sed kerl coreutils unixodbc freetds openssl@1.1
brew install curl zip unzip gnu-sed coreutils unixodbc freetds openssl@1.1
echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH
echo "/usr/local/bin" >> $GITHUB_PATH
- uses: actions/cache@v2
- uses: actions/cache@v3
id: cache
with:
path: ~/.kerl/${{ inputs.otp }}
path: /opt/erlang/${{ inputs.otp }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
restore-keys: |
otp-install-${{ inputs.otp }}-${{ inputs.os }}
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
shell: bash
env:
KERL_BUILD_BACKEND: git
OTP_GITHUB_URL: https://github.com/emqx/otp
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
run: |
kerl update releases
kerl build ${{ inputs.otp }}
kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }}
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git $HOME/otp-${{ inputs.otp }}
cd $HOME/otp-${{ inputs.otp }}
./configure --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit --prefix=/opt/erlang/${{ inputs.otp }}
make -j$(nproc)
sudo make install
- name: build
env:
AUTO_INSTALL_BUILD_DEPS: 1
@ -60,7 +56,7 @@ runs:
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }}
shell: bash
run: |
. $HOME/.kerl/${{ inputs.otp }}/activate
export PATH="/opt/erlang/${{ inputs.otp }}/bin:$PATH"
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
make ${EMQX_NAME}-zip

View File

@ -26,14 +26,12 @@ jobs:
profiles: ${{ steps.detect-profiles.outputs.profiles}}
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
path: source
fetch-depth: 0
- id: detect-profiles
- name: detect-profiles
id: detect-profiles
uses: ./source/.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: get_all_deps
if: endsWith(github.repository, 'emqx')
run: |
@ -97,7 +95,7 @@ jobs:
echo "EQMX installed"
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx uninstall
echo "EQMX uninstaled"
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
with:
name: ${{ matrix.profile }}-windows
path: source/_packages/${{ matrix.profile }}/.
@ -114,7 +112,7 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/download-artifact@v2
- uses: actions/download-artifact@v3
with:
name: source
path: .
@ -123,10 +121,7 @@ jobs:
ln -s . source
unzip -q source.zip
rm source source.zip
- id: detect-profiles
uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/detect-profiles
- uses: ./.github/actions/package-macos
with:
otp: ${{ matrix.otp }}
@ -135,7 +130,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
with:
name: ${EMQX_NAME}-${{ matrix.otp }}
path: _packages/${EMQX_NAME}/.

View File

@ -36,12 +36,15 @@ jobs:
steps:
- uses: actions/checkout@v1
- uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: fix-git-unsafe-repository
run: git config --global --add safe.directory /__w/emqx/emqx
- uses: actions/cache@v2
- uses: ./.github/actions/detect-profiles
- name: ensure access to github
if: endsWith(github.repository, 'enterprise')
run: |
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
- uses: actions/cache@v3
with:
# dialyzer PLTs
path: ~/.cache/rebar3/
@ -54,7 +57,7 @@ jobs:
run: make ${EMQX_NAME}-zip
- name: build deb/rpm packages
run: make ${EMQX_NAME}-pkg
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
if: failure()
with:
name: rebar3.crashdump
@ -64,7 +67,7 @@ jobs:
export CODE_PATH="$GITHUB_WORKSPACE"
.ci/build_packages/tests.sh "${EMQX_NAME}" zip
.ci/build_packages/tests.sh "${EMQX_NAME}" pkg
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: ${{ matrix.os }}
path: _packages/**/*.zip
@ -116,10 +119,13 @@ jobs:
- macos-11
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- name: ensure access to github
if: endsWith(github.repository, 'enterprise')
run: |
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
- uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/package-macos
with:
otp: ${{ matrix.otp }}
@ -128,13 +134,12 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
if: failure()
with:
name: rebar3.crashdump
path: ./rebar3.crashdump
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: macos
path: _packages/**/*.zip

View File

@ -13,14 +13,9 @@ jobs:
profiles: ${{ steps.detect-profiles.outputs.profiles}}
steps:
- uses: actions/checkout@v2
with:
path: source
fetch-depth: 0
- uses: actions/checkout@v3
- id: detect-profiles
uses: ./source/.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
uses: ./.github/actions/detect-profiles
upload:
runs-on: ubuntu-20.04
@ -59,12 +54,10 @@ jobs:
-X POST \
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- uses: actions/checkout@v2
with:
fetch-depth: 0
- uses: actions/checkout@v3
- name: get version
id: version
run: echo "::set-output name=version::$(./pkg-vsn.sh)"
run: echo "version=$(./pkg-vsn.sh)" >> $GITHUB_OUTPUT
- uses: emqx/push-helm-action@v1
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
with:

View File

@ -277,7 +277,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
_ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
init_resource, InitArgs)
catch throw : Reason ->
?LOG(error, "create_resource failed: ~0p", [Reason])
?LOG_SENSITIVE(warning, "create_resource failed: ~0p", [Reason])
end,
{ok, Resource};
no_retry ->
@ -285,6 +285,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
_ = ?CLUSTER_CALL(init_resource, InitArgs),
{ok, Resource}
catch throw : Reason ->
?LOG_SENSITIVE(error, "create_resource failed: ~0p", [Reason]),
{error, Reason}
end
end;

View File

@ -136,16 +136,22 @@ t_preproc_sql5(_) ->
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
t_if_contains_placeholder(_) ->
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
ok.
TestTab =
[ {true, "${a}"}
, {true, "${a}${b}"}
, {true, "${a},${b},${c}"}
, {true, "a:${a}"}
, {true, "a:${a},b:${b}"}
, {true, "abc${ab}"}
, {true, "a${ab${c}${e"}
, {false, "a"}
, {false, "abc$"}
, {false, "abc${"}
, {false, "abc${a"}
, {false, "abc${ab"}
, {false, "a${ab${c${e"}
],
lists:foreach(fun({Expected, InputStr}) ->
?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(InputStr)),
?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(iolist_to_binary(InputStr)))
end, TestTab).

View File

@ -56,7 +56,10 @@ groups() ->
{rulesql_select_events, [],
[ t_sqlparse_event_client_connected_01
, t_sqlparse_event_client_connected_02
, t_sqlparse_event_client_disconnected
, t_sqlparse_event_client_disconnected_normal
, t_sqlparse_event_client_disconnected_kicked
, t_sqlparse_event_client_disconnected_discarded
, t_sqlparse_event_client_disconnected_takeovered
, t_sqlparse_event_session_subscribed
, t_sqlparse_event_session_unsubscribed
, t_sqlparse_event_message_delivered
@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) ->
%% Testcase specific setup/teardown
%%------------------------------------------------------------------------------
init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
application:set_env(emqx, client_disconnect_discarded, true),
Config;
init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
application:set_env(emqx, client_disconnect_takeovered, true),
Config;
init_per_testcase(_TestCase, Config) ->
init_events_counters(),
ok = emqx_rule_registry:register_resource_types(
@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) ->
%ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]),
Config.
end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
application:set_env(emqx, client_disconnect_takeovered, false), %% back to default
Config;
end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
application:set_env(emqx, client_disconnect_discarded, false), %% back to default
Config;
end_per_testcase(_TestCase, _Config) ->
ok.
@ -523,9 +538,118 @@ t_sqlparse_event_client_connected_02(_Config) ->
emqx_rule_registry:remove_rule(TopicRule).
%% FROM $events/client_disconnected
t_sqlparse_event_client_disconnected(_Config) ->
%% TODO
ok.
t_sqlparse_event_client_disconnected_normal(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/normal">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
emqtt:disconnect(Client1),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_normal)
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlparse_event_client_disconnected_kicked(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/kicked">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(ClientRecvRepub),
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
emqx_cm:kick_session(<<"emqx">>),
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_kicked)
end,
emqtt:stop(ClientRecvRepub),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlparse_event_client_disconnected_discarded(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/discarded">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(ClientRecvRepub),
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
{ok, _} = emqtt:connect(Client2),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_discarded)
end,
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/takeovered">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(ClientRecvRepub),
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
{ok, _} = emqtt:connect(Client2),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_discarded)
end,
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
emqx_rule_registry:remove_rule(TopicRule).
%% FROM $events/session_subscribed
t_sqlparse_event_session_subscribed(_Config) ->

View File

@ -36,9 +36,16 @@
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
## Bug fixes
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185).
This is to avoid displaying floats like `0.30000000000000004` on the dashboard.

View File

@ -32,9 +32,14 @@
例如,`acl_order = jwt,http`,可以用于保证 `jwt` 这个模块总是排在 `http` 的前面,
也就是说,在对客户端进行 ACL 检查时,如果 JWT 不存在(或者没有定义 ACL那么回退到使用 HTTP。
- 为更多类型的 `client.disconnected` 事件(计数器触发)提供可配置项 [#9267](https://github.com/emqx/emqx/pull/9267)。
此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发,
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
可将 `broker.client_disconnect_discarded``broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
## 修复
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。

View File

@ -2483,6 +2483,16 @@ broker.route_batch_clean = off
## - false: disable trie path compaction
# broker.perf.trie_compaction = true
## Enable client disconnect event will be triggered by which reasons.
## Value: on | off
## `takeover`: session was takenover by another client with same client ID. (clean_session = false)
## Default: off
## `discard`: session was takeover by another client with same client ID. (clean_session = true)
## Default: off
##
# broker.client_disconnect_discarded = off
# broker.client_disconnect_takeovered = off
## CONFIG_SECTION_BGN=sys_mon ==================================================
## Enable Long GC monitoring. Disable if the value is 0.

View File

@ -2596,6 +2596,20 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Configuration of disconnected event reason.
%% `takeover`: session was takenover by another client with same client ID. (clean_session = false)
%% `discard`: session was takeover by another client with same client ID. (clean_session = true)
{mapping, "broker.client_disconnect_discarded", "emqx.client_disconnect_discarded", [
{default, off},
{datatype, flag}
]}.
{mapping, "broker.client_disconnect_takeovered", "emqx.client_disconnect_takeovered", [
{default, off},
{datatype, flag}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -997,7 +997,13 @@ handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
%% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
handle_call({takeover, 'begin'}, Channel = #channel{
session = Session,
conninfo = #{clientid := ClientId}
}) ->
?tp(debug,
emqx_channel_takeover_begin,
#{clientid => ClientId}),
reply(Session, Channel#channel{takeover = true});
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
@ -1736,7 +1742,16 @@ parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
%%--------------------------------------------------------------------
%% Ensure disconnected
%% Maybe & Ensure disconnected
ensure_disconnected(connected, Reason, Channel)
when Reason =:= discarded orelse Reason =:= takeovered ->
case is_disconnect_event_enabled(Reason) of
true -> ensure_disconnected(Reason, Channel);
false -> Channel
end;
ensure_disconnected(_, _, Channel) ->
Channel.
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
@ -1833,12 +1848,15 @@ shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Packet, Channel) ->
{shutdown, Reason, Reply, Packet, Channel}.
%% mqtt v5 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
= #channel{conn_state = connected}) ->
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).
NChannel = ensure_disconnected(connected, Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
disconnect_and_shutdown(Reason, Reply, Channel= #channel{conn_state = ConnState}) ->
NChannel = ensure_disconnected(ConnState, Reason, Channel),
shutdown(Reason, Reply, NChannel).
sp(true) -> 1;
sp(false) -> 0.
@ -1846,6 +1864,11 @@ sp(false) -> 0.
flag(true) -> 1;
flag(false) -> 0.
is_disconnect_event_enabled(discarded) ->
emqx:get_env(client_disconnect_discarded, false);
is_disconnect_event_enabled(takeovered) ->
emqx:get_env(client_disconnect_takeovered, false).
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------