Merge pull request #11315 from keynslug/ft/EMQX-9593/peek-mqueue-info

refactor(session): allow peeking at mqueue less intrusively
This commit is contained in:
Andrew Mayorov 2023-07-24 09:57:05 +02:00 committed by GitHub
commit 81793c31fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 17 deletions

View File

@ -151,7 +151,7 @@
info(Channel) -> info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)). maps:from_list(info(?INFO_KEYS, Channel)).
-spec info(list(atom()) | atom(), channel()) -> term(). -spec info(list(atom()) | atom() | tuple(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) -> info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys]; [{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) -> info(conninfo, #channel{conninfo = ConnInfo}) ->
@ -180,6 +180,8 @@ info(username, #channel{clientinfo = ClientInfo}) ->
maps:get(username, ClientInfo, undefined); maps:get(username, ClientInfo, undefined);
info(session, #channel{session = Session}) -> info(session, #channel{session = Session}) ->
maybe_apply(fun emqx_session:info/1, Session); maybe_apply(fun emqx_session:info/1, Session);
info({session, Info}, #channel{session = Session}) ->
maybe_apply(fun(S) -> emqx_session:info(Info, S) end, Session);
info(conn_state, #channel{conn_state = ConnState}) -> info(conn_state, #channel{conn_state = ConnState}) ->
ConnState; ConnState;
info(keepalive, #channel{keepalive = Keepalive}) -> info(keepalive, #channel{keepalive = Keepalive}) ->
@ -1195,8 +1197,6 @@ handle_call(
ChanInfo1 = info(NChannel), ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(alive_timer, NChannel)); reply(ok, reset_timer(alive_timer, NChannel));
handle_call(get_mqueue, Channel) ->
reply({ok, get_mqueue(Channel)}, Channel);
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel). reply(ignored, Channel).
@ -2240,6 +2240,3 @@ get_mqtt_conf(Zone, Key, Default) ->
set_field(Name, Value, Channel) -> set_field(Name, Value, Channel) ->
Pos = emqx_utils:index_of(Name, record_info(fields, channel)), Pos = emqx_utils:index_of(Name, record_info(fields, channel)),
setelement(Pos + 1, Channel, Value). setelement(Pos + 1, Channel, Value).
get_mqueue(#channel{session = Session}) ->
emqx_session:get_mqueue(Session).

View File

@ -44,6 +44,7 @@
-export([ -export([
info/1, info/1,
info/2,
stats/1 stats/1
]). ]).
@ -221,11 +222,10 @@ info(CPid) when is_pid(CPid) ->
call(CPid, info); call(CPid, info);
info(State = #state{channel = Channel}) -> info(State = #state{channel = Channel}) ->
ChanInfo = emqx_channel:info(Channel), ChanInfo = emqx_channel:info(Channel),
SockInfo = maps:from_list( SockInfo = maps:from_list(info(?INFO_KEYS, State)),
info(?INFO_KEYS, State)
),
ChanInfo#{sockinfo => SockInfo}. ChanInfo#{sockinfo => SockInfo}.
-spec info([atom()] | atom() | tuple(), pid() | state()) -> term().
info(Keys, State) when is_list(Keys) -> info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys]; [{Key, info(Key, State)} || Key <- Keys];
info(socktype, #state{transport = Transport, socket = Socket}) -> info(socktype, #state{transport = Transport, socket = Socket}) ->
@ -241,7 +241,9 @@ info(stats_timer, #state{stats_timer = StatsTimer}) ->
info(limiter, #state{limiter = Limiter}) -> info(limiter, #state{limiter = Limiter}) ->
Limiter; Limiter;
info(limiter_timer, #state{limiter_timer = Timer}) -> info(limiter_timer, #state{limiter_timer = Timer}) ->
Timer. Timer;
info({channel, Info}, #state{channel = Channel}) ->
emqx_channel:info(Info, Channel).
%% @doc Get stats of the connection/channel. %% @doc Get stats of the connection/channel.
-spec stats(pid() | state()) -> emqx_types:stats(). -spec stats(pid() | state()) -> emqx_types:stats().

View File

@ -65,8 +65,7 @@
info/1, info/1,
info/2, info/2,
stats/1, stats/1,
obtain_next_pkt_id/1, obtain_next_pkt_id/1
get_mqueue/1
]). ]).
-export([ -export([
@ -955,6 +954,3 @@ age(Now, Ts) -> Now - Ts.
set_field(Name, Value, Session) -> set_field(Name, Value, Session) ->
Pos = emqx_utils:index_of(Name, record_info(fields, session)), Pos = emqx_utils:index_of(Name, record_info(fields, session)),
setelement(Pos + 1, Session, Value). setelement(Pos + 1, Session, Value).
get_mqueue(#session{mqueue = Q}) ->
emqx_mqueue:to_list(Q).

View File

@ -758,13 +758,16 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1), {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
ct:sleep(100), ct:sleep(100),
{ok, Msgs1} = gen_server:call(Pid1, get_mqueue), Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
{ok, Msgs2} = gen_server:call(Pid2, get_mqueue), Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
%% assert the message is in mqueue (because socket is closed) %% assert the message is in mqueue (because socket is closed)
?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2), ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2),
emqtt:stop(ConnPub), emqtt:stop(ConnPub),
ok. ok.
get_mqueue(ConnPid) ->
emqx_connection:info({channel, {session, mqueue}}, sys:get_state(ConnPid)).
%% No ack, QoS 2 subscriptions, %% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend %% client1 receives one message, send pubrec, then suspend
%% client2 acts normal (auto_ack=true) %% client2 acts normal (auto_ack=true)