diff --git a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl index f4f329e87..c2941c810 100644 --- a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl @@ -56,7 +56,10 @@ groups() -> {rulesql_select_events, [], [ t_sqlparse_event_client_connected_01 , t_sqlparse_event_client_connected_02 - , t_sqlparse_event_client_disconnected + , t_sqlparse_event_client_disconnected_normal + , t_sqlparse_event_client_disconnected_kicked + , t_sqlparse_event_client_disconnected_discarded + , t_sqlparse_event_client_disconnected_takeovered , t_sqlparse_event_session_subscribed , t_sqlparse_event_session_unsubscribed , t_sqlparse_event_message_delivered @@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) -> %% Testcase specific setup/teardown %%------------------------------------------------------------------------------ +init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) -> + application:set_env(emqx, client_disconnect_discarded, true), + Config; +init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) -> + application:set_env(emqx, client_disconnect_takeovered, true), + Config; init_per_testcase(_TestCase, Config) -> init_events_counters(), ok = emqx_rule_registry:register_resource_types( @@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) -> %ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]), Config. +end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) -> + application:set_env(emqx, client_disconnect_takeovered, false), %% back to default + Config; +end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) -> + application:set_env(emqx, client_disconnect_discarded, false), %% back to default + Config; end_per_testcase(_TestCase, _Config) -> ok. @@ -523,9 +538,118 @@ t_sqlparse_event_client_connected_02(_Config) -> emqx_rule_registry:remove_rule(TopicRule). 
%% FROM $events/client_disconnected -t_sqlparse_event_client_disconnected(_Config) -> - %% TODO - ok. +t_sqlparse_event_client_disconnected_normal(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/normal">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqtt:disconnect(Client1), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_normal) + end, + emqtt:stop(Client), + + emqx_rule_registry:remove_rule(TopicRule). 
+ +t_sqlparse_event_client_disconnected_kicked(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/kicked">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqx_cm:kick_session(<<"emqx">>), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_kicked) + end, + emqtt:stop(ClientRecvRepub), + emqx_rule_registry:remove_rule(TopicRule). 
+ +t_sqlparse_event_client_disconnected_discarded(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/discarded">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} + + {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]), + {ok, _} = emqtt:connect(Client2), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + emqtt:stop(ClientRecvRepub), emqtt:stop(Client2), + emqx_rule_registry:remove_rule(TopicRule). 
+ +t_sqlparse_event_client_disconnected_takeovered(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/takeovered">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} + + {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]), + {ok, _} = emqtt:connect(Client2), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_takeovered) + end, + + emqtt:stop(ClientRecvRepub), emqtt:stop(Client2), + emqx_rule_registry:remove_rule(TopicRule). %% FROM $events/session_subscribed t_sqlparse_event_session_subscribed(_Config) -> diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index c902fbf61..ba4b7ddbd 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -36,9 +36,16 @@ For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`, meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend. + +- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267). 
+ Prior to this change, the `client.disconnected` event (and counter bump) was triggered when a client + performed a 'normal' disconnect, or was 'kicked' by a system admin, but NOT triggered when a + stale connection had to be 'discarded' (for clean session) or 'takeovered' (for non-clean session). + Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takeovered` to `on` to enable the event in these scenarios. + ## Bug fixes -- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224). +- Fix that after uploading a backup file with a non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224). - Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185). This is to avoid displaying floats like `0.30000000000000004` on the dashboard. 
diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index b969dd4c2..0b245d698 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -32,9 +32,14 @@ 例如,`acl_order = jwt,http`,可以用于保证 `jwt` 这个模块总是排在 `http` 的前面, 也就是说,在对客户端进行 ACL 检查时,如果 JWT 不存在(或者没有定义 ACL),那么回退到使用 HTTP。 +- 为更多类型的 `client.disconnected` 事件(计数器触发)提供可配置项 [#9267](https://github.com/emqx/emqx/pull/9267)。 + 此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发, + 但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。 + 可将 `broker.client_disconnect_discarded` 和 `broker.client_disconnect_takeovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。 + ## 修复 -- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。 +- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。 - 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。 避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。 diff --git a/etc/emqx.conf b/etc/emqx.conf index a5dd13cc3..f23b6f841 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2455,6 +2455,16 @@ broker.route_batch_clean = off ## - false: disable trie path compaction # broker.perf.trie_compaction = true +## Select the additional reasons for which the client disconnect event is triggered. +## Value: on | off +## `takeover`: session was taken over by another client with the same client ID. (clean_session = false) +## Default: off +## `discard`: session was discarded by another client with the same client ID. (clean_session = true) +## Default: off +## +# broker.client_disconnect_discarded = off +# broker.client_disconnect_takeovered = off + ## CONFIG_SECTION_BGN=sys_mon ================================================== ## Enable Long GC monitoring. Disable if the value is 0. diff --git a/priv/emqx.schema b/priv/emqx.schema index 6c12125cb..0399cb27d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2507,6 +2507,20 @@ end}. 
{datatype, {enum, [true, false]}} ]}. +%% @doc Enable the client disconnect event for additional disconnect reasons. +%% `takeover`: session was taken over by another client with the same client ID. (clean_session = false) +%% `discard`: session was discarded by another client with the same client ID. (clean_session = true) +{mapping, "broker.client_disconnect_discarded", "emqx.client_disconnect_discarded", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "broker.client_disconnect_takeovered", "emqx.client_disconnect_takeovered", [ + {default, off}, + {datatype, flag} +]}. + + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 021ab6610..4cae29348 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -972,7 +972,13 @@ handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); %% Session Takeover -handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> +handle_call({takeover, 'begin'}, Channel = #channel{ + session = Session, + conninfo = #{clientid := ClientId} + }) -> + ?tp(debug, + emqx_channel_takeover_begin, + #{clientid => ClientId}), reply(Session, Channel#channel{takeover = true}); handle_call({takeover, 'end'}, Channel = #channel{session = Session, @@ -1698,7 +1704,16 @@ parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). %%-------------------------------------------------------------------- -%% Ensure disconnected +%% Maybe & Ensure disconnected + +ensure_disconnected(connected, Reason, Channel) + when Reason =:= discarded orelse Reason =:= takeovered -> + case is_disconnect_event_enabled(Reason) of + true -> ensure_disconnected(Reason, Channel); + false -> Channel + end; +ensure_disconnected(_, _, Channel) -> + Channel. 
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> @@ -1793,12 +1808,15 @@ shutdown(Reason, Reply, Channel) -> shutdown(Reason, Reply, Packet, Channel) -> {shutdown, Reason, Reply, Packet, Channel}. +%% mqtt v5 connected sessions disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5 = #channel{conn_state = connected}) -> - shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel); - -disconnect_and_shutdown(Reason, Reply, Channel) -> - shutdown(Reason, Reply, Channel). + NChannel = ensure_disconnected(connected, Reason, Channel), + shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); +%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions +disconnect_and_shutdown(Reason, Reply, Channel= #channel{conn_state = ConnState}) -> + NChannel = ensure_disconnected(ConnState, Reason, Channel), + shutdown(Reason, Reply, NChannel). sp(true) -> 1; sp(false) -> 0. @@ -1806,6 +1824,11 @@ sp(false) -> 0. flag(true) -> 1; flag(false) -> 0. +is_disconnect_event_enabled(discarded) -> + emqx:get_env(client_disconnect_discarded, false); +is_disconnect_event_enabled(takeovered) -> + emqx:get_env(client_disconnect_takeovered, false). + %%-------------------------------------------------------------------- %% For CT tests %%--------------------------------------------------------------------