Inc deliver_stats, enqueue_stats with emqx_pd:update_counter/2 (#2100)

This commit is contained in:
Feng Lee 2018-12-21 18:31:04 +08:00 committed by turtleDeng
parent 6f4d517350
commit bb45825e77
1 changed files with 24 additions and 37 deletions

View File

@ -138,12 +138,6 @@
%% Stats timer
stats_timer :: reference() | undefined,
%% Deliver stats
deliver_stats = 0,
%% Enqueue stats
enqueue_stats = 0,
%% GC State
gc_state,
@ -224,9 +218,7 @@ stats(#state{max_subscriptions = MaxSubscriptions,
inflight = Inflight,
mqueue = MQueue,
max_awaiting_rel = MaxAwaitingRel,
awaiting_rel = AwaitingRel,
deliver_stats = DeliverMsg,
enqueue_stats = EnqueueMsg}) ->
awaiting_rel = AwaitingRel}) ->
lists:append(emqx_misc:proc_stats(),
[{max_subscriptions, MaxSubscriptions},
{subscriptions_count, maps:size(Subscriptions)},
@ -237,8 +229,8 @@ stats(#state{max_subscriptions = MaxSubscriptions,
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)},
{deliver_msg, DeliverMsg},
{enqueue_msg, EnqueueMsg}]).
{deliver_msg, emqx_pd:get_counter(deliver_stats)},
{enqueue_msg, emqx_pd:get_counter(enqueue_stats)}]).
%%------------------------------------------------------------------------------
%% PubSub API
@ -366,8 +358,6 @@ init([Parent, #{zone := Zone,
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = ExpiryInterval,
enable_stats = get_env(Zone, enable_stats, true),
deliver_stats = 0,
enqueue_stats = 0,
gc_state = emqx_gc:init(GcPolicy),
created_at = os:timestamp(),
will_msg = WillMsg
@ -593,7 +583,8 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) ->
ok = emqx_shared_sub:nack_no_connection(Msg),
{noreply, State};
false ->
noreply(ensure_stats_timer(handle_dispatch(Topic, Msg, State)))
NewState = handle_dispatch(Topic, Msg, State),
noreply(ensure_stats_timer(maybe_gc({1, msg_size(Msg)}, NewState)))
end;
%% Do nothing if the client has been disconnected.
@ -834,30 +825,31 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
%% Enqueue message if the client has been disconnected
dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of
ok -> enqueue_msg(Msg, State);
ok -> enqueue_msg(Msg, State);
stop -> State
end;
%% Deliver qos0 message directly to client
dispatch(Msg = #message{qos = ?QOS_0} = Msg, State) ->
deliver(undefined, Msg, State),
inc_stats(deliver, Msg, State);
ok = deliver(undefined, Msg, State),
State;
dispatch(Msg = #message{qos = QoS} = Msg,
State = #state{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true ->
enqueue_msg(Msg, State);
false ->
deliver(PacketId, Msg, State),
await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State)))
ok = deliver(PacketId, Msg, State),
await(PacketId, Msg, next_pkt_id(State))
end.
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
emqx_pd:update_counter(enqueue_stats, 1),
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
Dropped =/= undefined andalso emqx_shared_sub:maybe_nack_dropped(Dropped),
inc_stats(enqueue, Msg, State#state{mqueue = NewQ}).
State#state{mqueue = NewQ}.
%%------------------------------------------------------------------------------
%% Deliver
@ -872,6 +864,7 @@ redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) ->
ConnPid ! {deliver, {pubrel, PacketId}}.
deliver(PacketId, Msg, State) ->
emqx_pd:update_counter(deliver_stats, 1),
%% Ack QoS1/QoS2 messages when message is delivered to connection.
%% NOTE: NOT to wait for PUBACK because:
%% The sender is monitoring this session process,
@ -882,7 +875,7 @@ deliver(PacketId, Msg, State) ->
do_deliver(PacketId, emqx_shared_sub:maybe_ack(Msg), State).
do_deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) ->
ConnPid ! {deliver, {publish, PacketId, Msg}};
ConnPid ! {deliver, {publish, PacketId, Msg}}, ok;
do_deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) ->
emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]).
@ -994,21 +987,21 @@ next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) ->
next_pkt_id(State = #state{next_pkt_id = Id}) ->
State#state{next_pkt_id = Id + 1}.
%%------------------------------------------------------------------------------
%% Inc stats
inc_stats(deliver, Msg, State = #state{deliver_stats = I}) ->
State1 = maybe_gc({1, msg_size(Msg)}, State),
State1#state{deliver_stats = I + 1};
inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) ->
State#state{enqueue_stats = I + 1}.
%% Take only the payload size into account, add other fields if necessary
msg_size(#message{payload = Payload}) -> payload_size(Payload).
%% Payload should be binary(), but not 100% sure. Need dialyzer!
payload_size(Payload) -> erlang:iolist_size(Payload).
%%------------------------------------------------------------------------------
%% Maybe GC
maybe_gc(_, State = #state{gc_state = undefined}) ->
State;
maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) ->
{_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt),
State#state{gc_state = GCSt1}.
%%------------------------------------------------------------------------------
%% Helper functions
@ -1024,9 +1017,3 @@ noreply(State) ->
shutdown(Reason, State) ->
{stop, Reason, State}.
maybe_gc(_, State = #state{gc_state = undefined}) ->
State;
maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) ->
{_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt),
State#state{gc_state = GCSt1}.