diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 038ebca19..3f8dfdb8d 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -36,7 +36,6 @@ authenticate(ClientInfo = #{zone := Zone}) -> case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of Result = #{auth_result := success, anonymous := true} -> - emqx_metrics:inc('auth.mqtt.anonymous'), {ok, Result}; Result = #{auth_result := success} -> {ok, Result}; diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 7950f2bca..850b466d3 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -160,9 +160,9 @@ do_subscribe(Group, Topic, SubPid, SubOpts) -> true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), emqx_shared_sub:subscribe(Group, Topic, SubPid). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Unsubscribe API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(unsubscribe(emqx_topic:topic()) -> ok). unsubscribe(Topic) when is_binary(Topic) -> @@ -198,43 +198,46 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> -spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), - Msg1 = emqx_message:set_header(allow_publish, true, - emqx_message:clean_dup(Msg)), - case emqx_hooks:run_fold('message.publish', [], Msg1) of + emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), + case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> - ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg1)]), + ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]), []; - #message{topic = Topic} = Msg2 -> - route(aggre(emqx_router:match_routes(Topic)), delivery(Msg2)) + Msg1 = #message{topic = Topic} -> + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. %% Called internally --spec(safe_publish(emqx_types:message()) -> ok | emqx_types:publish_result()). +-spec(safe_publish(emqx_types:message()) -> emqx_types:publish_result()). safe_publish(Msg) when is_record(Msg, message) -> try publish(Msg) catch - _:Error:Stacktrace -> - ?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) + _:Error:Stk-> + ?LOG(error, "Publish error: ~p~n~s~n~p", + [Error, emqx_message:format(Msg), Stk]) after - ok + [] end. -delivery(Msg) -> - #delivery{sender = self(), message = Msg}. +-compile({inline, [delivery/1]}). +delivery(Msg) -> #delivery{sender = self(), message = Msg}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Route -%%------------------------------------------------------------------------------ --spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()). +%%-------------------------------------------------------------------- + +-spec(route([emqx_types:route_entry()], emqx_types:delivery()) + -> emqx_types:publish_result()). route([], #delivery{message = Msg}) -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Msg#message.topic), + ok = inc_dropped_cnt(Msg), []; + route(Routes, Delivery) -> lists:foldl(fun(Route, Acc) -> - [do_route(Route, Delivery) | Acc] - end, [], Routes). + [do_route(Route, Delivery) | Acc] + end, [], Routes). do_route({To, Node}, Delivery) when Node =:= node() -> {Node, To, dispatch(To, Delivery)}; @@ -243,8 +246,7 @@ do_route({To, Node}, Delivery) when is_atom(Node) -> do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. -aggre([]) -> - []; +aggre([]) -> []; aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> [{To, Node}]; aggre([#route{topic = To, dest = {Group, _Node}}]) -> @@ -258,41 +260,40 @@ aggre(Routes) -> end, [], Routes). %% @doc Forward message to another node. --spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) +-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync|async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of - true -> ok; + true -> emqx_metrics:inc('messages.forward'); {badrpc, Reason} -> - ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), + ?LOG(error, "Ansync forward msg to ~s failed due to ~p", [Node, Reason]), {error, badrpc} end; forward(Node, To, Delivery, sync) -> case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), + ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]), {error, badrpc}; - Result -> Result + Result -> + emqx_metrics:inc('messages.forward'), Result end. -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of - [] -> - emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Topic), - {error, no_subscribers}; + [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), + _ = inc_dropped_cnt(Topic), + {error, no_subscribers}; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg); - Subs -> - lists:foldl( - fun(Sub, Res) -> - case dispatch(Sub, Topic, Msg) of - ok -> Res; - Err -> Err - end - end, ok, Subs) + Subs -> lists:foldl( + fun(Sub, Res) -> + case dispatch(Sub, Topic, Msg) of + ok -> Res; + Err -> Err + end + end, ok, Subs) end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> @@ -302,6 +303,7 @@ dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> ok; false -> {error, subscriber_die} end; + dispatch({shard, I}, Topic, Msg) -> lists:foldl( fun(SubPid, Res) -> @@ -311,20 +313,24 @@ dispatch({shard, I}, Topic, Msg) -> end end, ok, subscribers({shard, Topic, I})). -inc_dropped_cnt(<<"$SYS/", _/binary>>) -> - ok; -inc_dropped_cnt(_Topic) -> - emqx_metrics:inc('messages.dropped'). +-compile({inline, [inc_dropped_cnt/1]}). +inc_dropped_cnt(Msg) -> + case emqx_message:is_sys(Msg) of + true -> ok; + false -> ok = emqx_metrics:inc('messages.dropped'), + emqx_metrics:inc('messages.dropped.no_subscribers') + end. +-compile({inline, [subscribers/1]}). -spec(subscribers(emqx_topic:topic()) -> [pid()]). subscribers(Topic) when is_binary(Topic) -> lookup_value(?SUBSCRIBER, Topic, []); subscribers(Shard = {shard, _Topic, _I}) -> lookup_value(?SUBSCRIBER, Shard, []). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Subscriber is down -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(subscriber_down(pid()) -> true). subscriber_down(SubPid) -> @@ -345,9 +351,9 @@ subscriber_down(SubPid) -> end, lookup_value(?SUBSCRIPTION, SubPid, [])), ets:delete(?SUBSCRIPTION, SubPid). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Management APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(subscriptions(pid() | emqx_types:subid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). @@ -410,9 +416,11 @@ safe_update_stats(Tab, Stat, MaxStat) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% call, cast, pick -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- + +-compile({inline, [call/2, cast/2, pick/1]}). call(Broker, Req) -> gen_server:call(Broker, Req). @@ -424,9 +432,9 @@ cast(Broker, Msg) -> pick(Topic) -> gproc_pool:pick_worker(broker_pool, Topic). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), @@ -490,7 +498,7 @@ terminate(_Reason, #{pool := Pool, id := Id}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 6e85acf26..7d2bf8f23 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -223,30 +223,30 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), - Channel = #channel{clientinfo = ClientInfo, session = Session}) -> +handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel + = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:puback(PacketId, Session) of {ok, Msg, NSession} -> - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + ok = after_message_acked(ClientInfo, Msg), {ok, Channel#channel{session = NSession}}; {ok, Msg, Publishes, NSession} -> - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + ok = after_message_acked(ClientInfo, Msg), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.puback.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), {ok, Channel} end; -handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), - Channel = #channel{clientinfo = ClientInfo, session = Session}) -> +handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel + = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + ok = after_message_acked(ClientInfo, Msg), NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -265,8 +265,8 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> + ?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.pubrel.missed'), - ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), handle_out(pubcomp, {PacketId, RC}, Channel) end; @@ -346,8 +346,7 @@ handle_in(Packet, Channel) -> %%-------------------------------------------------------------------- process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, - Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> + Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, @@ -378,17 +377,17 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) -> fun check_pub_caps/2 ], Packet, Channel) of {ok, NPacket, NChannel} -> - Msg = packet_to_msg(NPacket, NChannel), + Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); {error, ReasonCode, NChannel} -> - ?LOG(warning, "Cannot publish message to ~s due to ~s", + ?LOG(warning, "Cannot publish message to ~s due to ~s.", [Topic, emqx_reason_codes:text(ReasonCode)]), handle_out(disconnect, ReasonCode, NChannel) end. -packet_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, - clientinfo = ClientInfo = - #{mountpoint := MountPoint}}) -> +packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, + clientinfo = ClientInfo = + #{mountpoint := MountPoint}}) -> emqx_mountpoint:mount( MountPoint, emqx_packet:to_message( ClientInfo, #{proto_ver => ProtoVer}, Packet)). @@ -414,8 +413,9 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full.", [PacketId]), - ok = emqx_metrics:inc('messages.qos2.dropped'), + ?LOG(warning, "Dropped the qos2 packet ~w " + "due to awaiting_rel is full.", [PacketId]), + ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) end. @@ -423,6 +423,11 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback_reason_code([_|_]) -> ?RC_SUCCESS. +-compile({inline, [after_message_acked/2]}). +after_message_acked(ClientInfo, Msg) -> + ok = emqx_metrics:inc('messages.acked'), + emqx_hooks:run('message.acked', [ClientInfo, Msg]). + %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- @@ -569,17 +574,8 @@ handle_out(connack, {ReasonCode, _ConnPkt}, handle_out(publish, [], Channel) -> {ok, Channel}; -handle_out(publish, [{pubrel, PacketId}], Channel) -> - handle_out(pubrel, {PacketId, ?RC_SUCCESS}, Channel); - -handle_out(publish, [Pub = {_PacketId, Msg}], Channel) -> - case ignore_local(Msg, Channel) of - true -> {ok, Channel}; - false -> {ok, pub_to_packet(Pub, Channel), Channel} - end; - handle_out(publish, Publishes, Channel) -> - Packets = handle_publish(Publishes, Channel), + Packets = do_deliver(Publishes, Channel), {ok, {outgoing, Packets}, Channel}; handle_out(puback, {PacketId, ReasonCode}, Channel) -> @@ -641,45 +637,50 @@ return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo, resuming = false, pendings = [] }, - Packets = handle_publish(Publishes, NChannel), + Packets = do_deliver(Publishes, NChannel), Outgoing = [{outgoing, Packets} || length(Packets) > 0], {ok, Replies ++ Outgoing, NChannel} end. %%-------------------------------------------------------------------- -%% Handle out Publish +%% Deliver publish: broker -> client %%-------------------------------------------------------------------- --compile({inline, [handle_publish/2]}). -handle_publish(Publishes, Channel) -> - handle_publish(Publishes, [], Channel). +%% return list(emqx_types:packet()) +do_deliver({pubrel, PacketId}, _Channel) -> + [?PUBREL_PACKET(PacketId, ?RC_SUCCESS)]; -%% Handle out publish -handle_publish([], Acc, _Channel) -> - lists:reverse(Acc); - -handle_publish([{pubrel, PacketId}|More], Acc, Channel) -> - Packet = ?PUBREL_PACKET(PacketId, ?RC_SUCCESS), - handle_publish(More, [Packet|Acc], Channel); - -handle_publish([Pub = {_PacketId, Msg}|More], Acc, Channel) -> - case ignore_local(Msg, Channel) of - true -> handle_publish(More, Acc, Channel); +do_deliver({PacketId, Msg}, #channel{clientinfo = ClientInfo = + #{mountpoint := MountPoint}}) -> + case ignore_local(Msg, ClientInfo) of + true -> + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'), + []; false -> - Packet = pub_to_packet(Pub, Channel), - handle_publish(More, [Packet|Acc], Channel) - end. + ok = emqx_metrics:inc('messages.delivered'), + Msg1 = emqx_hooks:run_fold('message.delivered', + [ClientInfo], + emqx_message:update_expiry(Msg) + ), + Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), + [emqx_message:to_packet(PacketId, Msg2)] + end; -pub_to_packet({PacketId, Msg}, #channel{clientinfo = ClientInfo}) -> - Msg1 = emqx_hooks:run_fold('message.delivered', [ClientInfo], - emqx_message:update_expiry(Msg)), - Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, ClientInfo), Msg1), - emqx_message:to_packet(PacketId, Msg2). +do_deliver([Publish], Channel) -> + do_deliver(Publish, Channel); + +do_deliver(Publishes, Channel) when is_list(Publishes) -> + lists:reverse( + lists:foldl( + fun(Publish, Acc) -> + lists:append(do_deliver(Publish, Channel), Acc) + end, [], Publishes)). ignore_local(#message{flags = #{nl := true}, from = ClientId}, - #channel{clientinfo = #{clientid := ClientId}}) -> + #{clientid := ClientId}) -> true; -ignore_local(_Msg, _Channel) -> false. +ignore_local(_Msg, _ClientInfo) -> false. %%-------------------------------------------------------------------- %% Handle out suback @@ -1010,6 +1011,8 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId, #channel{clientinfo = ClientInfo} = Channel) -> case emqx_access_control:authenticate(ClientInfo#{password => Password}) of {ok, AuthResult} -> + is_anonymous(AuthResult) andalso + emqx_metrics:inc('client.auth.anonymous'), NClientInfo = maps:merge(ClientInfo, AuthResult), {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> @@ -1018,6 +1021,9 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId, {error, emqx_reason_codes:connack_error(Reason)} end. +is_anonymous(#{anonymous := true}) -> true; +is_anonymous(_AuthResult) -> false. + %%-------------------------------------------------------------------- %% Process Topic Alias diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 40ec230e9..774c0ba42 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -560,6 +560,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> case Serialize(Packet) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), + ok = emqx_metrics:inc('delivery.dropped.too_large'), + ok = emqx_metrics:inc('delivery.dropped'), <<>>; Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), @@ -627,8 +629,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> run_gc(Stats, State = #state{gc_state = GcSt}) -> case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of false -> State; - {IsGC, GcSt1} -> - IsGC andalso emqx_metrics:inc('channel.gc'), + {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} end. diff --git a/src/emqx_message.erl b/src/emqx_message.erl index d117036a4..3f0d874dc 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -38,7 +38,8 @@ ]). %% Flags --export([ clean_dup/1 +-export([ is_sys/1 + , clean_dup/1 , get_flag/2 , get_flag/3 , get_flags/1 @@ -112,6 +113,13 @@ payload(#message{payload = Payload}) -> Payload. -spec(timestamp(emqx_types:message()) -> integer()). timestamp(#message{timestamp = TS}) -> TS. +-spec(is_sys(emqx_types:message()) -> boolean()). +is_sys(#message{flags = #{sys := true}}) -> + true; +is_sys(#message{topic = <<"$SYS/", _/binary>>}) -> + true; +is_sys(_Msg) -> false. + -spec(clean_dup(emqx_types:message()) -> emqx_types:message()). clean_dup(Msg = #message{flags = Flags = #{dup := true}}) -> Msg#message{flags = Flags#{dup => false}}; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index d92402caa..76e14fab0 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -62,88 +62,118 @@ -export_type([metric_idx/0]). +-compile({inline, [inc/1, inc/2, dec/1, dec/2]}). +-compile({inline, [inc_recv/1, inc_sent/1]}). + -opaque(metric_idx() :: 1..1024). -type(metric_name() :: atom() | string() | binary()). -define(MAX_SIZE, 1024). --define(RESERVED_IDX, 256). +-define(RESERVED_IDX, 512). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -%% Bytes sent and received of broker --define(BYTES_METRICS, [ - {counter, 'bytes.received'}, % Total bytes received - {counter, 'bytes.sent'} % Total bytes sent -]). +%% Bytes sent and received +-define(BYTES_METRICS, + [{counter, 'bytes.received'}, % Total bytes received + {counter, 'bytes.sent'} % Total bytes sent + ]). -%% Packets sent and received of broker --define(PACKET_METRICS, [ - {counter, 'packets.received'}, % All Packets received - {counter, 'packets.sent'}, % All Packets sent - {counter, 'packets.connect.received'}, % CONNECT Packets received - {counter, 'packets.connack.sent'}, % CONNACK Packets sent - {counter, 'packets.connack.error'}, % CONNACK error sent - {counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent - {counter, 'packets.publish.received'}, % PUBLISH packets received - {counter, 'packets.publish.sent'}, % PUBLISH packets sent - {counter, 'packets.publish.error'}, % PUBLISH failed for error - {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error - {counter, 'packets.puback.received'}, % PUBACK packets received - {counter, 'packets.puback.sent'}, % PUBACK packets sent - {counter, 'packets.puback.missed'}, % PUBACK packets missed - {counter, 'packets.pubrec.received'}, % PUBREC packets received - {counter, 'packets.pubrec.sent'}, % PUBREC packets sent - {counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse - {counter, 'packets.pubrec.missed'}, % PUBREC packets missed - {counter, 'packets.pubrel.received'}, % PUBREL packets received - {counter, 'packets.pubrel.sent'}, % PUBREL packets sent - {counter, 'packets.pubrel.missed'}, % PUBREL packets missed - {counter, 'packets.pubcomp.received'}, % PUBCOMP packets received - {counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent - {counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse - {counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed - {counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received - {counter, 'packets.subscribe.error'}, % SUBSCRIBE error - {counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth - {counter, 'packets.suback.sent'}, % SUBACK packets sent - {counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received - {counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error - {counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent - {counter, 'packets.pingreq.received'}, % PINGREQ packets received - {counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent - {counter, 'packets.disconnect.received'}, % DISCONNECT Packets received - {counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent - {counter, 'packets.auth.received'}, % Auth Packets received - {counter, 'packets.auth.sent'} % Auth Packets sent -]). +%% Packets sent and received +-define(PACKET_METRICS, + [{counter, 'packets.received'}, % All Packets received + {counter, 'packets.sent'}, % All Packets sent + {counter, 'packets.connect.received'}, % CONNECT Packets received + {counter, 'packets.connack.sent'}, % CONNACK Packets sent + {counter, 'packets.connack.error'}, % CONNACK error sent + {counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent + {counter, 'packets.publish.received'}, % PUBLISH packets received + {counter, 'packets.publish.sent'}, % PUBLISH packets sent + {counter, 'packets.publish.error'}, % PUBLISH failed for error + {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error + {counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped + {counter, 'packets.puback.received'}, % PUBACK packets received + {counter, 'packets.puback.sent'}, % PUBACK packets sent + {counter, 'packets.puback.inuse'}, % PUBACK packet_id inuse + {counter, 'packets.puback.missed'}, % PUBACK packets missed + {counter, 'packets.pubrec.received'}, % PUBREC packets received + {counter, 'packets.pubrec.sent'}, % PUBREC packets sent + {counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse + {counter, 'packets.pubrec.missed'}, % PUBREC packets missed + {counter, 'packets.pubrel.received'}, % PUBREL packets received + {counter, 'packets.pubrel.sent'}, % PUBREL packets sent + {counter, 'packets.pubrel.missed'}, % PUBREL packets missed + {counter, 'packets.pubcomp.received'}, % PUBCOMP packets received + {counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent + {counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse + {counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed + {counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received + {counter, 'packets.subscribe.error'}, % SUBSCRIBE error + {counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth + {counter, 'packets.suback.sent'}, % SUBACK packets sent + {counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received + {counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error + {counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent + {counter, 'packets.pingreq.received'}, % PINGREQ packets received + {counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent + {counter, 'packets.disconnect.received'}, % DISCONNECT Packets received + {counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent + {counter, 'packets.auth.received'}, % Auth Packets received + {counter, 'packets.auth.sent'} % Auth Packets sent + ]). -%% Messages sent and received of broker --define(MESSAGE_METRICS, [ - {counter, 'messages.received'}, % All Messages received - {counter, 'messages.sent'}, % All Messages sent - {counter, 'messages.qos0.received'}, % QoS0 Messages received - {counter, 'messages.qos0.sent'}, % QoS0 Messages sent - {counter, 'messages.qos1.received'}, % QoS1 Messages received - {counter, 'messages.qos1.sent'}, % QoS1 Messages sent - {counter, 'messages.qos2.received'}, % QoS2 Messages received - {counter, 'messages.qos2.expired'}, % QoS2 Messages expired - {counter, 'messages.qos2.sent'}, % QoS2 Messages sent - {counter, 'messages.qos2.dropped'}, % QoS2 Messages dropped - {gauge, 'messages.retained'}, % Messagea retained - {counter, 'messages.dropped'}, % Messages dropped - {counter, 'messages.expired'}, % Messages expired - {counter, 'messages.forward'} % Messages forward -]). +%% Messages sent/received and pubsub +-define(MESSAGE_METRICS, + [{counter, 'messages.received'}, % All Messages received + {counter, 'messages.sent'}, % All Messages sent + {counter, 'messages.qos0.received'}, % QoS0 Messages received + {counter, 'messages.qos0.sent'}, % QoS0 Messages sent + {counter, 'messages.qos1.received'}, % QoS1 Messages received + {counter, 'messages.qos1.sent'}, % QoS1 Messages sent + {counter, 'messages.qos2.received'}, % QoS2 Messages received + {counter, 'messages.qos2.sent'}, % QoS2 Messages sent + %% PubSub Metrics + {counter, 'messages.publish'}, % Messages Publish + {counter, 'messages.dropped'}, % Messages dropped due to no subscribers + {counter, 'messages.dropped.expired'}, % QoS2 Messages expired + {counter, 'messages.dropped.no_subscribers'}, % Messages dropped + {counter, 'messages.forward'}, % Messages forward + {gauge, 'messages.retained'}, % Messages retained + {gauge, 'messages.delayed'}, % Messages delayed + {counter, 'messages.delivered'}, % Messages delivered + {counter, 'messages.acked'} % Messages acked + ]). --define(CHAN_METRICS, [ - {counter, 'channel.gc'} -]). +%% Delivery metrics +-define(DELIVERY_METRICS, + [{counter, 'delivery.dropped'}, + {counter, 'delivery.dropped.no_local'}, + {counter, 'delivery.dropped.too_large'}, + {counter, 'delivery.dropped.qos0_msg'}, + {counter, 'delivery.dropped.queue_full'}, + {counter, 'delivery.dropped.expired'} + ]). --define(MQTT_METRICS, [ - {counter, 'auth.mqtt.anonymous'} -]). +%% Client Lifecircle metrics +-define(CLIENT_METRICS, + [{counter, 'client.connected'}, + {cpunter, 'client.authenticate'}, + {counter, 'client.auth.anonymous'}, + {counter, 'client.check_acl'}, + {counter, 'client.subscribe'}, + {counter, 'client.unsubscribe'}, + {counter, 'client.disconnected'} + ]). +%% Session Lifecircle metrics +-define(SESSION_METRICS, + [{counter, 'session.created'}, + {counter, 'session.resumed'}, + {counter, 'session.takeovered'}, + {counter, 'session.discarded'}, + {counter, 'session.terminated'} + ]). -record(state, {next_idx = 1}). @@ -155,8 +185,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(stop() -> ok). -stop() -> - gen_server:stop(?SERVER). +stop() -> gen_server:stop(?SERVER). %%-------------------------------------------------------------------- %% Metrics API @@ -265,7 +294,7 @@ update_counter(Name, Value) -> counters:add(CRef, CIdx, Value). %%-------------------------------------------------------------------- -%% Inc Received/Sent metrics +%% Inc received/sent metrics %%-------------------------------------------------------------------- %% @doc Inc packets received. @@ -276,13 +305,13 @@ inc_recv(Packet) -> do_inc_recv(?PACKET(?CONNECT)) -> inc('packets.connect.received'); -do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) -> +do_inc_recv(?PUBLISH_PACKET(QoS)) -> inc('messages.received'), case QoS of ?QOS_0 -> inc('messages.qos0.received'); ?QOS_1 -> inc('messages.qos1.received'); ?QOS_2 -> inc('messages.qos2.received'); - _ -> ok + _other -> ok end, inc('packets.publish.received'); do_inc_recv(?PACKET(?PUBACK)) -> @@ -319,12 +348,13 @@ do_inc_sent(?CONNACK_PACKET(ReasonCode)) -> (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'), inc('packets.connack.sent'); -do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) -> +do_inc_sent(?PUBLISH_PACKET(QoS)) -> inc('messages.sent'), case QoS of ?QOS_0 -> inc('messages.qos0.sent'); ?QOS_1 -> inc('messages.qos1.sent'); - ?QOS_2 -> inc('messages.qos2.sent') + ?QOS_2 -> inc('messages.qos2.sent'); + _other -> ok end, inc('packets.publish.sent'); do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) -> @@ -360,14 +390,21 @@ init([]) -> CRef = counters:new(?MAX_SIZE, [write_concurrency]), ok = persistent_term:put(?MODULE, CRef), % Create index mapping table - ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]), + Metrics = lists:append([?BYTES_METRICS, + ?PACKET_METRICS, + ?MESSAGE_METRICS, + ?DELIVERY_METRICS, + ?CLIENT_METRICS, + ?SESSION_METRICS + ]), % Store reserved indices - lists:foreach(fun({Type, Name}) -> - Idx = reserved_idx(Name), - Metric = #metric{name = Name, type = Type, idx = Idx}, - true = ets:insert(?TAB, Metric), - ok = counters:put(CRef, Idx, 0) - end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS), + ok = lists:foreach(fun({Type, Name}) -> + Idx = reserved_idx(Name), + Metric = #metric{name = Name, type = Type, idx = Idx}, + true = ets:insert(?TAB, Metric), + ok = counters:put(CRef, Idx, 0) + end, Metrics), {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> @@ -409,58 +446,85 @@ code_change(_OldVsn, State, _Extra) -> reserved_idx('bytes.received') -> 01; reserved_idx('bytes.sent') -> 02; -reserved_idx('packets.received') -> 03; -reserved_idx('packets.sent') -> 04; -reserved_idx('packets.connect.received') -> 05; -reserved_idx('packets.connack.sent') -> 06; -reserved_idx('packets.connack.error') -> 07; -reserved_idx('packets.connack.auth_error') -> 08; -reserved_idx('packets.publish.received') -> 09; -reserved_idx('packets.publish.sent') -> 10; -reserved_idx('packets.publish.error') -> 11; -reserved_idx('packets.publish.auth_error') -> 12; -reserved_idx('packets.puback.received') -> 13; -reserved_idx('packets.puback.sent') -> 14; -reserved_idx('packets.puback.missed') -> 15; -reserved_idx('packets.pubrec.received') -> 16; -reserved_idx('packets.pubrec.sent') -> 17; -reserved_idx('packets.pubrec.missed') -> 18; -reserved_idx('packets.pubrel.received') -> 19; -reserved_idx('packets.pubrel.sent') -> 20; -reserved_idx('packets.pubrel.missed') -> 21; -reserved_idx('packets.pubcomp.received') -> 22; -reserved_idx('packets.pubcomp.sent') -> 23; -reserved_idx('packets.pubcomp.missed') -> 24; -reserved_idx('packets.subscribe.received') -> 25; -reserved_idx('packets.subscribe.error') -> 26; -reserved_idx('packets.subscribe.auth_error') -> 27; -reserved_idx('packets.suback.sent') -> 28; -reserved_idx('packets.unsubscribe.received') -> 29; -reserved_idx('packets.unsubscribe.error') -> 30; -reserved_idx('packets.unsuback.sent') -> 31; -reserved_idx('packets.pingreq.received') -> 32; -reserved_idx('packets.pingresp.sent') -> 33; -reserved_idx('packets.disconnect.received') -> 34; -reserved_idx('packets.disconnect.sent') -> 35; -reserved_idx('packets.auth.received') -> 36; -reserved_idx('packets.auth.sent') -> 37; -reserved_idx('messages.received') -> 38; -reserved_idx('messages.sent') -> 39; -reserved_idx('messages.qos0.received') -> 40; -reserved_idx('messages.qos0.sent') -> 41; -reserved_idx('messages.qos1.received') -> 42; -reserved_idx('messages.qos1.sent') -> 43; -reserved_idx('messages.qos2.received') -> 44; -reserved_idx('messages.qos2.expired') -> 45; -reserved_idx('messages.qos2.sent') -> 46; -reserved_idx('messages.qos2.dropped') -> 47; -reserved_idx('messages.retained') -> 48; -reserved_idx('messages.dropped') -> 49; -reserved_idx('messages.expired') -> 50; -reserved_idx('messages.forward') -> 51; -reserved_idx('auth.mqtt.anonymous') -> 52; -reserved_idx('channel.gc') -> 53; -reserved_idx('packets.pubrec.inuse') -> 54; -reserved_idx('packets.pubcomp.inuse') -> 55; +%% Reserved indices of packet's metrics +reserved_idx('packets.received') -> 10; +reserved_idx('packets.sent') -> 11; +reserved_idx('packets.connect.received') -> 12; +reserved_idx('packets.connack.sent') -> 13; +reserved_idx('packets.connack.error') -> 14; +reserved_idx('packets.connack.auth_error') -> 15; +reserved_idx('packets.publish.received') -> 16; +reserved_idx('packets.publish.sent') -> 17; +reserved_idx('packets.publish.error') -> 18; +reserved_idx('packets.publish.auth_error') -> 19; +reserved_idx('packets.puback.received') -> 20; +reserved_idx('packets.puback.sent') -> 21; +reserved_idx('packets.puback.inuse') -> 22; +reserved_idx('packets.puback.missed') -> 23; +reserved_idx('packets.pubrec.received') -> 24; +reserved_idx('packets.pubrec.sent') -> 25; +reserved_idx('packets.pubrec.inuse') -> 26; +reserved_idx('packets.pubrec.missed') -> 27; +reserved_idx('packets.pubrel.received') -> 28; +reserved_idx('packets.pubrel.sent') -> 29; +reserved_idx('packets.pubrel.missed') -> 30; +reserved_idx('packets.pubcomp.received') -> 31; +reserved_idx('packets.pubcomp.sent') -> 32; +reserved_idx('packets.pubcomp.inuse') -> 33; +reserved_idx('packets.pubcomp.missed') -> 34; +reserved_idx('packets.subscribe.received') -> 35; +reserved_idx('packets.subscribe.error') -> 36; +reserved_idx('packets.subscribe.auth_error') -> 37; +reserved_idx('packets.suback.sent') -> 38; +reserved_idx('packets.unsubscribe.received') -> 39; +reserved_idx('packets.unsubscribe.error') -> 40; +reserved_idx('packets.unsuback.sent') -> 41; +reserved_idx('packets.pingreq.received') -> 42; +reserved_idx('packets.pingresp.sent') -> 43; +reserved_idx('packets.disconnect.received') -> 44; +reserved_idx('packets.disconnect.sent') -> 45; +reserved_idx('packets.auth.received') -> 46; +reserved_idx('packets.auth.sent') -> 47; +reserved_idx('packets.publish.dropped') -> 48; +%% Reserved indices of message's metrics +reserved_idx('messages.received') -> 100; +reserved_idx('messages.sent') -> 101; +reserved_idx('messages.qos0.received') -> 102; +reserved_idx('messages.qos0.sent') -> 103; +reserved_idx('messages.qos1.received') -> 104; +reserved_idx('messages.qos1.sent') -> 105; +reserved_idx('messages.qos2.received') -> 106; +reserved_idx('messages.qos2.sent') -> 107; +reserved_idx('messages.publish') -> 108; +reserved_idx('messages.dropped') -> 109; +reserved_idx('messages.dropped.expired') -> 110; +reserved_idx('messages.dropped.no_subscribers') -> 111; +reserved_idx('messages.forward') -> 112; +reserved_idx('messages.retained') -> 113; +reserved_idx('messages.delayed') -> 114; +reserved_idx('messages.delivered') -> 115; +reserved_idx('messages.acked') -> 116; +reserved_idx('delivery.expired') -> 117; +reserved_idx('delivery.dropped') -> 118; +reserved_idx('delivery.dropped.no_local') -> 119; +reserved_idx('delivery.dropped.too_large') -> 120; +reserved_idx('delivery.dropped.qos0_msg') -> 121; +reserved_idx('delivery.dropped.queue_full') -> 122; +reserved_idx('delivery.dropped.expired') -> 123; + +reserved_idx('client.connected') -> 200; +reserved_idx('client.authenticate') -> 201; +reserved_idx('client.auth.anonymous') -> 202; +reserved_idx('client.check_acl') -> 203; +reserved_idx('client.subscribe') -> 204; +reserved_idx('client.unsubscribe') -> 205; +reserved_idx('client.disconnected') -> 206; + +reserved_idx('session.created') -> 220; +reserved_idx('session.resumed') -> 221; +reserved_idx('session.takeovered') -> 222; +reserved_idx('session.discarded') -> 223; +reserved_idx('session.terminated') -> 224; + reserved_idx(_) -> undefined. diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 12154c184..f02304f24 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -53,7 +53,10 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). --export([init/1]). +-export([ init/1 + , info/1 + , info/2 + ]). -export([ is_empty/1 , len/1 @@ -86,6 +89,7 @@ -define(LOWEST_PRIORITY, 0). -define(HIGHEST_PRIORITY, infinity). -define(MAX_LEN_INFINITY, 0). +-define(INFO_KEYS, [store_qos0, max_len, len, dropped]). -record(mqueue, { store_qos0 = false :: boolean(), @@ -111,6 +115,20 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> default_p = get_priority_opt(Opts) }. +-spec(info(mqueue()) -> emqx_types:infos()). +info(MQ) -> + maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]). + +-spec(info(atom(), mqueue()) -> term()). +info(store_qos0, #mqueue{store_qos0 = True}) -> + True; +info(max_len, #mqueue{max_len = MaxLen}) -> + MaxLen; +info(len, #mqueue{len = Len}) -> + Len; +info(dropped, #mqueue{dropped = Dropped}) -> + Dropped. + is_empty(#mqueue{len = Len}) -> Len =:= 0. len(#mqueue{len = Len}) -> Len. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9561efef9..f9e304aa0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -398,18 +398,17 @@ dequeue(0, Msgs, Q) -> dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> dequeue(0, Msgs, Q); - {{value, Msg = #message{qos = ?QOS_0}}, Q1} -> - dequeue(Cnt, acc_msg(Msg, Msgs), Q1); {{value, Msg}, Q1} -> - dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1) + case emqx_message:is_expired(Msg) of + true -> ok = inc_expired_cnt(delivery), + dequeue(Cnt, Msgs, Q1); + false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) + end end. --compile({inline, [acc_msg/2]}). -acc_msg(Msg, Msgs) -> - case emqx_message:is_expired(Msg) of - true -> Msgs; - false -> [Msg|Msgs] - end. +-compile({inline, [acc_cnt/2]}). +acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt; +acc_cnt(_Msg, Cnt) -> Cnt - 1. %%-------------------------------------------------------------------- %% Broker -> Client: Deliver @@ -467,13 +466,20 @@ enqueue(Delivers, Session) when is_list(Delivers) -> enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - if is_record(Dropped, message) -> - ?LOG(warning, "Dropped msg due to mqueue is full: ~s", - [emqx_message:format(Dropped)]); - true -> ok - end, + (Dropped =/= undefined) andalso log_dropped(Dropped, Session), Session#session{mqueue = NewQ}. +log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> + case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of + true -> + ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), + ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); + false -> + ok = emqx_metrics:inc('delivery.dropped.queue_full'), + ?LOG(warning, "Dropped msg due to mqueue is full: ~s", + [emqx_message:format(Msg)]) + end. + enrich_fun(Session = #session{subscriptions = Subs}) -> fun({deliver, Topic, Msg}) -> enrich_subopts(get_subopts(Topic, Subs), Msg, Session) @@ -555,6 +561,7 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> + ok = inc_expired_cnt(delivery), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), @@ -581,6 +588,8 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), + ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), + (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt), NSession = Session#session{awaiting_rel = AwaitingRel1}, case maps:size(AwaitingRel1) of 0 -> {ok, NSession}; @@ -617,10 +626,28 @@ replay(Inflight) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)). +%%-------------------------------------------------------------------- +%% Inc message/delivery expired counter +%%-------------------------------------------------------------------- + +-compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}). + +inc_expired_cnt(K) -> inc_expired_cnt(K, 1). + +inc_expired_cnt(delivery, N) -> + ok = emqx_metrics:inc('delivery.dropped', N), + emqx_metrics:inc('delivery.dropped.expired', N); + +inc_expired_cnt(message, N) -> + ok = emqx_metrics:inc('messages.dropped', N), + emqx_metrics:inc('messages.dropped.expired', N). + %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- +-compile({inline, [next_pkt_id/1]}). + next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> Session#session{next_pkt_id = 1}; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 16f87ff4a..c9be74c80 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -427,8 +427,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> run_gc(Stats, State = #state{gc_state = GcSt}) -> case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of - {IsGC, GcSt1} -> - IsGC andalso emqx_metrics:inc('channel.gc'), + {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}; false -> State end. @@ -521,6 +520,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> case Serialize(Packet) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), + ok = emqx_metrics:inc('delivery.dropped.too_large'), + ok = emqx_metrics:inc('delivery.dropped'), <<>>; Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index e2cb977df..aa8dfb743 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -53,13 +53,13 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - ok = meck:unload(emqx_access_control), - ok = meck:unload(emqx_metrics), - ok = meck:unload(emqx_session), - ok = meck:unload(emqx_broker), - ok = meck:unload(emqx_hooks), - ok = meck:unload(emqx_cm), - ok. + meck:unload([emqx_access_control, + emqx_metrics, + emqx_session, + emqx_broker, + emqx_hooks, + emqx_cm + ]). init_per_testcase(_TestCase, Config) -> Config. @@ -334,15 +334,15 @@ t_handle_out_publish(_) -> t_handle_out_publish_1(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} = - emqx_channel:handle_out(publish, [{1, Msg}], channel()). + {ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan} + = emqx_channel:handle_out(publish, [{1, Msg}], channel()). t_handle_out_publish_nl(_) -> ClientInfo = clientinfo(#{clientid => <<"clientid">>}), Channel = channel(#{clientinfo => ClientInfo}), Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), Pubs = [{1, emqx_message:set_flag(nl, Msg)}], - {ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). + {ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). t_handle_out_connack_sucess(_) -> {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 24eedccd0..a2d9b17c4 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -68,6 +68,14 @@ t_timestamp(_) -> timer:sleep(1), ?assert(erlang:system_time(millisecond) > emqx_message:timestamp(Msg)). +t_is_sys(_) -> + Msg0 = emqx_message:make(<<"t">>, <<"payload">>), + ?assertNot(emqx_message:is_sys(Msg0)), + Msg1 = emqx_message:set_flag(sys, Msg0), + ?assert(emqx_message:is_sys(Msg1)), + Msg2 = emqx_message:make(<<"$SYS/events">>, <<"payload">>), + ?assert(emqx_message:is_sys(Msg2)). + t_clean_dup(_) -> Msg = emqx_message:make(<<"topic">>, <<"payload">>), ?assertNot(emqx_message:get_flag(dup, Msg)), diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index a421b6391..921491dff 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -28,24 +28,17 @@ all() -> emqx_ct:all(?MODULE). - -% t_init(_) -> -% error('TODO'). - -% t_is_empty(_) -> -% error('TODO'). - -% t_len(_) -> -% error('TODO'). - -% t_max_len(_) -> -% error('TODO'). - -% t_dropped(_) -> -% error('TODO'). - -% t_stats(_) -> -% error('TODO'). +t_info(_) -> + Q = ?Q:init(#{max_len => 5, store_qos0 => true}), + true = ?Q:info(store_qos0, Q), + 5 = ?Q:info(max_len, Q), + 0 = ?Q:info(len, Q), + 0 = ?Q:info(dropped, Q), + #{store_qos0 := true, + max_len := 5, + len := 0, + dropped := 0 + } = ?Q:info(Q). t_in(_) -> Opts = #{max_len => 5, store_qos0 => true}, @@ -163,3 +156,10 @@ t_length_priority_mqueue(_) -> {{value, _Val}, Q5} = ?Q:out(Q4), ?assertEqual(1, ?Q:len(Q5)). +t_dropped(_) -> + Q = ?Q:init(#{max_len => 1, store_qos0 => true}), + Msg = emqx_message:make(<<"t">>, <<"payload">>), + {undefined, Q1} = ?Q:in(Msg, Q), + {Msg, Q2} = ?Q:in(Msg, Q1), + ?assertEqual(1, ?Q:dropped(Q2)). + diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 9f9aff825..457afc71e 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -31,14 +31,15 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> %% Broker - ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), - ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]), + ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker], + [passthrough, no_history, no_link]), + ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok = meck:expect(emqx_metrics, inc, fun(_K, _V) -> ok end), ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), Config. end_per_suite(_Config) -> - ok = meck:unload(emqx_broker), - ok = meck:unload(emqx_hooks). + meck:unload([emqx_broker, emqx_hooks, emqx_metrics]). init_per_testcase(_TestCase, Config) -> Config.