diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 3f9fd1610..eac258bcf 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -138,12 +138,6 @@ %% Stats timer stats_timer :: reference() | undefined, - %% Deliver stats - deliver_stats = 0, - - %% Enqueue stats - enqueue_stats = 0, - %% GC State gc_state, @@ -224,9 +218,7 @@ stats(#state{max_subscriptions = MaxSubscriptions, inflight = Inflight, mqueue = MQueue, max_awaiting_rel = MaxAwaitingRel, - awaiting_rel = AwaitingRel, - deliver_stats = DeliverMsg, - enqueue_stats = EnqueueMsg}) -> + awaiting_rel = AwaitingRel}) -> lists:append(emqx_misc:proc_stats(), [{max_subscriptions, MaxSubscriptions}, {subscriptions_count, maps:size(Subscriptions)}, @@ -237,8 +229,8 @@ stats(#state{max_subscriptions = MaxSubscriptions, {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, {max_awaiting_rel, MaxAwaitingRel}, {awaiting_rel_len, maps:size(AwaitingRel)}, - {deliver_msg, DeliverMsg}, - {enqueue_msg, EnqueueMsg}]). + {deliver_msg, emqx_pd:get_counter(deliver_stats)}, + {enqueue_msg, emqx_pd:get_counter(enqueue_stats)}]). %%------------------------------------------------------------------------------ %% PubSub API @@ -366,8 +358,6 @@ init([Parent, #{zone := Zone, max_awaiting_rel = get_env(Zone, max_awaiting_rel), expiry_interval = ExpiryInterval, enable_stats = get_env(Zone, enable_stats, true), - deliver_stats = 0, - enqueue_stats = 0, gc_state = emqx_gc:init(GcPolicy), created_at = os:timestamp(), will_msg = WillMsg @@ -593,7 +583,8 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) -> ok = emqx_shared_sub:nack_no_connection(Msg), {noreply, State}; false -> - noreply(ensure_stats_timer(handle_dispatch(Topic, Msg, State))) + NewState = handle_dispatch(Topic, Msg, State), + noreply(ensure_stats_timer(maybe_gc({1, msg_size(Msg)}, NewState))) end; %% Do nothing if the client has been disconnected. @@ -834,30 +825,31 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> %% Enqueue message if the client has been disconnected dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of - ok -> enqueue_msg(Msg, State); + ok -> enqueue_msg(Msg, State); stop -> State end; %% Deliver qos0 message directly to client dispatch(Msg = #message{qos = ?QOS_0} = Msg, State) -> - deliver(undefined, Msg, State), - inc_stats(deliver, Msg, State); + ok = deliver(undefined, Msg, State), + State; dispatch(Msg = #message{qos = QoS} = Msg, State = #state{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> - deliver(PacketId, Msg, State), - await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State))) + ok = deliver(PacketId, Msg, State), + await(PacketId, Msg, next_pkt_id(State)) end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> + emqx_pd:update_counter(enqueue_stats, 1), {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), Dropped =/= undefined andalso emqx_shared_sub:maybe_nack_dropped(Dropped), - inc_stats(enqueue, Msg, State#state{mqueue = NewQ}). + State#state{mqueue = NewQ}. %%------------------------------------------------------------------------------ %% Deliver @@ -872,6 +864,7 @@ redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> ConnPid ! {deliver, {pubrel, PacketId}}. deliver(PacketId, Msg, State) -> + emqx_pd:update_counter(deliver_stats, 1), %% Ack QoS1/QoS2 messages when message is delivered to connection. %% NOTE: NOT to wait for PUBACK because: %% The sender is monitoring this session process, @@ -882,7 +875,7 @@ deliver(PacketId, Msg, State) -> do_deliver(PacketId, emqx_shared_sub:maybe_ack(Msg), State). do_deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) -> - ConnPid ! {deliver, {publish, PacketId, Msg}}; + ConnPid ! {deliver, {publish, PacketId, Msg}}, ok; do_deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) -> emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]). @@ -994,21 +987,21 @@ next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) -> next_pkt_id(State = #state{next_pkt_id = Id}) -> State#state{next_pkt_id = Id + 1}. -%%------------------------------------------------------------------------------ -%% Inc stats - -inc_stats(deliver, Msg, State = #state{deliver_stats = I}) -> - State1 = maybe_gc({1, msg_size(Msg)}, State), - State1#state{deliver_stats = I + 1}; -inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) -> - State#state{enqueue_stats = I + 1}. - %% Take only the payload size into account, add other fields if necessary msg_size(#message{payload = Payload}) -> payload_size(Payload). %% Payload should be binary(), but not 100% sure. Need dialyzer! payload_size(Payload) -> erlang:iolist_size(Payload). +%%------------------------------------------------------------------------------ +%% Maybe GC + +maybe_gc(_, State = #state{gc_state = undefined}) -> + State; +maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> + {_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt), + State#state{gc_state = GCSt1}. + %%------------------------------------------------------------------------------ %% Helper functions @@ -1024,9 +1017,3 @@ noreply(State) -> shutdown(Reason, State) -> {stop, Reason, State}. -maybe_gc(_, State = #state{gc_state = undefined}) -> - State; -maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> - {_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt), - State#state{gc_state = GCSt1}. -