Merge pull request #9267 from JimMoen/1028-feat-disconnect-event-type
feat: options for trigger disconnected events by different reasons
This commit is contained in:
commit
9f47ee09fe
|
@ -56,7 +56,10 @@ groups() ->
|
|||
{rulesql_select_events, [],
|
||||
[ t_sqlparse_event_client_connected_01
|
||||
, t_sqlparse_event_client_connected_02
|
||||
, t_sqlparse_event_client_disconnected
|
||||
, t_sqlparse_event_client_disconnected_normal
|
||||
, t_sqlparse_event_client_disconnected_kicked
|
||||
, t_sqlparse_event_client_disconnected_discarded
|
||||
, t_sqlparse_event_client_disconnected_takeovered
|
||||
, t_sqlparse_event_session_subscribed
|
||||
, t_sqlparse_event_session_unsubscribed
|
||||
, t_sqlparse_event_message_delivered
|
||||
|
@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) ->
|
|||
%% Testcase specific setup/teardown
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
|
||||
application:set_env(emqx, client_disconnect_discarded, true),
|
||||
Config;
|
||||
init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
|
||||
application:set_env(emqx, client_disconnect_takeovered, true),
|
||||
Config;
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
init_events_counters(),
|
||||
ok = emqx_rule_registry:register_resource_types(
|
||||
|
@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) ->
|
|||
%ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
|
||||
application:set_env(emqx, client_disconnect_takeovered, false), %% back to default
|
||||
Config;
|
||||
end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
|
||||
application:set_env(emqx, client_disconnect_discarded, false), %% back to default
|
||||
Config;
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok.
|
||||
|
||||
|
@ -523,9 +538,118 @@ t_sqlparse_event_client_connected_02(_Config) ->
|
|||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
%% FROM $events/client_disconnected
|
||||
t_sqlparse_event_client_disconnected(_Config) ->
|
||||
%% TODO
|
||||
ok.
|
||||
t_sqlparse_event_client_disconnected_normal(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/normal">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
emqtt:disconnect(Client1),
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_normal)
|
||||
end,
|
||||
emqtt:stop(Client),
|
||||
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/kicked">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
emqx_cm:kick_session(<<"emqx">>),
|
||||
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_kicked)
|
||||
end,
|
||||
emqtt:stop(ClientRecvRepub),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/discarded">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_discarded)
|
||||
end,
|
||||
|
||||
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/takeovered">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_discarded)
|
||||
end,
|
||||
|
||||
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
%% FROM $events/session_subscribed
|
||||
t_sqlparse_event_session_subscribed(_Config) ->
|
||||
|
|
|
@ -36,9 +36,16 @@
|
|||
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
|
||||
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
|
||||
|
||||
|
||||
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
|
||||
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
|
||||
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
|
||||
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
|
||||
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
|
||||
|
||||
## Bug fixes
|
||||
|
||||
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||
|
||||
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185).
|
||||
This is to avoid displaying floats like `0.30000000000000004` on the dashboard.
|
||||
|
|
|
@ -32,9 +32,14 @@
|
|||
例如,`acl_order = jwt,http`,可以用于保证 `jwt` 这个模块总是排在 `http` 的前面,
|
||||
也就是说,在对客户端进行 ACL 检查时,如果 JWT 不存在(或者没有定义 ACL),那么回退到使用 HTTP。
|
||||
|
||||
- 为更多类型的 `client.disconnected` 事件(计数器触发)提供可配置项 [#9267](https://github.com/emqx/emqx/pull/9267)。
|
||||
此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发,
|
||||
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
|
||||
可将 `broker.client_disconnect_discarded` 和 `broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
|
||||
|
||||
## 修复
|
||||
|
||||
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||
|
||||
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。
|
||||
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。
|
||||
|
|
|
@ -2455,6 +2455,16 @@ broker.route_batch_clean = off
|
|||
## - false: disable trie path compaction
|
||||
# broker.perf.trie_compaction = true
|
||||
|
||||
## Enable client disconnect event will be triggered by which reasons.
|
||||
## Value: on | off
|
||||
## `takeover`: session was takenover by another client with same client ID. (clean_session = false)
|
||||
## Default: off
|
||||
## `discard`: session was takeover by another client with same client ID. (clean_session = true)
|
||||
## Default: off
|
||||
##
|
||||
# broker.client_disconnect_discarded = off
|
||||
# broker.client_disconnect_takeovered = off
|
||||
|
||||
## CONFIG_SECTION_BGN=sys_mon ==================================================
|
||||
|
||||
## Enable Long GC monitoring. Disable if the value is 0.
|
||||
|
|
|
@ -2507,6 +2507,20 @@ end}.
|
|||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
%% @doc Configuration of disconnected event reason.
|
||||
%% `takeover`: session was takenover by another client with same client ID. (clean_session = false)
|
||||
%% `discard`: session was takeover by another client with same client ID. (clean_session = true)
|
||||
{mapping, "broker.client_disconnect_discarded", "emqx.client_disconnect_discarded", [
|
||||
{default, off},
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
{mapping, "broker.client_disconnect_takeovered", "emqx.client_disconnect_takeovered", [
|
||||
{default, off},
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% System Monitor
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -972,7 +972,13 @@ handle_call(discard, Channel) ->
|
|||
disconnect_and_shutdown(discarded, ok, Channel);
|
||||
|
||||
%% Session Takeover
|
||||
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
|
||||
handle_call({takeover, 'begin'}, Channel = #channel{
|
||||
session = Session,
|
||||
conninfo = #{clientid := ClientId}
|
||||
}) ->
|
||||
?tp(debug,
|
||||
emqx_channel_takeover_begin,
|
||||
#{clientid => ClientId}),
|
||||
reply(Session, Channel#channel{takeover = true});
|
||||
|
||||
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
|
||||
|
@ -1698,7 +1704,16 @@ parse_topic_filters(TopicFilters) ->
|
|||
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure disconnected
|
||||
%% Maybe & Ensure disconnected
|
||||
|
||||
ensure_disconnected(connected, Reason, Channel)
|
||||
when Reason =:= discarded orelse Reason =:= takeovered ->
|
||||
case is_disconnect_event_enabled(Reason) of
|
||||
true -> ensure_disconnected(Reason, Channel);
|
||||
false -> Channel
|
||||
end;
|
||||
ensure_disconnected(_, _, Channel) ->
|
||||
Channel.
|
||||
|
||||
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
|
@ -1793,12 +1808,15 @@ shutdown(Reason, Reply, Channel) ->
|
|||
shutdown(Reason, Reply, Packet, Channel) ->
|
||||
{shutdown, Reason, Reply, Packet, Channel}.
|
||||
|
||||
%% mqtt v5 connected sessions
|
||||
disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
|
||||
= #channel{conn_state = connected}) ->
|
||||
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
|
||||
|
||||
disconnect_and_shutdown(Reason, Reply, Channel) ->
|
||||
shutdown(Reason, Reply, Channel).
|
||||
NChannel = ensure_disconnected(connected, Reason, Channel),
|
||||
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
|
||||
%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
|
||||
disconnect_and_shutdown(Reason, Reply, Channel= #channel{conn_state = ConnState}) ->
|
||||
NChannel = ensure_disconnected(ConnState, Reason, Channel),
|
||||
shutdown(Reason, Reply, NChannel).
|
||||
|
||||
sp(true) -> 1;
|
||||
sp(false) -> 0.
|
||||
|
@ -1806,6 +1824,11 @@ sp(false) -> 0.
|
|||
flag(true) -> 1;
|
||||
flag(false) -> 0.
|
||||
|
||||
is_disconnect_event_enabled(discarded) ->
|
||||
emqx:get_env(client_disconnect_discarded, false);
|
||||
is_disconnect_event_enabled(takeovered) ->
|
||||
emqx:get_env(client_disconnect_takeovered, false).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% For CT tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue