Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-12-21 02:22:59 +00:00
commit 2ffeaf4ef4
13 changed files with 437 additions and 296 deletions

View File

@ -36,7 +36,6 @@
authenticate(ClientInfo = #{zone := Zone}) -> authenticate(ClientInfo = #{zone := Zone}) ->
case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
Result = #{auth_result := success, anonymous := true} -> Result = #{auth_result := success, anonymous := true} ->
emqx_metrics:inc('auth.mqtt.anonymous'),
{ok, Result}; {ok, Result};
Result = #{auth_result := success} -> Result = #{auth_result := success} ->
{ok, Result}; {ok, Result};

View File

@ -160,9 +160,9 @@ do_subscribe(Group, Topic, SubPid, SubOpts) ->
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
emqx_shared_sub:subscribe(Group, Topic, SubPid). emqx_shared_sub:subscribe(Group, Topic, SubPid).
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% Unsubscribe API %% Unsubscribe API
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
-spec(unsubscribe(emqx_topic:topic()) -> ok). -spec(unsubscribe(emqx_topic:topic()) -> ok).
unsubscribe(Topic) when is_binary(Topic) -> unsubscribe(Topic) when is_binary(Topic) ->
@ -198,39 +198,42 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()). -spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
publish(Msg) when is_record(Msg, message) -> publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg), _ = emqx_tracer:trace(publish, Msg),
Msg1 = emqx_message:set_header(allow_publish, true, emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
emqx_message:clean_dup(Msg)), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
case emqx_hooks:run_fold('message.publish', [], Msg1) of
#message{headers = #{allow_publish := false}} -> #message{headers = #{allow_publish := false}} ->
?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg1)]), ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
[]; [];
#message{topic = Topic} = Msg2 -> Msg1 = #message{topic = Topic} ->
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg2)) route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
end. end.
%% Called internally %% Called internally
-spec(safe_publish(emqx_types:message()) -> ok | emqx_types:publish_result()). -spec(safe_publish(emqx_types:message()) -> emqx_types:publish_result()).
safe_publish(Msg) when is_record(Msg, message) -> safe_publish(Msg) when is_record(Msg, message) ->
try try
publish(Msg) publish(Msg)
catch catch
_:Error:Stacktrace -> _:Error:Stk->
?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) ?LOG(error, "Publish error: ~p~n~s~n~p",
[Error, emqx_message:format(Msg), Stk])
after after
ok []
end. end.
delivery(Msg) -> -compile({inline, [delivery/1]}).
#delivery{sender = self(), message = Msg}. delivery(Msg) -> #delivery{sender = self(), message = Msg}.
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% Route %% Route
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()).
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
-> emqx_types:publish_result()).
route([], #delivery{message = Msg}) -> route([], #delivery{message = Msg}) ->
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
inc_dropped_cnt(Msg#message.topic), ok = inc_dropped_cnt(Msg),
[]; [];
route(Routes, Delivery) -> route(Routes, Delivery) ->
lists:foldl(fun(Route, Acc) -> lists:foldl(fun(Route, Acc) ->
[do_route(Route, Delivery) | Acc] [do_route(Route, Delivery) | Acc]
@ -243,8 +246,7 @@ do_route({To, Node}, Delivery) when is_atom(Node) ->
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
aggre([]) -> aggre([]) -> [];
[];
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
[{To, Node}]; [{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) -> aggre([#route{topic = To, dest = {Group, _Node}}]) ->
@ -258,35 +260,34 @@ aggre(Routes) ->
end, [], Routes). end, [], Routes).
%% @doc Forward message to another node. %% @doc Forward message to another node.
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync|async)
-> emqx_types:deliver_result()). -> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) -> forward(Node, To, Delivery, async) ->
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
true -> ok; true -> emqx_metrics:inc('messages.forward');
{badrpc, Reason} -> {badrpc, Reason} ->
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), ?LOG(error, "Ansync forward msg to ~s failed due to ~p", [Node, Reason]),
{error, badrpc} {error, badrpc}
end; end;
forward(Node, To, Delivery, sync) -> forward(Node, To, Delivery, sync) ->
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} -> {badrpc, Reason} ->
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]),
{error, badrpc}; {error, badrpc};
Result -> Result Result ->
emqx_metrics:inc('messages.forward'), Result
end. end.
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
dispatch(Topic, #delivery{message = Msg}) -> dispatch(Topic, #delivery{message = Msg}) ->
case subscribers(Topic) of case subscribers(Topic) of
[] -> [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), _ = inc_dropped_cnt(Topic),
inc_dropped_cnt(Topic),
{error, no_subscribers}; {error, no_subscribers};
[Sub] -> %% optimize? [Sub] -> %% optimize?
dispatch(Sub, Topic, Msg); dispatch(Sub, Topic, Msg);
Subs -> Subs -> lists:foldl(
lists:foldl(
fun(Sub, Res) -> fun(Sub, Res) ->
case dispatch(Sub, Topic, Msg) of case dispatch(Sub, Topic, Msg) of
ok -> Res; ok -> Res;
@ -302,6 +303,7 @@ dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
ok; ok;
false -> {error, subscriber_die} false -> {error, subscriber_die}
end; end;
dispatch({shard, I}, Topic, Msg) -> dispatch({shard, I}, Topic, Msg) ->
lists:foldl( lists:foldl(
fun(SubPid, Res) -> fun(SubPid, Res) ->
@ -311,20 +313,24 @@ dispatch({shard, I}, Topic, Msg) ->
end end
end, ok, subscribers({shard, Topic, I})). end, ok, subscribers({shard, Topic, I})).
inc_dropped_cnt(<<"$SYS/", _/binary>>) -> -compile({inline, [inc_dropped_cnt/1]}).
ok; inc_dropped_cnt(Msg) ->
inc_dropped_cnt(_Topic) -> case emqx_message:is_sys(Msg) of
emqx_metrics:inc('messages.dropped'). true -> ok;
false -> ok = emqx_metrics:inc('messages.dropped'),
emqx_metrics:inc('messages.dropped.no_subscribers')
end.
-compile({inline, [subscribers/1]}).
-spec(subscribers(emqx_topic:topic()) -> [pid()]). -spec(subscribers(emqx_topic:topic()) -> [pid()]).
subscribers(Topic) when is_binary(Topic) -> subscribers(Topic) when is_binary(Topic) ->
lookup_value(?SUBSCRIBER, Topic, []); lookup_value(?SUBSCRIBER, Topic, []);
subscribers(Shard = {shard, _Topic, _I}) -> subscribers(Shard = {shard, _Topic, _I}) ->
lookup_value(?SUBSCRIBER, Shard, []). lookup_value(?SUBSCRIBER, Shard, []).
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% Subscriber is down %% Subscriber is down
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
-spec(subscriber_down(pid()) -> true). -spec(subscriber_down(pid()) -> true).
subscriber_down(SubPid) -> subscriber_down(SubPid) ->
@ -345,9 +351,9 @@ subscriber_down(SubPid) ->
end, lookup_value(?SUBSCRIPTION, SubPid, [])), end, lookup_value(?SUBSCRIPTION, SubPid, [])),
ets:delete(?SUBSCRIPTION, SubPid). ets:delete(?SUBSCRIPTION, SubPid).
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% Management APIs %% Management APIs
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
-spec(subscriptions(pid() | emqx_types:subid()) -spec(subscriptions(pid() | emqx_types:subid())
-> [{emqx_topic:topic(), emqx_types:subopts()}]). -> [{emqx_topic:topic(), emqx_types:subopts()}]).
@ -410,9 +416,11 @@ safe_update_stats(Tab, Stat, MaxStat) ->
Size -> emqx_stats:setstat(Stat, MaxStat, Size) Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end. end.
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% call, cast, pick %% call, cast, pick
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
-compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) -> call(Broker, Req) ->
gen_server:call(Broker, Req). gen_server:call(Broker, Req).
@ -424,9 +432,9 @@ cast(Broker, Msg) ->
pick(Topic) -> pick(Topic) ->
gproc_pool:pick_worker(broker_pool, Topic). gproc_pool:pick_worker(broker_pool, Topic).
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
init([Pool, Id]) -> init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}), true = gproc_pool:connect_worker(Pool, {Pool, Id}),
@ -490,7 +498,7 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------

View File

@ -223,30 +223,30 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
handle_out(disconnect, ReasonCode, Channel) handle_out(disconnect, ReasonCode, Channel)
end; end;
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel
Channel = #channel{clientinfo = ClientInfo, session = Session}) -> = #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:puback(PacketId, Session) of case emqx_session:puback(PacketId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = after_message_acked(ClientInfo, Msg),
{ok, Channel#channel{session = NSession}}; {ok, Channel#channel{session = NSession}};
{ok, Msg, Publishes, NSession} -> {ok, Msg, Publishes, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = after_message_acked(ClientInfo, Msg),
handle_out(publish, Publishes, Channel#channel{session = NSession}); handle_out(publish, Publishes, Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.inuse'), ok = emqx_metrics:inc('packets.puback.inuse'),
{ok, Channel}; {ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'), ok = emqx_metrics:inc('packets.puback.missed'),
{ok, Channel} {ok, Channel}
end; end;
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel
Channel = #channel{clientinfo = ClientInfo, session = Session}) -> = #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:pubrec(PacketId, Session) of case emqx_session:pubrec(PacketId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = after_message_acked(ClientInfo, Msg),
NChannel = Channel#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
@ -265,8 +265,8 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
NChannel = Channel#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrel.missed'), ok = emqx_metrics:inc('packets.pubrel.missed'),
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
handle_out(pubcomp, {PacketId, RC}, Channel) handle_out(pubcomp, {PacketId, RC}, Channel)
end; end;
@ -346,8 +346,7 @@ handle_in(Packet, Channel) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
Channel = #channel{conninfo = ConnInfo, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
clientinfo = ClientInfo}) ->
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
{ok, #{session := Session, present := false}} -> {ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session}, NChannel = Channel#channel{session = Session},
@ -378,15 +377,15 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) ->
fun check_pub_caps/2 fun check_pub_caps/2
], Packet, Channel) of ], Packet, Channel) of
{ok, NPacket, NChannel} -> {ok, NPacket, NChannel} ->
Msg = packet_to_msg(NPacket, NChannel), Msg = packet_to_message(NPacket, NChannel),
do_publish(PacketId, Msg, NChannel); do_publish(PacketId, Msg, NChannel);
{error, ReasonCode, NChannel} -> {error, ReasonCode, NChannel} ->
?LOG(warning, "Cannot publish message to ~s due to ~s", ?LOG(warning, "Cannot publish message to ~s due to ~s.",
[Topic, emqx_reason_codes:text(ReasonCode)]), [Topic, emqx_reason_codes:text(ReasonCode)]),
handle_out(disconnect, ReasonCode, NChannel) handle_out(disconnect, ReasonCode, NChannel)
end. end.
packet_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
clientinfo = ClientInfo = clientinfo = ClientInfo =
#{mountpoint := MountPoint}}) -> #{mountpoint := MountPoint}}) ->
emqx_mountpoint:mount( emqx_mountpoint:mount(
@ -414,8 +413,9 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
ok = emqx_metrics:inc('packets.publish.inuse'), ok = emqx_metrics:inc('packets.publish.inuse'),
handle_out(pubrec, {PacketId, RC}, Channel); handle_out(pubrec, {PacketId, RC}, Channel);
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full.", [PacketId]), ?LOG(warning, "Dropped the qos2 packet ~w "
ok = emqx_metrics:inc('messages.qos2.dropped'), "due to awaiting_rel is full.", [PacketId]),
ok = emqx_metrics:inc('packets.publish.dropped'),
handle_out(pubrec, {PacketId, RC}, Channel) handle_out(pubrec, {PacketId, RC}, Channel)
end. end.
@ -423,6 +423,11 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
puback_reason_code([_|_]) -> ?RC_SUCCESS. puback_reason_code([_|_]) -> ?RC_SUCCESS.
-compile({inline, [after_message_acked/2]}).
after_message_acked(ClientInfo, Msg) ->
ok = emqx_metrics:inc('messages.acked'),
emqx_hooks:run('message.acked', [ClientInfo, Msg]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process Subscribe %% Process Subscribe
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -569,17 +574,8 @@ handle_out(connack, {ReasonCode, _ConnPkt},
handle_out(publish, [], Channel) -> handle_out(publish, [], Channel) ->
{ok, Channel}; {ok, Channel};
handle_out(publish, [{pubrel, PacketId}], Channel) ->
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, Channel);
handle_out(publish, [Pub = {_PacketId, Msg}], Channel) ->
case ignore_local(Msg, Channel) of
true -> {ok, Channel};
false -> {ok, pub_to_packet(Pub, Channel), Channel}
end;
handle_out(publish, Publishes, Channel) -> handle_out(publish, Publishes, Channel) ->
Packets = handle_publish(Publishes, Channel), Packets = do_deliver(Publishes, Channel),
{ok, {outgoing, Packets}, Channel}; {ok, {outgoing, Packets}, Channel};
handle_out(puback, {PacketId, ReasonCode}, Channel) -> handle_out(puback, {PacketId, ReasonCode}, Channel) ->
@ -641,45 +637,50 @@ return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo,
resuming = false, resuming = false,
pendings = [] pendings = []
}, },
Packets = handle_publish(Publishes, NChannel), Packets = do_deliver(Publishes, NChannel),
Outgoing = [{outgoing, Packets} || length(Packets) > 0], Outgoing = [{outgoing, Packets} || length(Packets) > 0],
{ok, Replies ++ Outgoing, NChannel} {ok, Replies ++ Outgoing, NChannel}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle out Publish %% Deliver publish: broker -> client
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-compile({inline, [handle_publish/2]}). %% return list(emqx_types:packet())
handle_publish(Publishes, Channel) -> do_deliver({pubrel, PacketId}, _Channel) ->
handle_publish(Publishes, [], Channel). [?PUBREL_PACKET(PacketId, ?RC_SUCCESS)];
%% Handle out publish do_deliver({PacketId, Msg}, #channel{clientinfo = ClientInfo =
handle_publish([], Acc, _Channel) -> #{mountpoint := MountPoint}}) ->
lists:reverse(Acc); case ignore_local(Msg, ClientInfo) of
true ->
handle_publish([{pubrel, PacketId}|More], Acc, Channel) -> ok = emqx_metrics:inc('delivery.dropped'),
Packet = ?PUBREL_PACKET(PacketId, ?RC_SUCCESS), ok = emqx_metrics:inc('delivery.dropped.no_local'),
handle_publish(More, [Packet|Acc], Channel); [];
handle_publish([Pub = {_PacketId, Msg}|More], Acc, Channel) ->
case ignore_local(Msg, Channel) of
true -> handle_publish(More, Acc, Channel);
false -> false ->
Packet = pub_to_packet(Pub, Channel), ok = emqx_metrics:inc('messages.delivered'),
handle_publish(More, [Packet|Acc], Channel) Msg1 = emqx_hooks:run_fold('message.delivered',
end. [ClientInfo],
emqx_message:update_expiry(Msg)
),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
[emqx_message:to_packet(PacketId, Msg2)]
end;
pub_to_packet({PacketId, Msg}, #channel{clientinfo = ClientInfo}) -> do_deliver([Publish], Channel) ->
Msg1 = emqx_hooks:run_fold('message.delivered', [ClientInfo], do_deliver(Publish, Channel);
emqx_message:update_expiry(Msg)),
Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, ClientInfo), Msg1), do_deliver(Publishes, Channel) when is_list(Publishes) ->
emqx_message:to_packet(PacketId, Msg2). lists:reverse(
lists:foldl(
fun(Publish, Acc) ->
lists:append(do_deliver(Publish, Channel), Acc)
end, [], Publishes)).
ignore_local(#message{flags = #{nl := true}, from = ClientId}, ignore_local(#message{flags = #{nl := true}, from = ClientId},
#channel{clientinfo = #{clientid := ClientId}}) -> #{clientid := ClientId}) ->
true; true;
ignore_local(_Msg, _Channel) -> false. ignore_local(_Msg, _ClientInfo) -> false.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle out suback %% Handle out suback
@ -1010,6 +1011,8 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId,
#channel{clientinfo = ClientInfo} = Channel) -> #channel{clientinfo = ClientInfo} = Channel) ->
case emqx_access_control:authenticate(ClientInfo#{password => Password}) of case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
{ok, AuthResult} -> {ok, AuthResult} ->
is_anonymous(AuthResult) andalso
emqx_metrics:inc('client.auth.anonymous'),
NClientInfo = maps:merge(ClientInfo, AuthResult), NClientInfo = maps:merge(ClientInfo, AuthResult),
{ok, Channel#channel{clientinfo = NClientInfo}}; {ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} -> {error, Reason} ->
@ -1018,6 +1021,9 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId,
{error, emqx_reason_codes:connack_error(Reason)} {error, emqx_reason_codes:connack_error(Reason)}
end. end.
is_anonymous(#{anonymous := true}) -> true;
is_anonymous(_AuthResult) -> false.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process Topic Alias %% Process Topic Alias

View File

@ -560,6 +560,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
case Serialize(Packet) of case Serialize(Packet) of
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
[emqx_packet:format(Packet)]), [emqx_packet:format(Packet)]),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
<<>>; <<>>;
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
ok = inc_outgoing_stats(Packet), ok = inc_outgoing_stats(Packet),
@ -627,8 +629,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
run_gc(Stats, State = #state{gc_state = GcSt}) -> run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
false -> State; false -> State;
{IsGC, GcSt1} -> {_IsGC, GcSt1} ->
IsGC andalso emqx_metrics:inc('channel.gc'),
State#state{gc_state = GcSt1} State#state{gc_state = GcSt1}
end. end.

View File

@ -38,7 +38,8 @@
]). ]).
%% Flags %% Flags
-export([ clean_dup/1 -export([ is_sys/1
, clean_dup/1
, get_flag/2 , get_flag/2
, get_flag/3 , get_flag/3
, get_flags/1 , get_flags/1
@ -112,6 +113,13 @@ payload(#message{payload = Payload}) -> Payload.
-spec(timestamp(emqx_types:message()) -> integer()). -spec(timestamp(emqx_types:message()) -> integer()).
timestamp(#message{timestamp = TS}) -> TS. timestamp(#message{timestamp = TS}) -> TS.
-spec(is_sys(emqx_types:message()) -> boolean()).
is_sys(#message{flags = #{sys := true}}) ->
true;
is_sys(#message{topic = <<"$SYS/", _/binary>>}) ->
true;
is_sys(_Msg) -> false.
-spec(clean_dup(emqx_types:message()) -> emqx_types:message()). -spec(clean_dup(emqx_types:message()) -> emqx_types:message()).
clean_dup(Msg = #message{flags = Flags = #{dup := true}}) -> clean_dup(Msg = #message{flags = Flags = #{dup := true}}) ->
Msg#message{flags = Flags#{dup => false}}; Msg#message{flags = Flags#{dup => false}};

View File

@ -62,24 +62,27 @@
-export_type([metric_idx/0]). -export_type([metric_idx/0]).
-compile({inline, [inc/1, inc/2, dec/1, dec/2]}).
-compile({inline, [inc_recv/1, inc_sent/1]}).
-opaque(metric_idx() :: 1..1024). -opaque(metric_idx() :: 1..1024).
-type(metric_name() :: atom() | string() | binary()). -type(metric_name() :: atom() | string() | binary()).
-define(MAX_SIZE, 1024). -define(MAX_SIZE, 1024).
-define(RESERVED_IDX, 256). -define(RESERVED_IDX, 512).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%% Bytes sent and received of broker %% Bytes sent and received
-define(BYTES_METRICS, [ -define(BYTES_METRICS,
{counter, 'bytes.received'}, % Total bytes received [{counter, 'bytes.received'}, % Total bytes received
{counter, 'bytes.sent'} % Total bytes sent {counter, 'bytes.sent'} % Total bytes sent
]). ]).
%% Packets sent and received of broker %% Packets sent and received
-define(PACKET_METRICS, [ -define(PACKET_METRICS,
{counter, 'packets.received'}, % All Packets received [{counter, 'packets.received'}, % All Packets received
{counter, 'packets.sent'}, % All Packets sent {counter, 'packets.sent'}, % All Packets sent
{counter, 'packets.connect.received'}, % CONNECT Packets received {counter, 'packets.connect.received'}, % CONNECT Packets received
{counter, 'packets.connack.sent'}, % CONNACK Packets sent {counter, 'packets.connack.sent'}, % CONNACK Packets sent
@ -89,8 +92,10 @@
{counter, 'packets.publish.sent'}, % PUBLISH packets sent {counter, 'packets.publish.sent'}, % PUBLISH packets sent
{counter, 'packets.publish.error'}, % PUBLISH failed for error {counter, 'packets.publish.error'}, % PUBLISH failed for error
{counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error
{counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped
{counter, 'packets.puback.received'}, % PUBACK packets received {counter, 'packets.puback.received'}, % PUBACK packets received
{counter, 'packets.puback.sent'}, % PUBACK packets sent {counter, 'packets.puback.sent'}, % PUBACK packets sent
{counter, 'packets.puback.inuse'}, % PUBACK packet_id inuse
{counter, 'packets.puback.missed'}, % PUBACK packets missed {counter, 'packets.puback.missed'}, % PUBACK packets missed
{counter, 'packets.pubrec.received'}, % PUBREC packets received {counter, 'packets.pubrec.received'}, % PUBREC packets received
{counter, 'packets.pubrec.sent'}, % PUBREC packets sent {counter, 'packets.pubrec.sent'}, % PUBREC packets sent
@ -118,32 +123,57 @@
{counter, 'packets.auth.sent'} % Auth Packets sent {counter, 'packets.auth.sent'} % Auth Packets sent
]). ]).
%% Messages sent and received of broker %% Messages sent/received and pubsub
-define(MESSAGE_METRICS, [ -define(MESSAGE_METRICS,
{counter, 'messages.received'}, % All Messages received [{counter, 'messages.received'}, % All Messages received
{counter, 'messages.sent'}, % All Messages sent {counter, 'messages.sent'}, % All Messages sent
{counter, 'messages.qos0.received'}, % QoS0 Messages received {counter, 'messages.qos0.received'}, % QoS0 Messages received
{counter, 'messages.qos0.sent'}, % QoS0 Messages sent {counter, 'messages.qos0.sent'}, % QoS0 Messages sent
{counter, 'messages.qos1.received'}, % QoS1 Messages received {counter, 'messages.qos1.received'}, % QoS1 Messages received
{counter, 'messages.qos1.sent'}, % QoS1 Messages sent {counter, 'messages.qos1.sent'}, % QoS1 Messages sent
{counter, 'messages.qos2.received'}, % QoS2 Messages received {counter, 'messages.qos2.received'}, % QoS2 Messages received
{counter, 'messages.qos2.expired'}, % QoS2 Messages expired
{counter, 'messages.qos2.sent'}, % QoS2 Messages sent {counter, 'messages.qos2.sent'}, % QoS2 Messages sent
{counter, 'messages.qos2.dropped'}, % QoS2 Messages dropped %% PubSub Metrics
{gauge, 'messages.retained'}, % Messagea retained {counter, 'messages.publish'}, % Messages Publish
{counter, 'messages.dropped'}, % Messages dropped {counter, 'messages.dropped'}, % Messages dropped due to no subscribers
{counter, 'messages.expired'}, % Messages expired {counter, 'messages.dropped.expired'}, % QoS2 Messages expired
{counter, 'messages.forward'} % Messages forward {counter, 'messages.dropped.no_subscribers'}, % Messages dropped
{counter, 'messages.forward'}, % Messages forward
{gauge, 'messages.retained'}, % Messages retained
{gauge, 'messages.delayed'}, % Messages delayed
{counter, 'messages.delivered'}, % Messages delivered
{counter, 'messages.acked'} % Messages acked
]). ]).
-define(CHAN_METRICS, [ %% Delivery metrics
{counter, 'channel.gc'} -define(DELIVERY_METRICS,
[{counter, 'delivery.dropped'},
{counter, 'delivery.dropped.no_local'},
{counter, 'delivery.dropped.too_large'},
{counter, 'delivery.dropped.qos0_msg'},
{counter, 'delivery.dropped.queue_full'},
{counter, 'delivery.dropped.expired'}
]). ]).
-define(MQTT_METRICS, [ %% Client Lifecircle metrics
{counter, 'auth.mqtt.anonymous'} -define(CLIENT_METRICS,
[{counter, 'client.connected'},
{cpunter, 'client.authenticate'},
{counter, 'client.auth.anonymous'},
{counter, 'client.check_acl'},
{counter, 'client.subscribe'},
{counter, 'client.unsubscribe'},
{counter, 'client.disconnected'}
]). ]).
%% Session Lifecircle metrics
-define(SESSION_METRICS,
[{counter, 'session.created'},
{counter, 'session.resumed'},
{counter, 'session.takeovered'},
{counter, 'session.discarded'},
{counter, 'session.terminated'}
]).
-record(state, {next_idx = 1}). -record(state, {next_idx = 1}).
@ -155,8 +185,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(stop() -> ok). -spec(stop() -> ok).
stop() -> stop() -> gen_server:stop(?SERVER).
gen_server:stop(?SERVER).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Metrics API %% Metrics API
@ -265,7 +294,7 @@ update_counter(Name, Value) ->
counters:add(CRef, CIdx, Value). counters:add(CRef, CIdx, Value).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Inc Received/Sent metrics %% Inc received/sent metrics
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Inc packets received. %% @doc Inc packets received.
@ -276,13 +305,13 @@ inc_recv(Packet) ->
do_inc_recv(?PACKET(?CONNECT)) -> do_inc_recv(?PACKET(?CONNECT)) ->
inc('packets.connect.received'); inc('packets.connect.received');
do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) -> do_inc_recv(?PUBLISH_PACKET(QoS)) ->
inc('messages.received'), inc('messages.received'),
case QoS of case QoS of
?QOS_0 -> inc('messages.qos0.received'); ?QOS_0 -> inc('messages.qos0.received');
?QOS_1 -> inc('messages.qos1.received'); ?QOS_1 -> inc('messages.qos1.received');
?QOS_2 -> inc('messages.qos2.received'); ?QOS_2 -> inc('messages.qos2.received');
_ -> ok _other -> ok
end, end,
inc('packets.publish.received'); inc('packets.publish.received');
do_inc_recv(?PACKET(?PUBACK)) -> do_inc_recv(?PACKET(?PUBACK)) ->
@ -319,12 +348,13 @@ do_inc_sent(?CONNACK_PACKET(ReasonCode)) ->
(ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'), (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'),
inc('packets.connack.sent'); inc('packets.connack.sent');
do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) -> do_inc_sent(?PUBLISH_PACKET(QoS)) ->
inc('messages.sent'), inc('messages.sent'),
case QoS of case QoS of
?QOS_0 -> inc('messages.qos0.sent'); ?QOS_0 -> inc('messages.qos0.sent');
?QOS_1 -> inc('messages.qos1.sent'); ?QOS_1 -> inc('messages.qos1.sent');
?QOS_2 -> inc('messages.qos2.sent') ?QOS_2 -> inc('messages.qos2.sent');
_other -> ok
end, end,
inc('packets.publish.sent'); inc('packets.publish.sent');
do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) -> do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) ->
@ -360,14 +390,21 @@ init([]) ->
CRef = counters:new(?MAX_SIZE, [write_concurrency]), CRef = counters:new(?MAX_SIZE, [write_concurrency]),
ok = persistent_term:put(?MODULE, CRef), ok = persistent_term:put(?MODULE, CRef),
% Create index mapping table % Create index mapping table
ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]), ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]),
Metrics = lists:append([?BYTES_METRICS,
?PACKET_METRICS,
?MESSAGE_METRICS,
?DELIVERY_METRICS,
?CLIENT_METRICS,
?SESSION_METRICS
]),
% Store reserved indices % Store reserved indices
lists:foreach(fun({Type, Name}) -> ok = lists:foreach(fun({Type, Name}) ->
Idx = reserved_idx(Name), Idx = reserved_idx(Name),
Metric = #metric{name = Name, type = Type, idx = Idx}, Metric = #metric{name = Name, type = Type, idx = Idx},
true = ets:insert(?TAB, Metric), true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0) ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS), end, Metrics),
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
@ -409,58 +446,85 @@ code_change(_OldVsn, State, _Extra) ->
reserved_idx('bytes.received') -> 01; reserved_idx('bytes.received') -> 01;
reserved_idx('bytes.sent') -> 02; reserved_idx('bytes.sent') -> 02;
reserved_idx('packets.received') -> 03; %% Reserved indices of packet's metrics
reserved_idx('packets.sent') -> 04; reserved_idx('packets.received') -> 10;
reserved_idx('packets.connect.received') -> 05; reserved_idx('packets.sent') -> 11;
reserved_idx('packets.connack.sent') -> 06; reserved_idx('packets.connect.received') -> 12;
reserved_idx('packets.connack.error') -> 07; reserved_idx('packets.connack.sent') -> 13;
reserved_idx('packets.connack.auth_error') -> 08; reserved_idx('packets.connack.error') -> 14;
reserved_idx('packets.publish.received') -> 09; reserved_idx('packets.connack.auth_error') -> 15;
reserved_idx('packets.publish.sent') -> 10; reserved_idx('packets.publish.received') -> 16;
reserved_idx('packets.publish.error') -> 11; reserved_idx('packets.publish.sent') -> 17;
reserved_idx('packets.publish.auth_error') -> 12; reserved_idx('packets.publish.error') -> 18;
reserved_idx('packets.puback.received') -> 13; reserved_idx('packets.publish.auth_error') -> 19;
reserved_idx('packets.puback.sent') -> 14; reserved_idx('packets.puback.received') -> 20;
reserved_idx('packets.puback.missed') -> 15; reserved_idx('packets.puback.sent') -> 21;
reserved_idx('packets.pubrec.received') -> 16; reserved_idx('packets.puback.inuse') -> 22;
reserved_idx('packets.pubrec.sent') -> 17; reserved_idx('packets.puback.missed') -> 23;
reserved_idx('packets.pubrec.missed') -> 18; reserved_idx('packets.pubrec.received') -> 24;
reserved_idx('packets.pubrel.received') -> 19; reserved_idx('packets.pubrec.sent') -> 25;
reserved_idx('packets.pubrel.sent') -> 20; reserved_idx('packets.pubrec.inuse') -> 26;
reserved_idx('packets.pubrel.missed') -> 21; reserved_idx('packets.pubrec.missed') -> 27;
reserved_idx('packets.pubcomp.received') -> 22; reserved_idx('packets.pubrel.received') -> 28;
reserved_idx('packets.pubcomp.sent') -> 23; reserved_idx('packets.pubrel.sent') -> 29;
reserved_idx('packets.pubcomp.missed') -> 24; reserved_idx('packets.pubrel.missed') -> 30;
reserved_idx('packets.subscribe.received') -> 25; reserved_idx('packets.pubcomp.received') -> 31;
reserved_idx('packets.subscribe.error') -> 26; reserved_idx('packets.pubcomp.sent') -> 32;
reserved_idx('packets.subscribe.auth_error') -> 27; reserved_idx('packets.pubcomp.inuse') -> 33;
reserved_idx('packets.suback.sent') -> 28; reserved_idx('packets.pubcomp.missed') -> 34;
reserved_idx('packets.unsubscribe.received') -> 29; reserved_idx('packets.subscribe.received') -> 35;
reserved_idx('packets.unsubscribe.error') -> 30; reserved_idx('packets.subscribe.error') -> 36;
reserved_idx('packets.unsuback.sent') -> 31; reserved_idx('packets.subscribe.auth_error') -> 37;
reserved_idx('packets.pingreq.received') -> 32; reserved_idx('packets.suback.sent') -> 38;
reserved_idx('packets.pingresp.sent') -> 33; reserved_idx('packets.unsubscribe.received') -> 39;
reserved_idx('packets.disconnect.received') -> 34; reserved_idx('packets.unsubscribe.error') -> 40;
reserved_idx('packets.disconnect.sent') -> 35; reserved_idx('packets.unsuback.sent') -> 41;
reserved_idx('packets.auth.received') -> 36; reserved_idx('packets.pingreq.received') -> 42;
reserved_idx('packets.auth.sent') -> 37; reserved_idx('packets.pingresp.sent') -> 43;
reserved_idx('messages.received') -> 38; reserved_idx('packets.disconnect.received') -> 44;
reserved_idx('messages.sent') -> 39; reserved_idx('packets.disconnect.sent') -> 45;
reserved_idx('messages.qos0.received') -> 40; reserved_idx('packets.auth.received') -> 46;
reserved_idx('messages.qos0.sent') -> 41; reserved_idx('packets.auth.sent') -> 47;
reserved_idx('messages.qos1.received') -> 42; reserved_idx('packets.publish.dropped') -> 48;
reserved_idx('messages.qos1.sent') -> 43; %% Reserved indices of message's metrics
reserved_idx('messages.qos2.received') -> 44; reserved_idx('messages.received') -> 100;
reserved_idx('messages.qos2.expired') -> 45; reserved_idx('messages.sent') -> 101;
reserved_idx('messages.qos2.sent') -> 46; reserved_idx('messages.qos0.received') -> 102;
reserved_idx('messages.qos2.dropped') -> 47; reserved_idx('messages.qos0.sent') -> 103;
reserved_idx('messages.retained') -> 48; reserved_idx('messages.qos1.received') -> 104;
reserved_idx('messages.dropped') -> 49; reserved_idx('messages.qos1.sent') -> 105;
reserved_idx('messages.expired') -> 50; reserved_idx('messages.qos2.received') -> 106;
reserved_idx('messages.forward') -> 51; reserved_idx('messages.qos2.sent') -> 107;
reserved_idx('auth.mqtt.anonymous') -> 52; reserved_idx('messages.publish') -> 108;
reserved_idx('channel.gc') -> 53; reserved_idx('messages.dropped') -> 109;
reserved_idx('packets.pubrec.inuse') -> 54; reserved_idx('messages.dropped.expired') -> 110;
reserved_idx('packets.pubcomp.inuse') -> 55; reserved_idx('messages.dropped.no_subscribers') -> 111;
reserved_idx('messages.forward') -> 112;
reserved_idx('messages.retained') -> 113;
reserved_idx('messages.delayed') -> 114;
reserved_idx('messages.delivered') -> 115;
reserved_idx('messages.acked') -> 116;
reserved_idx('delivery.expired') -> 117;
reserved_idx('delivery.dropped') -> 118;
reserved_idx('delivery.dropped.no_local') -> 119;
reserved_idx('delivery.dropped.too_large') -> 120;
reserved_idx('delivery.dropped.qos0_msg') -> 121;
reserved_idx('delivery.dropped.queue_full') -> 122;
reserved_idx('delivery.dropped.expired') -> 123;
reserved_idx('client.connected') -> 200;
reserved_idx('client.authenticate') -> 201;
reserved_idx('client.auth.anonymous') -> 202;
reserved_idx('client.check_acl') -> 203;
reserved_idx('client.subscribe') -> 204;
reserved_idx('client.unsubscribe') -> 205;
reserved_idx('client.disconnected') -> 206;
reserved_idx('session.created') -> 220;
reserved_idx('session.resumed') -> 221;
reserved_idx('session.takeovered') -> 222;
reserved_idx('session.discarded') -> 223;
reserved_idx('session.terminated') -> 224;
reserved_idx(_) -> undefined. reserved_idx(_) -> undefined.

View File

@ -53,7 +53,10 @@
-include("types.hrl"). -include("types.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([init/1]). -export([ init/1
, info/1
, info/2
]).
-export([ is_empty/1 -export([ is_empty/1
, len/1 , len/1
@ -86,6 +89,7 @@
-define(LOWEST_PRIORITY, 0). -define(LOWEST_PRIORITY, 0).
-define(HIGHEST_PRIORITY, infinity). -define(HIGHEST_PRIORITY, infinity).
-define(MAX_LEN_INFINITY, 0). -define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(mqueue, { -record(mqueue, {
store_qos0 = false :: boolean(), store_qos0 = false :: boolean(),
@ -111,6 +115,20 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
default_p = get_priority_opt(Opts) default_p = get_priority_opt(Opts)
}. }.
-spec(info(mqueue()) -> emqx_types:infos()).
info(MQ) ->
maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]).
-spec(info(atom(), mqueue()) -> term()).
info(store_qos0, #mqueue{store_qos0 = True}) ->
True;
info(max_len, #mqueue{max_len = MaxLen}) ->
MaxLen;
info(len, #mqueue{len = Len}) ->
Len;
info(dropped, #mqueue{dropped = Dropped}) ->
Dropped.
is_empty(#mqueue{len = Len}) -> Len =:= 0. is_empty(#mqueue{len = Len}) -> Len =:= 0.
len(#mqueue{len = Len}) -> Len. len(#mqueue{len = Len}) -> Len.

View File

@ -398,18 +398,17 @@ dequeue(0, Msgs, Q) ->
dequeue(Cnt, Msgs, Q) -> dequeue(Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of case emqx_mqueue:out(Q) of
{empty, _Q} -> dequeue(0, Msgs, Q); {empty, _Q} -> dequeue(0, Msgs, Q);
{{value, Msg = #message{qos = ?QOS_0}}, Q1} ->
dequeue(Cnt, acc_msg(Msg, Msgs), Q1);
{{value, Msg}, Q1} -> {{value, Msg}, Q1} ->
dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1) case emqx_message:is_expired(Msg) of
true -> ok = inc_expired_cnt(delivery),
dequeue(Cnt, Msgs, Q1);
false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
end
end. end.
-compile({inline, [acc_msg/2]}). -compile({inline, [acc_cnt/2]}).
acc_msg(Msg, Msgs) -> acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
case emqx_message:is_expired(Msg) of acc_cnt(_Msg, Cnt) -> Cnt - 1.
true -> Msgs;
false -> [Msg|Msgs]
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Broker -> Client: Deliver %% Broker -> Client: Deliver
@ -467,13 +466,20 @@ enqueue(Delivers, Session) when is_list(Delivers) ->
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q), {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
if is_record(Dropped, message) -> (Dropped =/= undefined) andalso log_dropped(Dropped, Session),
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
[emqx_message:format(Dropped)]);
true -> ok
end,
Session#session{mqueue = NewQ}. Session#session{mqueue = NewQ}.
log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
true ->
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
false ->
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
[emqx_message:format(Msg)])
end.
enrich_fun(Session = #session{subscriptions = Subs}) -> enrich_fun(Session = #session{subscriptions = Subs}) ->
fun({deliver, Topic, Msg}) -> fun({deliver, Topic, Msg}) ->
enrich_subopts(get_subopts(Topic, Subs), Msg, Session) enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
@ -555,6 +561,7 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
case emqx_message:is_expired(Msg) of case emqx_message:is_expired(Msg) of
true -> true ->
ok = inc_expired_cnt(delivery),
{Acc, emqx_inflight:delete(PacketId, Inflight)}; {Acc, emqx_inflight:delete(PacketId, Inflight)};
false -> false ->
Msg1 = emqx_message:set_flag(dup, true, Msg), Msg1 = emqx_message:set_flag(dup, true, Msg),
@ -581,6 +588,8 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) -> await_rel_timeout = Timeout}) ->
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
(ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt),
NSession = Session#session{awaiting_rel = AwaitingRel1}, NSession = Session#session{awaiting_rel = AwaitingRel1},
case maps:size(AwaitingRel1) of case maps:size(AwaitingRel1) of
0 -> {ok, NSession}; 0 -> {ok, NSession};
@ -617,10 +626,28 @@ replay(Inflight) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)} {PacketId, emqx_message:set_flag(dup, true, Msg)}
end, emqx_inflight:to_list(Inflight)). end, emqx_inflight:to_list(Inflight)).
%%--------------------------------------------------------------------
%% Inc message/delivery expired counter
%%--------------------------------------------------------------------
-compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
inc_expired_cnt(delivery, N) ->
ok = emqx_metrics:inc('delivery.dropped', N),
emqx_metrics:inc('delivery.dropped.expired', N);
inc_expired_cnt(message, N) ->
ok = emqx_metrics:inc('messages.dropped', N),
emqx_metrics:inc('messages.dropped.expired', N).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Next Packet Id %% Next Packet Id
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-compile({inline, [next_pkt_id/1]}).
next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) ->
Session#session{next_pkt_id = 1}; Session#session{next_pkt_id = 1};

View File

@ -427,8 +427,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
run_gc(Stats, State = #state{gc_state = GcSt}) -> run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
{IsGC, GcSt1} -> {_IsGC, GcSt1} ->
IsGC andalso emqx_metrics:inc('channel.gc'),
State#state{gc_state = GcSt1}; State#state{gc_state = GcSt1};
false -> State false -> State
end. end.
@ -521,6 +520,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
case Serialize(Packet) of case Serialize(Packet) of
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
[emqx_packet:format(Packet)]), [emqx_packet:format(Packet)]),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
<<>>; <<>>;
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
ok = inc_outgoing_stats(Packet), ok = inc_outgoing_stats(Packet),

View File

@ -53,13 +53,13 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = meck:unload(emqx_access_control), meck:unload([emqx_access_control,
ok = meck:unload(emqx_metrics), emqx_metrics,
ok = meck:unload(emqx_session), emqx_session,
ok = meck:unload(emqx_broker), emqx_broker,
ok = meck:unload(emqx_hooks), emqx_hooks,
ok = meck:unload(emqx_cm), emqx_cm
ok. ]).
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
@ -334,15 +334,15 @@ t_handle_out_publish(_) ->
t_handle_out_publish_1(_) -> t_handle_out_publish_1(_) ->
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>), Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} = {ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan}
emqx_channel:handle_out(publish, [{1, Msg}], channel()). = emqx_channel:handle_out(publish, [{1, Msg}], channel()).
t_handle_out_publish_nl(_) -> t_handle_out_publish_nl(_) ->
ClientInfo = clientinfo(#{clientid => <<"clientid">>}), ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
Channel = channel(#{clientinfo => ClientInfo}), Channel = channel(#{clientinfo => ClientInfo}),
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
Pubs = [{1, emqx_message:set_flag(nl, Msg)}], Pubs = [{1, emqx_message:set_flag(nl, Msg)}],
{ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). {ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
t_handle_out_connack_sucess(_) -> t_handle_out_connack_sucess(_) ->
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =

View File

@ -68,6 +68,14 @@ t_timestamp(_) ->
timer:sleep(1), timer:sleep(1),
?assert(erlang:system_time(millisecond) > emqx_message:timestamp(Msg)). ?assert(erlang:system_time(millisecond) > emqx_message:timestamp(Msg)).
t_is_sys(_) ->
Msg0 = emqx_message:make(<<"t">>, <<"payload">>),
?assertNot(emqx_message:is_sys(Msg0)),
Msg1 = emqx_message:set_flag(sys, Msg0),
?assert(emqx_message:is_sys(Msg1)),
Msg2 = emqx_message:make(<<"$SYS/events">>, <<"payload">>),
?assert(emqx_message:is_sys(Msg2)).
t_clean_dup(_) -> t_clean_dup(_) ->
Msg = emqx_message:make(<<"topic">>, <<"payload">>), Msg = emqx_message:make(<<"topic">>, <<"payload">>),
?assertNot(emqx_message:get_flag(dup, Msg)), ?assertNot(emqx_message:get_flag(dup, Msg)),

View File

@ -28,24 +28,17 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
t_info(_) ->
% t_init(_) -> Q = ?Q:init(#{max_len => 5, store_qos0 => true}),
% error('TODO'). true = ?Q:info(store_qos0, Q),
5 = ?Q:info(max_len, Q),
% t_is_empty(_) -> 0 = ?Q:info(len, Q),
% error('TODO'). 0 = ?Q:info(dropped, Q),
#{store_qos0 := true,
% t_len(_) -> max_len := 5,
% error('TODO'). len := 0,
dropped := 0
% t_max_len(_) -> } = ?Q:info(Q).
% error('TODO').
% t_dropped(_) ->
% error('TODO').
% t_stats(_) ->
% error('TODO').
t_in(_) -> t_in(_) ->
Opts = #{max_len => 5, store_qos0 => true}, Opts = #{max_len => 5, store_qos0 => true},
@ -163,3 +156,10 @@ t_length_priority_mqueue(_) ->
{{value, _Val}, Q5} = ?Q:out(Q4), {{value, _Val}, Q5} = ?Q:out(Q4),
?assertEqual(1, ?Q:len(Q5)). ?assertEqual(1, ?Q:len(Q5)).
t_dropped(_) ->
Q = ?Q:init(#{max_len => 1, store_qos0 => true}),
Msg = emqx_message:make(<<"t">>, <<"payload">>),
{undefined, Q1} = ?Q:in(Msg, Q),
{Msg, Q2} = ?Q:in(Msg, Q1),
?assertEqual(1, ?Q:dropped(Q2)).

View File

@ -31,14 +31,15 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% Broker %% Broker
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]), [passthrough, no_history, no_link]),
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
ok = meck:expect(emqx_metrics, inc, fun(_K, _V) -> ok end),
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = meck:unload(emqx_broker), meck:unload([emqx_broker, emqx_hooks, emqx_metrics]).
ok = meck:unload(emqx_hooks).
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.