diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 6e74126ca..af4d7be56 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -151,7 +151,7 @@ info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). --spec info(list(atom()) | atom(), channel()) -> term(). +-spec info(list(atom()) | atom() | tuple(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> @@ -180,6 +180,8 @@ info(username, #channel{clientinfo = ClientInfo}) -> maps:get(username, ClientInfo, undefined); info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); +info({session, Info}, #channel{session = Session}) -> + maybe_apply(fun(S) -> emqx_session:info(Info, S) end, Session); info(conn_state, #channel{conn_state = ConnState}) -> ConnState; info(keepalive, #channel{keepalive = Keepalive}) -> @@ -1195,8 +1197,6 @@ handle_call( ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), reply(ok, reset_timer(alive_timer, NChannel)); -handle_call(get_mqueue, Channel) -> - reply({ok, get_mqueue(Channel)}, Channel); handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). @@ -2240,6 +2240,3 @@ get_mqtt_conf(Zone, Key, Default) -> set_field(Name, Value, Channel) -> Pos = emqx_utils:index_of(Name, record_info(fields, channel)), setelement(Pos + 1, Channel, Value). - -get_mqueue(#channel{session = Session}) -> - emqx_session:get_mqueue(Session). diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 1172460ac..70eb0d1e4 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -44,6 +44,7 @@ -export([ info/1, + info/2, stats/1 ]). @@ -221,11 +222,10 @@ info(CPid) when is_pid(CPid) -> call(CPid, info); info(State = #state{channel = Channel}) -> ChanInfo = emqx_channel:info(Channel), - SockInfo = maps:from_list( - info(?INFO_KEYS, State) - ), + SockInfo = maps:from_list(info(?INFO_KEYS, State)), ChanInfo#{sockinfo => SockInfo}. +-spec info([atom()] | atom() | tuple(), pid() | state()) -> term(). info(Keys, State) when is_list(Keys) -> [{Key, info(Key, State)} || Key <- Keys]; info(socktype, #state{transport = Transport, socket = Socket}) -> @@ -241,7 +241,9 @@ info(stats_timer, #state{stats_timer = StatsTimer}) -> info(limiter, #state{limiter = Limiter}) -> Limiter; info(limiter_timer, #state{limiter_timer = Timer}) -> - Timer. + Timer; +info({channel, Info}, #state{channel = Channel}) -> + emqx_channel:info(Info, Channel). %% @doc Get stats of the connection/channel. -spec stats(pid() | state()) -> emqx_types:stats(). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 3036887de..d838e95d0 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -65,8 +65,7 @@ info/1, info/2, stats/1, - obtain_next_pkt_id/1, - get_mqueue/1 + obtain_next_pkt_id/1 ]). -export([ @@ -955,6 +954,3 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_utils:index_of(Name, record_info(fields, session)), setelement(Pos + 1, Session, Value). - -get_mqueue(#session{mqueue = Q}) -> - emqx_mqueue:to_list(Q). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 4726f1111..6439981f6 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -758,13 +758,16 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) -> {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1), ct:sleep(100), - {ok, Msgs1} = gen_server:call(Pid1, get_mqueue), - {ok, Msgs2} = gen_server:call(Pid2, get_mqueue), + Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)), + Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)), %% assert the message is in mqueue (because socket is closed) ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2), emqtt:stop(ConnPub), ok. +get_mqueue(ConnPid) -> + emqx_connection:info({channel, {session, mqueue}}, sys:get_state(ConnPid)). + %% No ack, QoS 2 subscriptions, %% client1 receives one message, send pubrec, then suspend %% client2 acts normal (auto_ack=true)