fix(conn): avoid storing incomplete channel info

This commit is contained in:
Ilya Averyanov 2024-01-11 17:46:06 +03:00
parent 3dd3f4803f
commit 7b0b2a0527
6 changed files with 105 additions and 12 deletions

View File

@ -133,6 +133,10 @@
%% Server name %% Server name
-define(CM, ?MODULE). -define(CM, ?MODULE).
-define(IS_CLIENTID(CLIENTID),
(is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined))
).
%% linting overrides %% linting overrides
-elvis([ -elvis([
{elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}, {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}},
@ -154,7 +158,7 @@ start_link() ->
emqx_types:infos(), emqx_types:infos(),
emqx_types:stats() emqx_types:stats()
) -> ok. ) -> ok.
insert_channel_info(ClientId, Info, Stats) -> insert_channel_info(ClientId, Info, Stats) when ?IS_CLIENTID(ClientId) ->
Chan = {ClientId, self()}, Chan = {ClientId, self()},
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
?tp(debug, insert_channel_info, #{clientid => ClientId}), ?tp(debug, insert_channel_info, #{clientid => ClientId}),
@ -168,7 +172,9 @@ insert_channel_info(ClientId, Info, Stats) ->
%% the conn_mod first for taking up the clientid access right. %% the conn_mod first for taking up the clientid access right.
%% %%
%% Note that: It should be called on a lock transaction %% Note that: It should be called on a lock transaction
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when
is_pid(ChanPid) andalso ?IS_CLIENTID(ClientId)
->
Chan = {ClientId, ChanPid}, Chan = {ClientId, ChanPid},
%% cast (for process monitor) before inserting ets tables %% cast (for process monitor) before inserting ets tables
cast({registered, Chan}), cast({registered, Chan}),
@ -180,7 +186,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid)
%% @doc Unregister a channel. %% @doc Unregister a channel.
-spec unregister_channel(emqx_types:clientid()) -> ok. -spec unregister_channel(emqx_types:clientid()) -> ok.
unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel(ClientId) when ?IS_CLIENTID(ClientId) ->
true = do_unregister_channel({ClientId, self()}), true = do_unregister_channel({ClientId, self()}),
ok. ok.
@ -215,7 +221,7 @@ get_chan_info(ClientId, ChanPid) ->
%% @doc Update infos of the channel. %% @doc Update infos of the channel.
-spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean(). -spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean().
set_chan_info(ClientId, Info) when is_binary(ClientId) -> set_chan_info(ClientId, Info) when ?IS_CLIENTID(ClientId) ->
Chan = {ClientId, self()}, Chan = {ClientId, self()},
try try
ets:update_element(?CHAN_INFO_TAB, Chan, {2, Info}) ets:update_element(?CHAN_INFO_TAB, Chan, {2, Info})
@ -245,12 +251,12 @@ get_chan_stats(ClientId, ChanPid) ->
%% @doc Set channel's stats. %% @doc Set channel's stats.
-spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean(). -spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean().
set_chan_stats(ClientId, Stats) when is_binary(ClientId) -> set_chan_stats(ClientId, Stats) when ?IS_CLIENTID(ClientId) ->
set_chan_stats(ClientId, self(), Stats). set_chan_stats(ClientId, self(), Stats).
-spec set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) -> -spec set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) ->
boolean(). boolean().
set_chan_stats(ClientId, ChanPid, Stats) -> set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) ->
Chan = {ClientId, ChanPid}, Chan = {ClientId, ChanPid},
try try
ets:update_element(?CHAN_INFO_TAB, Chan, {3, Stats}) ets:update_element(?CHAN_INFO_TAB, Chan, {3, Stats})

View File

@ -638,8 +638,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
emqx_cm:set_chan_info(ClientId, info(State)), emqx_cm:set_chan_info(ClientId, info(State)),
{ok, State}; {ok, State};
handle_msg({event, _Other}, State = #state{channel = Channel}) -> handle_msg({event, _Other}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel), case emqx_channel:info(clientid, Channel) of
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)), %% ClientId is yet unknown (i.e. connect packet is not received yet)
undefined -> ok;
ClientId -> emqx_cm:insert_channel_info(ClientId, info(State), stats(State))
end,
{ok, State}; {ok, State};
handle_msg({timeout, TRef, TMsg}, State) -> handle_msg({timeout, TRef, TMsg}, State) ->
handle_timeout(TRef, TMsg, State); handle_timeout(TRef, TMsg, State);

View File

@ -533,9 +533,14 @@ handle_info({event, disconnected}, State = #state{channel = Channel}) ->
emqx_cm:set_chan_info(ClientId, info(State)), emqx_cm:set_chan_info(ClientId, info(State)),
return(State); return(State);
handle_info({event, _Other}, State = #state{channel = Channel}) -> handle_info({event, _Other}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel), case emqx_channel:info(clientid, Channel) of
emqx_cm:set_chan_info(ClientId, info(State)), %% ClientId is yet unknown (i.e. connect packet is not received yet)
emqx_cm:set_chan_stats(ClientId, stats(State)), undefined ->
ok;
ClientId ->
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:set_chan_stats(ClientId, stats(State))
end,
return(State); return(State);
handle_info(Info, State) -> handle_info(Info, State) ->
with_channel(handle_info, [Info], State). with_channel(handle_info, [Info], State).

View File

@ -81,7 +81,12 @@ t_get_set_chan_info(_) ->
true = emqx_cm:set_chan_info(<<"clientid">>, Info1), true = emqx_cm:set_chan_info(<<"clientid">>, Info1),
?assertEqual(Info1, emqx_cm:get_chan_info(<<"clientid">>)), ?assertEqual(Info1, emqx_cm:get_chan_info(<<"clientid">>)),
ok = emqx_cm:unregister_channel(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)). ?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)),
?assertError(
function_clause,
emqx_cm:insert_channel_info(undefined, #{}, [])
).
t_get_set_chan_stats(_) -> t_get_set_chan_stats(_) ->
Stats = [{recv_oct, 10}, {send_oct, 8}], Stats = [{recv_oct, 10}, {send_oct, 8}],
@ -93,6 +98,12 @@ t_get_set_chan_stats(_) ->
Stats1 = [{recv_oct, 10} | Stats], Stats1 = [{recv_oct, 10} | Stats],
true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1), true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
?assertEqual(Stats1, emqx_cm:get_chan_stats(<<"clientid">>)), ?assertEqual(Stats1, emqx_cm:get_chan_stats(<<"clientid">>)),
?assertError(
function_clause,
emqx_cm:set_chan_stats(undefined, [])
),
ok = emqx_cm:unregister_channel(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)). ?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).

View File

@ -0,0 +1,67 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connection_conninfo_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(Case, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"listeners.tcp.default {\n"
" tcp_options {\n"
" recbuf = 10\n"
" buffer = 10\n"
" active_n = 1\n"
" }\n"
" messages_rate = \"1/1s\"\n"
" bytes_rate = \"1KB/1s\"\n"
"}\n"}
],
#{work_dir => emqx_cth_suite:work_dir(Case, Config)}
),
[{apps, Apps} | Config].
end_per_testcase(_Case, Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
t_inconsistent_chan_info(_Config) ->
{ok, C} = emqtt:start_link([{clientid, emqx_guid:to_hexstr(emqx_guid:gen())}]),
{ok, _} = emqtt:connect(C),
ok = emqtt:disconnect(C),
ClientIds = [
ClientId
|| {ClientId, _ConnState, _ConnInfo, _ClientInfo} <- qlc:eval(
emqx_cm:all_channels_table([emqx_connection])
)
],
?assertNot(lists:member(undefined, ClientIds)).

View File

@ -0,0 +1 @@
Eliminated passing incomplete client/connection information into `emqx_cm`. This could lead to internal inconsistency and affect memory consumption and some processes like node evacuation.