Merge pull request #12305 from savonarola/1101-fix-channel-info-update
fix(conn): avoid storing incomplete channel info
This commit is contained in:
commit
ef0850c71f
|
@ -133,6 +133,10 @@
|
|||
%% Server name
|
||||
-define(CM, ?MODULE).
|
||||
|
||||
-define(IS_CLIENTID(CLIENTID),
|
||||
(is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined))
|
||||
).
|
||||
|
||||
%% linting overrides
|
||||
-elvis([
|
||||
{elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}},
|
||||
|
@ -154,7 +158,7 @@ start_link() ->
|
|||
emqx_types:infos(),
|
||||
emqx_types:stats()
|
||||
) -> ok.
|
||||
insert_channel_info(ClientId, Info, Stats) ->
|
||||
insert_channel_info(ClientId, Info, Stats) when ?IS_CLIENTID(ClientId) ->
|
||||
Chan = {ClientId, self()},
|
||||
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
|
||||
?tp(debug, insert_channel_info, #{clientid => ClientId}),
|
||||
|
@ -168,7 +172,9 @@ insert_channel_info(ClientId, Info, Stats) ->
|
|||
%% the conn_mod first for taking up the clientid access right.
|
||||
%%
|
||||
%% Note that: It should be called on a lock transaction
|
||||
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
|
||||
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when
|
||||
is_pid(ChanPid) andalso ?IS_CLIENTID(ClientId)
|
||||
->
|
||||
Chan = {ClientId, ChanPid},
|
||||
%% cast (for process monitor) before inserting ets tables
|
||||
cast({registered, Chan}),
|
||||
|
@ -180,7 +186,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid)
|
|||
|
||||
%% @doc Unregister a channel.
|
||||
-spec unregister_channel(emqx_types:clientid()) -> ok.
|
||||
unregister_channel(ClientId) when is_binary(ClientId) ->
|
||||
unregister_channel(ClientId) when ?IS_CLIENTID(ClientId) ->
|
||||
true = do_unregister_channel({ClientId, self()}),
|
||||
ok.
|
||||
|
||||
|
@ -215,7 +221,7 @@ get_chan_info(ClientId, ChanPid) ->
|
|||
|
||||
%% @doc Update infos of the channel.
|
||||
-spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean().
|
||||
set_chan_info(ClientId, Info) when is_binary(ClientId) ->
|
||||
set_chan_info(ClientId, Info) when ?IS_CLIENTID(ClientId) ->
|
||||
Chan = {ClientId, self()},
|
||||
try
|
||||
ets:update_element(?CHAN_INFO_TAB, Chan, {2, Info})
|
||||
|
@ -245,12 +251,12 @@ get_chan_stats(ClientId, ChanPid) ->
|
|||
|
||||
%% @doc Set channel's stats.
|
||||
-spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean().
|
||||
set_chan_stats(ClientId, Stats) when is_binary(ClientId) ->
|
||||
set_chan_stats(ClientId, Stats) when ?IS_CLIENTID(ClientId) ->
|
||||
set_chan_stats(ClientId, self(), Stats).
|
||||
|
||||
-spec set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) ->
|
||||
boolean().
|
||||
set_chan_stats(ClientId, ChanPid, Stats) ->
|
||||
set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) ->
|
||||
Chan = {ClientId, ChanPid},
|
||||
try
|
||||
ets:update_element(?CHAN_INFO_TAB, Chan, {3, Stats})
|
||||
|
|
|
@ -638,8 +638,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
|||
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||
{ok, State};
|
||||
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
|
||||
case emqx_channel:info(clientid, Channel) of
|
||||
%% ClientId is yet unknown (i.e. connect packet is not received yet)
|
||||
undefined -> ok;
|
||||
ClientId -> emqx_cm:insert_channel_info(ClientId, info(State), stats(State))
|
||||
end,
|
||||
{ok, State};
|
||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
||||
handle_timeout(TRef, TMsg, State);
|
||||
|
|
|
@ -533,9 +533,14 @@ handle_info({event, disconnected}, State = #state{channel = Channel}) ->
|
|||
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||
return(State);
|
||||
handle_info({event, _Other}, State = #state{channel = Channel}) ->
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
case emqx_channel:info(clientid, Channel) of
|
||||
%% ClientId is yet unknown (i.e. connect packet is not received yet)
|
||||
undefined ->
|
||||
ok;
|
||||
ClientId ->
|
||||
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State))
|
||||
end,
|
||||
return(State);
|
||||
handle_info(Info, State) ->
|
||||
with_channel(handle_info, [Info], State).
|
||||
|
|
|
@ -81,7 +81,12 @@ t_get_set_chan_info(_) ->
|
|||
true = emqx_cm:set_chan_info(<<"clientid">>, Info1),
|
||||
?assertEqual(Info1, emqx_cm:get_chan_info(<<"clientid">>)),
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)).
|
||||
?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)),
|
||||
|
||||
?assertError(
|
||||
function_clause,
|
||||
emqx_cm:insert_channel_info(undefined, #{}, [])
|
||||
).
|
||||
|
||||
t_get_set_chan_stats(_) ->
|
||||
Stats = [{recv_oct, 10}, {send_oct, 8}],
|
||||
|
@ -93,6 +98,12 @@ t_get_set_chan_stats(_) ->
|
|||
Stats1 = [{recv_oct, 10} | Stats],
|
||||
true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
|
||||
?assertEqual(Stats1, emqx_cm:get_chan_stats(<<"clientid">>)),
|
||||
|
||||
?assertError(
|
||||
function_clause,
|
||||
emqx_cm:set_chan_stats(undefined, [])
|
||||
),
|
||||
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).
|
||||
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_connection_conninfo_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(Case, Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx,
|
||||
"listeners.tcp.default {\n"
|
||||
" tcp_options {\n"
|
||||
" recbuf = 10\n"
|
||||
" buffer = 10\n"
|
||||
" active_n = 1\n"
|
||||
" }\n"
|
||||
" messages_rate = \"1/1s\"\n"
|
||||
" bytes_rate = \"1KB/1s\"\n"
|
||||
"}\n"}
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Case, Config)}
|
||||
),
|
||||
[{apps, Apps} | Config].
|
||||
|
||||
end_per_testcase(_Case, Config) ->
|
||||
emqx_cth_suite:stop(?config(apps, Config)).
|
||||
|
||||
t_inconsistent_chan_info(_Config) ->
|
||||
{ok, C} = emqtt:start_link([{clientid, emqx_guid:to_hexstr(emqx_guid:gen())}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ok = emqtt:disconnect(C),
|
||||
|
||||
ClientIds = [
|
||||
ClientId
|
||||
|| {ClientId, _ConnState, _ConnInfo, _ClientInfo} <- qlc:eval(
|
||||
emqx_cm:all_channels_table([emqx_connection])
|
||||
)
|
||||
],
|
||||
|
||||
?assertNot(lists:member(undefined, ClientIds)).
|
|
@ -0,0 +1 @@
|
|||
Eliminated passing incomplete client/connection information into `emqx_cm`. This could lead to internal inconsistency and affect memory consumption and some processes like node evacuation.
|
Loading…
Reference in New Issue