diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 71e94f841..de1fbabde 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -313,8 +313,14 @@ format_channel_info({_Key, Info, Stats0}) -> inflight, max_inflight, awaiting_rel, max_awaiting_rel, mqueue_len, mqueue_dropped, max_mqueue, heap_size, reductions, mailbox_len, - recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, - send_msg, send_oct, send_pkt], NStats), + recv_cnt, + recv_msg, 'recv_msg.qos0', 'recv_msg.qos1', 'recv_msg.qos2', + 'recv_msg.dropped', 'recv_msg.dropped.expired', + recv_oct, recv_pkt, send_cnt, + send_msg, 'send_msg.qos0', 'send_msg.qos1', 'send_msg.qos2', + 'send_msg.dropped', 'send_msg.dropped.expired', + 'send_msg.dropped.queue_full', 'send_msg.dropped.too_large', + send_oct, send_pkt], NStats), maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), maps:with([clean_start, keepalive, expiry_interval, proto_name, proto_ver, peername, connected_at, disconnected_at], ConnInfo), @@ -373,7 +379,7 @@ match_fun(Ms, Fuzzy) -> run_fuzzy_match(_, []) -> true; -run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fuzzy]) -> +run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) -> Val = case maps:get(Key, ClientInfo, undefined) of undefined -> <<>>; V -> V diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 8e1016cc4..2e300b85e 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,6 +2,7 @@ {VSN, [{"4.4.0", [ {load_module,emqx_channel,brutal_purge,soft_purge,[]} + , {load_module,emqx_connection,brutal_purge,soft_purge,[]} , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} , {load_module,emqx_session,brutal_purge,soft_purge,[]} , {load_module,emqx_alarm,brutal_purge,soft_purge,[]} @@ -9,6 +10,7 @@ {<<".*">>,[]}], [{"4.4.0", [ {load_module,emqx_channel,brutal_purge,soft_purge,[]} + , {load_module,emqx_connection,brutal_purge,soft_purge,[]} , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} , {load_module,emqx_session,brutal_purge,soft_purge,[]} , {load_module,emqx_alarm,brutal_purge,soft_purge,[]} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 5eeb323b0..2c2cc472d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -176,7 +176,7 @@ start_link(Transport, Socket, Options) -> %%-------------------------------------------------------------------- %% @doc Get infos of the connection/channel. --spec(info(pid()|state()) -> emqx_types:infos()). +-spec(info(pid() | state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); info(State = #state{channel = Channel}) -> @@ -205,7 +205,7 @@ info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). %% @doc Get stats of the connection/channel. --spec(stats(pid()|state()) -> emqx_types:stats()). +-spec(stats(pid() | state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); stats(#state{transport = Transport, @@ -389,7 +389,7 @@ cancel_stats_timer(State) -> State. process_msg([], State) -> {ok, State}; -process_msg([Msg|More], State) -> +process_msg([Msg | More], State) -> try case handle_msg(Msg, State) of ok -> @@ -483,7 +483,7 @@ handle_msg({Passive, _Sock}, State) handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{active_n = ActiveN} = State) -> - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -657,7 +657,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, - parse_incoming(Rest, [Packet|Packets], NState) + parse_incoming(Rest, [Packet | Packets], NState) catch error:proxy_protocol_config_disabled:_Stk -> ?LOG(error, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 6d4b2af34..9648083d9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -427,8 +427,8 @@ dequeue(ClientInfo, Cnt, Msgs, Q) -> {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), dequeue(ClientInfo, Cnt, Msgs, Q1); false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1) end @@ -503,11 +503,14 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), + ok = inc_pd('send_msg.dropped'), ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); false -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.queue_full'), + ok = inc_pd('send_msg.dropped'), + ok = inc_pd('send_msg.dropped.queue_full'), ?LOG(warning, "Dropped msg due to mqueue is full: ~s", [emqx_message:format(Msg)]) end. @@ -568,7 +571,8 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %% Retry Delivery %%-------------------------------------------------------------------- --spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). +-spec(retry(emqx_types:clientinfo(), session()) -> + {ok, session()} | {ok, replies(), timeout(), session()}). retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; @@ -680,13 +684,23 @@ inc_delivery_expired_cnt() -> inc_delivery_expired_cnt(1). inc_delivery_expired_cnt(N) -> + ok = inc_pd('send_msg.dropped', N), + ok = inc_pd('send_msg.dropped.expired', N), ok = emqx_metrics:inc('delivery.dropped', N), emqx_metrics:inc('delivery.dropped.expired', N). inc_await_pubrel_timeout(N) -> + ok = inc_pd('recv_msg.dropped', N), + ok = inc_pd('recv_msg.dropped.expired', N), ok = emqx_metrics:inc('messages.dropped', N), emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N). +inc_pd(Key) -> + inc_pd(Key, 1). +inc_pd(Key, Inc) -> + _ = emqx_pd:inc_counter(Key, Inc), + ok. + %%-------------------------------------------------------------------- %% Next Packet Id %%--------------------------------------------------------------------