Alter apis provided by emqx_metrics, and use existing timer to commit metrics
This commit is contained in:
parent
194dbc02c8
commit
ea62b15c87
|
@ -479,15 +479,6 @@ mqtt.shared_subscription = true
|
||||||
## Value: true | false
|
## Value: true | false
|
||||||
mqtt.ignore_loop_deliver = false
|
mqtt.ignore_loop_deliver = false
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Metric
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
|
|
||||||
## Commit interval for metric
|
|
||||||
##
|
|
||||||
## Value: Duration
|
|
||||||
metric.commit_interval = 10s
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Zones
|
## Zones
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
|
|
@ -616,15 +616,6 @@ end}.
|
||||||
{datatype, {enum, [true, false]}}
|
{datatype, {enum, [true, false]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Metirc
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% @doc Commit interval for metric
|
|
||||||
{mapping, "metric.commit_interval", "emqx.metric_commit_interval", [
|
|
||||||
{default, "10s"},
|
|
||||||
{datatype, {duration, ms}}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Zones
|
%% Zones
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -321,8 +321,6 @@ topics() -> emqx_router:topics().
|
||||||
|
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval, MetricCommitInterval div 2, {metric_commit, MetricCommitInterval}),
|
|
||||||
{ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}.
|
{ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -373,11 +371,6 @@ handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{submap = Su
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
|
|
||||||
emqx_metrics:commit(),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
|
emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
|
@ -154,10 +154,6 @@ init([Transport, RawSocket, Options]) ->
|
||||||
ok = emqx_misc:init_proc_mng_policy(Zone),
|
ok = emqx_misc:init_proc_mng_policy(Zone),
|
||||||
|
|
||||||
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
||||||
MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval,
|
|
||||||
MetricCommitInterval div 2,
|
|
||||||
{metric_commit, MetricCommitInterval}),
|
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
|
||||||
State, self(), IdleTimout);
|
State, self(), IdleTimout);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -174,7 +170,7 @@ send_fun(Transport, Socket) ->
|
||||||
Data = emqx_frame:serialize(Packet, Options),
|
Data = emqx_frame:serialize(Packet, Options),
|
||||||
try Transport:async_send(Socket, Data) of
|
try Transport:async_send(Socket, Data) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_metrics:inc('bytes/sent', iolist_size(Data)),
|
emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
|
||||||
ok;
|
ok;
|
||||||
Error -> Error
|
Error -> Error
|
||||||
catch
|
catch
|
||||||
|
@ -219,6 +215,7 @@ handle_info({timeout, Timer, emit_stats},
|
||||||
State = #state{stats_timer = Timer,
|
State = #state{stats_timer = Timer,
|
||||||
proto_state = ProtoState
|
proto_state = ProtoState
|
||||||
}) ->
|
}) ->
|
||||||
|
emqx_metrics:commit(),
|
||||||
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
|
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
|
||||||
NewState = State#state{stats_timer = undefined},
|
NewState = State#state{stats_timer = undefined},
|
||||||
Limits = erlang:get(force_shutdown_policy),
|
Limits = erlang:get(force_shutdown_policy),
|
||||||
|
@ -232,10 +229,6 @@ handle_info({timeout, Timer, emit_stats},
|
||||||
?LOG(warning, "shutdown due to ~p", [Reason]),
|
?LOG(warning, "shutdown due to ~p", [Reason]),
|
||||||
shutdown(Reason, NewState)
|
shutdown(Reason, NewState)
|
||||||
end;
|
end;
|
||||||
handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
|
|
||||||
emqx_metrics:commit(),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
|
|
||||||
{noreply, State};
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
shutdown(idle_timeout, State);
|
shutdown(idle_timeout, State);
|
||||||
|
|
||||||
|
@ -256,7 +249,7 @@ handle_info(activate_sock, State) ->
|
||||||
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
|
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
|
||||||
?LOG(debug, "RECV ~p", [Data]),
|
?LOG(debug, "RECV ~p", [Data]),
|
||||||
Size = iolist_size(Data),
|
Size = iolist_size(Data),
|
||||||
emqx_metrics:inc('bytes/received', Size),
|
emqx_metrics:trans(inc, 'bytes/received', Size),
|
||||||
Incoming = #{bytes => Size, packets => 0},
|
Incoming = #{bytes => Size, packets => 0},
|
||||||
handle_packet(Data, State#state{await_recv = false, incoming = Incoming});
|
handle_packet(Data, State#state{await_recv = false, incoming = Incoming});
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([new/1, all/0]).
|
-export([new/1, all/0]).
|
||||||
-export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2, commit/0]).
|
-export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]).
|
||||||
-export([start_timer/2, start_timer/3]).
|
-export([trans/2, trans/3, trans/4, commit/0]).
|
||||||
%% Received/sent metrics
|
%% Received/sent metrics
|
||||||
-export([received/1, sent/1]).
|
-export([received/1, sent/1]).
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ inc(Metric, Val) when is_atom(Metric) ->
|
||||||
%% @doc Increase metric value
|
%% @doc Increase metric value
|
||||||
-spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
|
-spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
|
||||||
inc(Type, Metric, Val) ->
|
inc(Type, Metric, Val) ->
|
||||||
hold(Type, Metric, Val).
|
update_counter(key(Type, Metric), {2, Val}).
|
||||||
|
|
||||||
%% @doc Decrease metric value
|
%% @doc Decrease metric value
|
||||||
-spec(dec(gauge, atom()) -> integer()).
|
-spec(dec(gauge, atom()) -> integer()).
|
||||||
|
@ -145,7 +145,7 @@ dec(gauge, Metric) ->
|
||||||
%% @doc Decrease metric value
|
%% @doc Decrease metric value
|
||||||
-spec(dec(gauge, atom(), pos_integer()) -> integer()).
|
-spec(dec(gauge, atom(), pos_integer()) -> integer()).
|
||||||
dec(gauge, Metric, Val) ->
|
dec(gauge, Metric, Val) ->
|
||||||
hold(gauge, Metric, -Val).
|
update_counter(key(gauge, Metric), {2, -Val}).
|
||||||
|
|
||||||
%% @doc Set metric value
|
%% @doc Set metric value
|
||||||
set(Metric, Val) when is_atom(Metric) ->
|
set(Metric, Val) when is_atom(Metric) ->
|
||||||
|
@ -153,7 +153,24 @@ set(Metric, Val) when is_atom(Metric) ->
|
||||||
set(gauge, Metric, Val) ->
|
set(gauge, Metric, Val) ->
|
||||||
ets:insert(?TAB, {key(gauge, Metric), Val}).
|
ets:insert(?TAB, {key(gauge, Metric), Val}).
|
||||||
|
|
||||||
% -spec(hold(counter | gauge, atom(), inc_dec | assign, integer()) -> integer()).
|
trans(inc, Metric) ->
|
||||||
|
trans(inc, {counter, Metric}, 1).
|
||||||
|
|
||||||
|
trans(Opt, {gauge, Metric}, Val) ->
|
||||||
|
trans(Opt, gauge, Metric, Val);
|
||||||
|
trans(inc, {counter, Metric}, Val) ->
|
||||||
|
trans(inc, counter, Metric, Val);
|
||||||
|
trans(inc, Metric, Val) when is_atom(Metric) ->
|
||||||
|
trans(inc, counter, Metric, Val);
|
||||||
|
trans(dec, gauge, Metric) ->
|
||||||
|
trans(dec, gauge, Metric, 1).
|
||||||
|
|
||||||
|
trans(inc, Type, Metric, Val) ->
|
||||||
|
hold(Type, Metric, Val);
|
||||||
|
trans(dec, gauge, Metric, Val) ->
|
||||||
|
hold(gauge, Metric, -Val).
|
||||||
|
|
||||||
|
% -spec(hold(counter | gauge, atom(), integer()) -> integer()).
|
||||||
hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge ->
|
hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge ->
|
||||||
NewMetrics = case get(metrics) of
|
NewMetrics = case get(metrics) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -176,21 +193,6 @@ commit() ->
|
||||||
put(metrics, #{})
|
put(metrics, #{})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(start_timer(integer(), term()) -> reference() | undefined).
|
|
||||||
start_timer(Interval, Msg) ->
|
|
||||||
start_timer(Interval, 0, Msg).
|
|
||||||
|
|
||||||
-spec(start_timer(integer(), integer(), term()) -> reference() | undefined).
|
|
||||||
start_timer(Interval, MaxJitter, Msg) when Interval > 0 ->
|
|
||||||
emqx_misc:start_timer((Interval + case MaxJitter >= 1 of
|
|
||||||
true ->
|
|
||||||
rand:uniform(MaxJitter);
|
|
||||||
false ->
|
|
||||||
0
|
|
||||||
end), Msg);
|
|
||||||
start_timer(_Interval, _Jitter, _Msg) ->
|
|
||||||
undefined.
|
|
||||||
|
|
||||||
%% @doc Metric key
|
%% @doc Metric key
|
||||||
key(gauge, Metric) ->
|
key(gauge, Metric) ->
|
||||||
{Metric, 0};
|
{Metric, 0};
|
||||||
|
|
|
@ -377,10 +377,6 @@ init([Parent, #{zone := Zone,
|
||||||
ok = emqx_gc:init(GcPolicy),
|
ok = emqx_gc:init(GcPolicy),
|
||||||
ok = emqx_misc:init_proc_mng_policy(Zone),
|
ok = emqx_misc:init_proc_mng_policy(Zone),
|
||||||
ok = proc_lib:init_ack(Parent, {ok, self()}),
|
ok = proc_lib:init_ack(Parent, {ok, self()}),
|
||||||
MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval,
|
|
||||||
MetricCommitInterval div 2,
|
|
||||||
{metric_commit, MetricCommitInterval}),
|
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
||||||
|
|
||||||
init_mqueue(Zone) ->
|
init_mqueue(Zone) ->
|
||||||
|
@ -425,7 +421,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
|
||||||
{ok, ensure_await_rel_timer(State1)}
|
{ok, ensure_await_rel_timer(State1)}
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
emqx_metrics:inc('messages/qos2/dropped'),
|
emqx_metrics:trans(inc, 'messages/qos2/dropped'),
|
||||||
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
|
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
|
||||||
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
||||||
end);
|
end);
|
||||||
|
@ -436,7 +432,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
|
||||||
true ->
|
true ->
|
||||||
{ok, acked(pubrec, PacketId, State)};
|
{ok, acked(pubrec, PacketId, State)};
|
||||||
false ->
|
false ->
|
||||||
emqx_metrics:inc('packets/pubrec/missed'),
|
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
|
||||||
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
|
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
|
||||||
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
||||||
end);
|
end);
|
||||||
|
@ -447,7 +443,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
|
||||||
{_Ts, AwaitingRel1} ->
|
{_Ts, AwaitingRel1} ->
|
||||||
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
||||||
error ->
|
error ->
|
||||||
emqx_metrics:inc('packets/pubrel/missed'),
|
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
|
||||||
?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State),
|
?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State),
|
||||||
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
||||||
end);
|
end);
|
||||||
|
@ -506,7 +502,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
|
||||||
noreply(dequeue(acked(puback, PacketId, State)));
|
noreply(dequeue(acked(puback, PacketId, State)));
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
|
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:inc('packets/puback/missed'),
|
emqx_metrics:trans(inc, 'packets/puback/missed'),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -517,7 +513,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
|
||||||
noreply(dequeue(acked(pubcomp, PacketId, State)));
|
noreply(dequeue(acked(pubcomp, PacketId, State)));
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
|
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:inc('packets/pubcomp/missed'),
|
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -607,6 +603,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer
|
||||||
handle_info({timeout, Timer, emit_stats},
|
handle_info({timeout, Timer, emit_stats},
|
||||||
State = #state{client_id = ClientId,
|
State = #state{client_id = ClientId,
|
||||||
stats_timer = Timer}) ->
|
stats_timer = Timer}) ->
|
||||||
|
emqx_metrics:commit(),
|
||||||
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
|
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
NewState = State#state{stats_timer = undefined},
|
NewState = State#state{stats_timer = undefined},
|
||||||
Limits = erlang:get(force_shutdown_policy),
|
Limits = erlang:get(force_shutdown_policy),
|
||||||
|
@ -628,11 +625,6 @@ handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, wil
|
||||||
send_willmsg(WillMsg),
|
send_willmsg(WillMsg),
|
||||||
{noreply, State#state{will_msg = undefined}};
|
{noreply, State#state{will_msg = undefined}};
|
||||||
|
|
||||||
handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
|
|
||||||
emqx_metrics:commit(),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
|
handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
|
||||||
send_willmsg(WillMsg),
|
send_willmsg(WillMsg),
|
||||||
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
|
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
|
||||||
|
@ -743,7 +735,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||||
{publish, {PacketId, Msg}} ->
|
{publish, {PacketId, Msg}} ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
true ->
|
true ->
|
||||||
emqx_metrics:inc('messages/expired'),
|
emqx_metrics:trans(inc, 'messages/expired'),
|
||||||
emqx_inflight:delete(PacketId, Inflight);
|
emqx_inflight:delete(PacketId, Inflight);
|
||||||
false ->
|
false ->
|
||||||
redeliver({PacketId, Msg}, State),
|
redeliver({PacketId, Msg}, State),
|
||||||
|
@ -783,7 +775,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
|
||||||
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
||||||
case (timer:now_diff(Now, Ts) div 1000) of
|
case (timer:now_diff(Now, Ts) div 1000) of
|
||||||
Age when Age >= Timeout ->
|
Age when Age >= Timeout ->
|
||||||
emqx_metrics:inc('messages/qos2/expired'),
|
emqx_metrics:trans(inc, 'messages/qos2/expired'),
|
||||||
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
|
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
|
||||||
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
||||||
Age ->
|
Age ->
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
all() -> [t_inc_dec_metrics].
|
all() -> [t_inc_dec_metrics, t_trans].
|
||||||
|
|
||||||
t_inc_dec_metrics(_) ->
|
t_inc_dec_metrics(_) ->
|
||||||
{ok, _} = emqx_metrics:start_link(),
|
{ok, _} = emqx_metrics:start_link(),
|
||||||
|
@ -29,15 +29,27 @@ t_inc_dec_metrics(_) ->
|
||||||
emqx_metrics:inc(counter, 'bytes/received', 2),
|
emqx_metrics:inc(counter, 'bytes/received', 2),
|
||||||
emqx_metrics:inc({gauge, 'messages/retained'}, 2),
|
emqx_metrics:inc({gauge, 'messages/retained'}, 2),
|
||||||
emqx_metrics:inc(gauge, 'messages/retained', 2),
|
emqx_metrics:inc(gauge, 'messages/retained', 2),
|
||||||
emqx_metrics:commit(),
|
|
||||||
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
|
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
|
||||||
emqx_metrics:dec(gauge, 'messages/retained'),
|
emqx_metrics:dec(gauge, 'messages/retained'),
|
||||||
emqx_metrics:dec(gauge, 'messages/retained', 1),
|
emqx_metrics:dec(gauge, 'messages/retained', 1),
|
||||||
emqx_metrics:commit(),
|
|
||||||
2 = emqx_metrics:val('messages/retained'),
|
2 = emqx_metrics:val('messages/retained'),
|
||||||
emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
|
emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
|
||||||
emqx_metrics:commit(),
|
|
||||||
{1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
|
{1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
|
||||||
emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
|
emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
|
||||||
emqx_metrics:commit(),
|
|
||||||
{1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.
|
{1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.
|
||||||
|
|
||||||
|
t_trans(_) ->
|
||||||
|
{ok, _} = emqx_metrics:start_link(),
|
||||||
|
emqx_metrics:trans(inc, 'bytes/received'),
|
||||||
|
emqx_metrics:trans(inc, {counter, 'bytes/received'}, 2),
|
||||||
|
emqx_metrics:trans(inc, counter, 'bytes/received', 2),
|
||||||
|
emqx_metrics:trans(inc, {gauge, 'messages/retained'}, 2),
|
||||||
|
emqx_metrics:trans(inc, gauge, 'messages/retained', 2),
|
||||||
|
{0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
|
||||||
|
emqx_metrics:commit(),
|
||||||
|
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
|
||||||
|
emqx_metrics:trans(dec, gauge, 'messages/retained'),
|
||||||
|
emqx_metrics:trans(dec, gauge, 'messages/retained', 1),
|
||||||
|
4 = emqx_metrics:val('messages/retained'),
|
||||||
|
emqx_metrics:commit(),
|
||||||
|
2 = emqx_metrics:val('messages/retained').
|
||||||
|
|
|
@ -45,6 +45,7 @@ ignore_loop(_Config) ->
|
||||||
application:set_env(emqx, mqtt_ignore_loop_deliver, false).
|
application:set_env(emqx, mqtt_ignore_loop_deliver, false).
|
||||||
|
|
||||||
t_session_all(_) ->
|
t_session_all(_) ->
|
||||||
|
emqx_zone:set_env(internal, idle_timeout, 100),
|
||||||
application:set_env(emqx, metric_commit_interval, 10),
|
application:set_env(emqx, metric_commit_interval, 10),
|
||||||
ClientId = <<"ClientId">>,
|
ClientId = <<"ClientId">>,
|
||||||
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
|
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
|
||||||
|
|
Loading…
Reference in New Issue