diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 92b95c7c3..660ac3cfe 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -133,6 +133,10 @@ %% Server name -define(CM, ?MODULE). +-define(IS_CLIENTID(CLIENTID), + (is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined)) +). + %% linting overrides -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}, @@ -154,7 +158,7 @@ start_link() -> emqx_types:infos(), emqx_types:stats() ) -> ok. -insert_channel_info(ClientId, Info, Stats) -> +insert_channel_info(ClientId, Info, Stats) when ?IS_CLIENTID(ClientId) -> Chan = {ClientId, self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), ?tp(debug, insert_channel_info, #{clientid => ClientId}), @@ -168,7 +172,9 @@ insert_channel_info(ClientId, Info, Stats) -> %% the conn_mod first for taking up the clientid access right. %% %% Note that: It should be called on a lock transaction -register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> +register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when + is_pid(ChanPid) andalso ?IS_CLIENTID(ClientId) +-> Chan = {ClientId, ChanPid}, %% cast (for process monitor) before inserting ets tables cast({registered, Chan}), @@ -180,7 +186,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) %% @doc Unregister a channel. -spec unregister_channel(emqx_types:clientid()) -> ok. -unregister_channel(ClientId) when is_binary(ClientId) -> +unregister_channel(ClientId) when ?IS_CLIENTID(ClientId) -> true = do_unregister_channel({ClientId, self()}), ok. @@ -215,7 +221,7 @@ get_chan_info(ClientId, ChanPid) -> %% @doc Update infos of the channel. -spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean(). -set_chan_info(ClientId, Info) when is_binary(ClientId) -> +set_chan_info(ClientId, Info) when ?IS_CLIENTID(ClientId) -> Chan = {ClientId, self()}, try ets:update_element(?CHAN_INFO_TAB, Chan, {2, Info}) @@ -245,12 +251,12 @@ get_chan_stats(ClientId, ChanPid) -> %% @doc Set channel's stats. -spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean(). -set_chan_stats(ClientId, Stats) when is_binary(ClientId) -> +set_chan_stats(ClientId, Stats) when ?IS_CLIENTID(ClientId) -> set_chan_stats(ClientId, self(), Stats). -spec set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(ClientId, ChanPid, Stats) -> +set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) -> Chan = {ClientId, ChanPid}, try ets:update_element(?CHAN_INFO_TAB, Chan, {3, Stats}) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index fd7ce4148..d306464c1 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -638,8 +638,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) -> emqx_cm:set_chan_info(ClientId, info(State)), {ok, State}; handle_msg({event, _Other}, State = #state{channel = Channel}) -> - ClientId = emqx_channel:info(clientid, Channel), - emqx_cm:insert_channel_info(ClientId, info(State), stats(State)), + case emqx_channel:info(clientid, Channel) of + %% ClientId is yet unknown (i.e. connect packet is not received yet) + undefined -> ok; + ClientId -> emqx_cm:insert_channel_info(ClientId, info(State), stats(State)) + end, {ok, State}; handle_msg({timeout, TRef, TMsg}, State) -> handle_timeout(TRef, TMsg, State); diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 07329721a..4a25494ad 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -533,9 +533,14 @@ handle_info({event, disconnected}, State = #state{channel = Channel}) -> emqx_cm:set_chan_info(ClientId, info(State)), return(State); handle_info({event, _Other}, State = #state{channel = Channel}) -> - ClientId = emqx_channel:info(clientid, Channel), - emqx_cm:set_chan_info(ClientId, info(State)), - emqx_cm:set_chan_stats(ClientId, stats(State)), + case emqx_channel:info(clientid, Channel) of + %% ClientId is yet unknown (i.e. connect packet is not received yet) + undefined -> + ok; + ClientId -> + emqx_cm:set_chan_info(ClientId, info(State)), + emqx_cm:set_chan_stats(ClientId, stats(State)) + end, return(State); handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 7a7a989cd..4ecea9a4b 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -81,7 +81,12 @@ t_get_set_chan_info(_) -> true = emqx_cm:set_chan_info(<<"clientid">>, Info1), ?assertEqual(Info1, emqx_cm:get_chan_info(<<"clientid">>)), ok = emqx_cm:unregister_channel(<<"clientid">>), - ?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)). + ?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)), + + ?assertError( + function_clause, + emqx_cm:insert_channel_info(undefined, #{}, []) + ). t_get_set_chan_stats(_) -> Stats = [{recv_oct, 10}, {send_oct, 8}], @@ -93,6 +98,12 @@ t_get_set_chan_stats(_) -> Stats1 = [{recv_oct, 10} | Stats], true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1), ?assertEqual(Stats1, emqx_cm:get_chan_stats(<<"clientid">>)), + + ?assertError( + function_clause, + emqx_cm:set_chan_stats(undefined, []) + ), + ok = emqx_cm:unregister_channel(<<"clientid">>), ?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)). diff --git a/apps/emqx/test/emqx_connection_conninfo_SUITE.erl b/apps/emqx/test/emqx_connection_conninfo_SUITE.erl new file mode 100644 index 000000000..56e2f0e83 --- /dev/null +++ b/apps/emqx/test/emqx_connection_conninfo_SUITE.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connection_conninfo_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(Case, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "listeners.tcp.default {\n" + " tcp_options {\n" + " recbuf = 10\n" + " buffer = 10\n" + " active_n = 1\n" + " }\n" + " messages_rate = \"1/1s\"\n" + " bytes_rate = \"1KB/1s\"\n" + "}\n"} + ], + #{work_dir => emqx_cth_suite:work_dir(Case, Config)} + ), + [{apps, Apps} | Config]. + +end_per_testcase(_Case, Config) -> + emqx_cth_suite:stop(?config(apps, Config)). + +t_inconsistent_chan_info(_Config) -> + {ok, C} = emqtt:start_link([{clientid, emqx_guid:to_hexstr(emqx_guid:gen())}]), + {ok, _} = emqtt:connect(C), + ok = emqtt:disconnect(C), + + ClientIds = [ + ClientId + || {ClientId, _ConnState, _ConnInfo, _ClientInfo} <- qlc:eval( + emqx_cm:all_channels_table([emqx_connection]) + ) + ], + + ?assertNot(lists:member(undefined, ClientIds)). diff --git a/changes/ce/fix-12305.en.md b/changes/ce/fix-12305.en.md new file mode 100644 index 000000000..8ab8ea119 --- /dev/null +++ b/changes/ce/fix-12305.en.md @@ -0,0 +1 @@ +Eliminated passing incomplete client/connection information into `emqx_cm`. This could lead to internal inconsistency and affect memory consumption and some processes like node evacuation.