diff --git a/etc/emqx.conf b/etc/emqx.conf index 0ec84f7ae..98e44f4a8 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -261,8 +261,6 @@ node.fullsweep_after = 1000 ## Value: Log file node.crash_dump = {{ platform_log_dir }}/crash.dump - - ## Specify SSL Options in the file if using SSL for Erlang Distribution. ## ## Value: File diff --git a/include/logger.hrl b/include/logger.hrl index 539371301..ae062b313 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -43,6 +43,8 @@ -define(LOG(Level, Format, Args), begin - (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}})) + (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, + mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}, + line => ?LINE})) end). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 038ebca19..df21afa0e 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -35,9 +35,6 @@ -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). authenticate(ClientInfo = #{zone := Zone}) -> case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of - Result = #{auth_result := success, anonymous := true} -> - emqx_metrics:inc('auth.mqtt.anonymous'), - {ok, Result}; Result = #{auth_result := success} -> {ok, Result}; Result -> diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 7950f2bca..850b466d3 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -160,9 +160,9 @@ do_subscribe(Group, Topic, SubPid, SubOpts) -> true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), emqx_shared_sub:subscribe(Group, Topic, SubPid). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Unsubscribe API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(unsubscribe(emqx_topic:topic()) -> ok). unsubscribe(Topic) when is_binary(Topic) -> @@ -198,43 +198,46 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> -spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), - Msg1 = emqx_message:set_header(allow_publish, true, - emqx_message:clean_dup(Msg)), - case emqx_hooks:run_fold('message.publish', [], Msg1) of + emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), + case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> - ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg1)]), + ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]), []; - #message{topic = Topic} = Msg2 -> - route(aggre(emqx_router:match_routes(Topic)), delivery(Msg2)) + Msg1 = #message{topic = Topic} -> + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. %% Called internally --spec(safe_publish(emqx_types:message()) -> ok | emqx_types:publish_result()). +-spec(safe_publish(emqx_types:message()) -> emqx_types:publish_result()). safe_publish(Msg) when is_record(Msg, message) -> try publish(Msg) catch - _:Error:Stacktrace -> - ?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) + _:Error:Stk-> + ?LOG(error, "Publish error: ~p~n~s~n~p", + [Error, emqx_message:format(Msg), Stk]) after - ok + [] end. -delivery(Msg) -> - #delivery{sender = self(), message = Msg}. +-compile({inline, [delivery/1]}). +delivery(Msg) -> #delivery{sender = self(), message = Msg}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Route -%%------------------------------------------------------------------------------ --spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()). +%%-------------------------------------------------------------------- + +-spec(route([emqx_types:route_entry()], emqx_types:delivery()) + -> emqx_types:publish_result()). route([], #delivery{message = Msg}) -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Msg#message.topic), + ok = inc_dropped_cnt(Msg), []; + route(Routes, Delivery) -> lists:foldl(fun(Route, Acc) -> - [do_route(Route, Delivery) | Acc] - end, [], Routes). + [do_route(Route, Delivery) | Acc] + end, [], Routes). do_route({To, Node}, Delivery) when Node =:= node() -> {Node, To, dispatch(To, Delivery)}; @@ -243,8 +246,7 @@ do_route({To, Node}, Delivery) when is_atom(Node) -> do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. -aggre([]) -> - []; +aggre([]) -> []; aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> [{To, Node}]; aggre([#route{topic = To, dest = {Group, _Node}}]) -> @@ -258,41 +260,40 @@ aggre(Routes) -> end, [], Routes). %% @doc Forward message to another node. --spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) +-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync|async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of - true -> ok; + true -> emqx_metrics:inc('messages.forward'); {badrpc, Reason} -> - ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), + ?LOG(error, "Ansync forward msg to ~s failed due to ~p", [Node, Reason]), {error, badrpc} end; forward(Node, To, Delivery, sync) -> case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), + ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]), {error, badrpc}; - Result -> Result + Result -> + emqx_metrics:inc('messages.forward'), Result end. -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of - [] -> - emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Topic), - {error, no_subscribers}; + [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), + _ = inc_dropped_cnt(Topic), + {error, no_subscribers}; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg); - Subs -> - lists:foldl( - fun(Sub, Res) -> - case dispatch(Sub, Topic, Msg) of - ok -> Res; - Err -> Err - end - end, ok, Subs) + Subs -> lists:foldl( + fun(Sub, Res) -> + case dispatch(Sub, Topic, Msg) of + ok -> Res; + Err -> Err + end + end, ok, Subs) end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> @@ -302,6 +303,7 @@ dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> ok; false -> {error, subscriber_die} end; + dispatch({shard, I}, Topic, Msg) -> lists:foldl( fun(SubPid, Res) -> @@ -311,20 +313,24 @@ dispatch({shard, I}, Topic, Msg) -> end end, ok, subscribers({shard, Topic, I})). -inc_dropped_cnt(<<"$SYS/", _/binary>>) -> - ok; -inc_dropped_cnt(_Topic) -> - emqx_metrics:inc('messages.dropped'). +-compile({inline, [inc_dropped_cnt/1]}). +inc_dropped_cnt(Msg) -> + case emqx_message:is_sys(Msg) of + true -> ok; + false -> ok = emqx_metrics:inc('messages.dropped'), + emqx_metrics:inc('messages.dropped.no_subscribers') + end. +-compile({inline, [subscribers/1]}). -spec(subscribers(emqx_topic:topic()) -> [pid()]). subscribers(Topic) when is_binary(Topic) -> lookup_value(?SUBSCRIBER, Topic, []); subscribers(Shard = {shard, _Topic, _I}) -> lookup_value(?SUBSCRIBER, Shard, []). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Subscriber is down -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(subscriber_down(pid()) -> true). subscriber_down(SubPid) -> @@ -345,9 +351,9 @@ subscriber_down(SubPid) -> end, lookup_value(?SUBSCRIPTION, SubPid, [])), ets:delete(?SUBSCRIPTION, SubPid). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Management APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(subscriptions(pid() | emqx_types:subid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). @@ -410,9 +416,11 @@ safe_update_stats(Tab, Stat, MaxStat) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% call, cast, pick -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- + +-compile({inline, [call/2, cast/2, pick/1]}). call(Broker, Req) -> gen_server:call(Broker, Req). @@ -424,9 +432,9 @@ cast(Broker, Msg) -> pick(Topic) -> gproc_pool:pick_worker(broker_pool, Topic). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), @@ -490,7 +498,7 @@ terminate(_Reason, #{pool := Pool, id := Id}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 59ff7c1ef..7d2bf8f23 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -155,7 +155,8 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> %%-------------------------------------------------------------------- -spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). -init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> +init(ConnInfo = #{peername := {PeerHost, _Port}, + sockname := {_Host, SockPort}}, Options) -> Zone = proplists:get_value(zone, Options), Peercert = maps:get(peercert, ConnInfo, undefined), Username = case peer_cert_as_username(Options) of @@ -169,6 +170,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> ClientInfo = #{zone => Zone, protocol => Protocol, peerhost => PeerHost, + sockport => SockPort, peercert => Peercert, clientid => undefined, username => Username, @@ -221,30 +223,30 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), - Channel = #channel{clientinfo = ClientInfo, session = Session}) -> +handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel + = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:puback(PacketId, Session) of {ok, Msg, NSession} -> - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + ok = after_message_acked(ClientInfo, Msg), {ok, Channel#channel{session = NSession}}; {ok, Msg, Publishes, NSession} -> - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + ok = after_message_acked(ClientInfo, Msg), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.puback.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), {ok, Channel} end; -handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), - Channel = #channel{clientinfo = ClientInfo, session = Session}) -> +handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel + = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + ok = after_message_acked(ClientInfo, Msg), NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -263,8 +265,8 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> + ?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.pubrel.missed'), - ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), handle_out(pubcomp, {PacketId, RC}, Channel) end; @@ -344,8 +346,7 @@ handle_in(Packet, Channel) -> %%-------------------------------------------------------------------- process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, - Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> + Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, @@ -376,17 +377,17 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) -> fun check_pub_caps/2 ], Packet, Channel) of {ok, NPacket, NChannel} -> - Msg = packet_to_msg(NPacket, NChannel), + Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); {error, ReasonCode, NChannel} -> - ?LOG(warning, "Cannot publish message to ~s due to ~s", + ?LOG(warning, "Cannot publish message to ~s due to ~s.", [Topic, emqx_reason_codes:text(ReasonCode)]), handle_out(disconnect, ReasonCode, NChannel) end. -packet_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, - clientinfo = ClientInfo = - #{mountpoint := MountPoint}}) -> +packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, + clientinfo = ClientInfo = + #{mountpoint := MountPoint}}) -> emqx_mountpoint:mount( MountPoint, emqx_packet:to_message( ClientInfo, #{proto_ver => ProtoVer}, Packet)). @@ -412,8 +413,9 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full.", [PacketId]), - ok = emqx_metrics:inc('messages.qos2.dropped'), + ?LOG(warning, "Dropped the qos2 packet ~w " + "due to awaiting_rel is full.", [PacketId]), + ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) end. @@ -421,6 +423,11 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback_reason_code([_|_]) -> ?RC_SUCCESS. +-compile({inline, [after_message_acked/2]}). +after_message_acked(ClientInfo, Msg) -> + ok = emqx_metrics:inc('messages.acked'), + emqx_hooks:run('message.acked', [ClientInfo, Msg]). + %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- @@ -567,17 +574,8 @@ handle_out(connack, {ReasonCode, _ConnPkt}, handle_out(publish, [], Channel) -> {ok, Channel}; -handle_out(publish, [{pubrel, PacketId}], Channel) -> - handle_out(pubrel, {PacketId, ?RC_SUCCESS}, Channel); - -handle_out(publish, [Pub = {_PacketId, Msg}], Channel) -> - case ignore_local(Msg, Channel) of - true -> {ok, Channel}; - false -> {ok, pub_to_packet(Pub, Channel), Channel} - end; - handle_out(publish, Publishes, Channel) -> - Packets = handle_publish(Publishes, Channel), + Packets = do_deliver(Publishes, Channel), {ok, {outgoing, Packets}, Channel}; handle_out(puback, {PacketId, ReasonCode}, Channel) -> @@ -639,45 +637,50 @@ return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo, resuming = false, pendings = [] }, - Packets = handle_publish(Publishes, NChannel), + Packets = do_deliver(Publishes, NChannel), Outgoing = [{outgoing, Packets} || length(Packets) > 0], {ok, Replies ++ Outgoing, NChannel} end. %%-------------------------------------------------------------------- -%% Handle out Publish +%% Deliver publish: broker -> client %%-------------------------------------------------------------------- --compile({inline, [handle_publish/2]}). -handle_publish(Publishes, Channel) -> - handle_publish(Publishes, [], Channel). +%% return list(emqx_types:packet()) +do_deliver({pubrel, PacketId}, _Channel) -> + [?PUBREL_PACKET(PacketId, ?RC_SUCCESS)]; -%% Handle out publish -handle_publish([], Acc, _Channel) -> - lists:reverse(Acc); - -handle_publish([{pubrel, PacketId}|More], Acc, Channel) -> - Packet = ?PUBREL_PACKET(PacketId, ?RC_SUCCESS), - handle_publish(More, [Packet|Acc], Channel); - -handle_publish([Pub = {_PacketId, Msg}|More], Acc, Channel) -> - case ignore_local(Msg, Channel) of - true -> handle_publish(More, Acc, Channel); +do_deliver({PacketId, Msg}, #channel{clientinfo = ClientInfo = + #{mountpoint := MountPoint}}) -> + case ignore_local(Msg, ClientInfo) of + true -> + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'), + []; false -> - Packet = pub_to_packet(Pub, Channel), - handle_publish(More, [Packet|Acc], Channel) - end. + ok = emqx_metrics:inc('messages.delivered'), + Msg1 = emqx_hooks:run_fold('message.delivered', + [ClientInfo], + emqx_message:update_expiry(Msg) + ), + Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), + [emqx_message:to_packet(PacketId, Msg2)] + end; -pub_to_packet({PacketId, Msg}, #channel{clientinfo = ClientInfo}) -> - Msg1 = emqx_hooks:run_fold('message.delivered', [ClientInfo], - emqx_message:update_expiry(Msg)), - Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, ClientInfo), Msg1), - emqx_message:to_packet(PacketId, Msg2). +do_deliver([Publish], Channel) -> + do_deliver(Publish, Channel); + +do_deliver(Publishes, Channel) when is_list(Publishes) -> + lists:reverse( + lists:foldl( + fun(Publish, Acc) -> + lists:append(do_deliver(Publish, Channel), Acc) + end, [], Publishes)). ignore_local(#message{flags = #{nl := true}, from = ClientId}, - #channel{clientinfo = #{clientid := ClientId}}) -> + #{clientid := ClientId}) -> true; -ignore_local(_Msg, _Channel) -> false. +ignore_local(_Msg, _ClientInfo) -> false. %%-------------------------------------------------------------------- %% Handle out suback @@ -1008,6 +1011,8 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId, #channel{clientinfo = ClientInfo} = Channel) -> case emqx_access_control:authenticate(ClientInfo#{password => Password}) of {ok, AuthResult} -> + is_anonymous(AuthResult) andalso + emqx_metrics:inc('client.auth.anonymous'), NClientInfo = maps:merge(ClientInfo, AuthResult), {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> @@ -1016,6 +1021,9 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId, {error, emqx_reason_codes:connack_error(Reason)} end. +is_anonymous(#{anonymous := true}) -> true; +is_anonymous(_AuthResult) -> false. + %%-------------------------------------------------------------------- %% Process Topic Alias diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 605e78bf7..774c0ba42 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -136,6 +136,8 @@ info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; +info(stats_timer, #state{stats_timer = Stats_timer}) -> + Stats_timer; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). @@ -558,6 +560,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> case Serialize(Packet) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), + ok = emqx_metrics:inc('delivery.dropped.too_large'), + ok = emqx_metrics:inc('delivery.dropped'), <<>>; Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), @@ -625,8 +629,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> run_gc(Stats, State = #state{gc_state = GcSt}) -> case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of false -> State; - {IsGC, GcSt1} -> - IsGC andalso emqx_metrics:inc('channel.gc'), + {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} end. diff --git a/src/emqx_message.erl b/src/emqx_message.erl index d117036a4..3f0d874dc 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -38,7 +38,8 @@ ]). %% Flags --export([ clean_dup/1 +-export([ is_sys/1 + , clean_dup/1 , get_flag/2 , get_flag/3 , get_flags/1 @@ -112,6 +113,13 @@ payload(#message{payload = Payload}) -> Payload. -spec(timestamp(emqx_types:message()) -> integer()). timestamp(#message{timestamp = TS}) -> TS. +-spec(is_sys(emqx_types:message()) -> boolean()). +is_sys(#message{flags = #{sys := true}}) -> + true; +is_sys(#message{topic = <<"$SYS/", _/binary>>}) -> + true; +is_sys(_Msg) -> false. + -spec(clean_dup(emqx_types:message()) -> emqx_types:message()). clean_dup(Msg = #message{flags = Flags = #{dup := true}}) -> Msg#message{flags = Flags#{dup => false}}; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index d92402caa..b6c06bcef 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -62,88 +62,118 @@ -export_type([metric_idx/0]). +-compile({inline, [inc/1, inc/2, dec/1, dec/2]}). +-compile({inline, [inc_recv/1, inc_sent/1]}). + -opaque(metric_idx() :: 1..1024). -type(metric_name() :: atom() | string() | binary()). -define(MAX_SIZE, 1024). --define(RESERVED_IDX, 256). +-define(RESERVED_IDX, 512). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -%% Bytes sent and received of broker --define(BYTES_METRICS, [ - {counter, 'bytes.received'}, % Total bytes received - {counter, 'bytes.sent'} % Total bytes sent -]). +%% Bytes sent and received +-define(BYTES_METRICS, + [{counter, 'bytes.received'}, % Total bytes received + {counter, 'bytes.sent'} % Total bytes sent + ]). -%% Packets sent and received of broker --define(PACKET_METRICS, [ - {counter, 'packets.received'}, % All Packets received - {counter, 'packets.sent'}, % All Packets sent - {counter, 'packets.connect.received'}, % CONNECT Packets received - {counter, 'packets.connack.sent'}, % CONNACK Packets sent - {counter, 'packets.connack.error'}, % CONNACK error sent - {counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent - {counter, 'packets.publish.received'}, % PUBLISH packets received - {counter, 'packets.publish.sent'}, % PUBLISH packets sent - {counter, 'packets.publish.error'}, % PUBLISH failed for error - {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error - {counter, 'packets.puback.received'}, % PUBACK packets received - {counter, 'packets.puback.sent'}, % PUBACK packets sent - {counter, 'packets.puback.missed'}, % PUBACK packets missed - {counter, 'packets.pubrec.received'}, % PUBREC packets received - {counter, 'packets.pubrec.sent'}, % PUBREC packets sent - {counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse - {counter, 'packets.pubrec.missed'}, % PUBREC packets missed - {counter, 'packets.pubrel.received'}, % PUBREL packets received - {counter, 'packets.pubrel.sent'}, % PUBREL packets sent - {counter, 'packets.pubrel.missed'}, % PUBREL packets missed - {counter, 'packets.pubcomp.received'}, % PUBCOMP packets received - {counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent - {counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse - {counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed - {counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received - {counter, 'packets.subscribe.error'}, % SUBSCRIBE error - {counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth - {counter, 'packets.suback.sent'}, % SUBACK packets sent - {counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received - {counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error - {counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent - {counter, 'packets.pingreq.received'}, % PINGREQ packets received - {counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent - {counter, 'packets.disconnect.received'}, % DISCONNECT Packets received - {counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent - {counter, 'packets.auth.received'}, % Auth Packets received - {counter, 'packets.auth.sent'} % Auth Packets sent -]). +%% Packets sent and received +-define(PACKET_METRICS, + [{counter, 'packets.received'}, % All Packets received + {counter, 'packets.sent'}, % All Packets sent + {counter, 'packets.connect.received'}, % CONNECT Packets received + {counter, 'packets.connack.sent'}, % CONNACK Packets sent + {counter, 'packets.connack.error'}, % CONNACK error sent + {counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent + {counter, 'packets.publish.received'}, % PUBLISH packets received + {counter, 'packets.publish.sent'}, % PUBLISH packets sent + {counter, 'packets.publish.error'}, % PUBLISH failed for error + {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error + {counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped + {counter, 'packets.puback.received'}, % PUBACK packets received + {counter, 'packets.puback.sent'}, % PUBACK packets sent + {counter, 'packets.puback.inuse'}, % PUBACK packet_id inuse + {counter, 'packets.puback.missed'}, % PUBACK packets missed + {counter, 'packets.pubrec.received'}, % PUBREC packets received + {counter, 'packets.pubrec.sent'}, % PUBREC packets sent + {counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse + {counter, 'packets.pubrec.missed'}, % PUBREC packets missed + {counter, 'packets.pubrel.received'}, % PUBREL packets received + {counter, 'packets.pubrel.sent'}, % PUBREL packets sent + {counter, 'packets.pubrel.missed'}, % PUBREL packets missed + {counter, 'packets.pubcomp.received'}, % PUBCOMP packets received + {counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent + {counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse + {counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed + {counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received + {counter, 'packets.subscribe.error'}, % SUBSCRIBE error + {counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth + {counter, 'packets.suback.sent'}, % SUBACK packets sent + {counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received + {counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error + {counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent + {counter, 'packets.pingreq.received'}, % PINGREQ packets received + {counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent + {counter, 'packets.disconnect.received'}, % DISCONNECT Packets received + {counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent + {counter, 'packets.auth.received'}, % Auth Packets received + {counter, 'packets.auth.sent'} % Auth Packets sent + ]). -%% Messages sent and received of broker --define(MESSAGE_METRICS, [ - {counter, 'messages.received'}, % All Messages received - {counter, 'messages.sent'}, % All Messages sent - {counter, 'messages.qos0.received'}, % QoS0 Messages received - {counter, 'messages.qos0.sent'}, % QoS0 Messages sent - {counter, 'messages.qos1.received'}, % QoS1 Messages received - {counter, 'messages.qos1.sent'}, % QoS1 Messages sent - {counter, 'messages.qos2.received'}, % QoS2 Messages received - {counter, 'messages.qos2.expired'}, % QoS2 Messages expired - {counter, 'messages.qos2.sent'}, % QoS2 Messages sent - {counter, 'messages.qos2.dropped'}, % QoS2 Messages dropped - {gauge, 'messages.retained'}, % Messagea retained - {counter, 'messages.dropped'}, % Messages dropped - {counter, 'messages.expired'}, % Messages expired - {counter, 'messages.forward'} % Messages forward -]). +%% Messages sent/received and pubsub +-define(MESSAGE_METRICS, + [{counter, 'messages.received'}, % All Messages received + {counter, 'messages.sent'}, % All Messages sent + {counter, 'messages.qos0.received'}, % QoS0 Messages received + {counter, 'messages.qos0.sent'}, % QoS0 Messages sent + {counter, 'messages.qos1.received'}, % QoS1 Messages received + {counter, 'messages.qos1.sent'}, % QoS1 Messages sent + {counter, 'messages.qos2.received'}, % QoS2 Messages received + {counter, 'messages.qos2.sent'}, % QoS2 Messages sent + %% PubSub Metrics + {counter, 'messages.publish'}, % Messages Publish + {counter, 'messages.dropped'}, % Messages dropped due to no subscribers + {counter, 'messages.dropped.expired'}, % QoS2 Messages expired + {counter, 'messages.dropped.no_subscribers'}, % Messages dropped + {counter, 'messages.forward'}, % Messages forward + {gauge, 'messages.retained'}, % Messages retained + {gauge, 'messages.delayed'}, % Messages delayed + {counter, 'messages.delivered'}, % Messages delivered + {counter, 'messages.acked'} % Messages acked + ]). --define(CHAN_METRICS, [ - {counter, 'channel.gc'} -]). +%% Delivery metrics +-define(DELIVERY_METRICS, + [{counter, 'delivery.dropped'}, + {counter, 'delivery.dropped.no_local'}, + {counter, 'delivery.dropped.too_large'}, + {counter, 'delivery.dropped.qos0_msg'}, + {counter, 'delivery.dropped.queue_full'}, + {counter, 'delivery.dropped.expired'} + ]). --define(MQTT_METRICS, [ - {counter, 'auth.mqtt.anonymous'} -]). +%% Client Lifecircle metrics +-define(CLIENT_METRICS, + [{counter, 'client.connected'}, + {counter, 'client.authenticate'}, + {counter, 'client.auth.anonymous'}, + {counter, 'client.check_acl'}, + {counter, 'client.subscribe'}, + {counter, 'client.unsubscribe'}, + {counter, 'client.disconnected'} + ]). +%% Session Lifecircle metrics +-define(SESSION_METRICS, + [{counter, 'session.created'}, + {counter, 'session.resumed'}, + {counter, 'session.takeovered'}, + {counter, 'session.discarded'}, + {counter, 'session.terminated'} + ]). -record(state, {next_idx = 1}). @@ -155,8 +185,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(stop() -> ok). -stop() -> - gen_server:stop(?SERVER). +stop() -> gen_server:stop(?SERVER). %%-------------------------------------------------------------------- %% Metrics API @@ -265,7 +294,7 @@ update_counter(Name, Value) -> counters:add(CRef, CIdx, Value). %%-------------------------------------------------------------------- -%% Inc Received/Sent metrics +%% Inc received/sent metrics %%-------------------------------------------------------------------- %% @doc Inc packets received. @@ -276,13 +305,13 @@ inc_recv(Packet) -> do_inc_recv(?PACKET(?CONNECT)) -> inc('packets.connect.received'); -do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) -> +do_inc_recv(?PUBLISH_PACKET(QoS)) -> inc('messages.received'), case QoS of ?QOS_0 -> inc('messages.qos0.received'); ?QOS_1 -> inc('messages.qos1.received'); ?QOS_2 -> inc('messages.qos2.received'); - _ -> ok + _other -> ok end, inc('packets.publish.received'); do_inc_recv(?PACKET(?PUBACK)) -> @@ -319,12 +348,13 @@ do_inc_sent(?CONNACK_PACKET(ReasonCode)) -> (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'), inc('packets.connack.sent'); -do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) -> +do_inc_sent(?PUBLISH_PACKET(QoS)) -> inc('messages.sent'), case QoS of ?QOS_0 -> inc('messages.qos0.sent'); ?QOS_1 -> inc('messages.qos1.sent'); - ?QOS_2 -> inc('messages.qos2.sent') + ?QOS_2 -> inc('messages.qos2.sent'); + _other -> ok end, inc('packets.publish.sent'); do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) -> @@ -360,14 +390,21 @@ init([]) -> CRef = counters:new(?MAX_SIZE, [write_concurrency]), ok = persistent_term:put(?MODULE, CRef), % Create index mapping table - ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]), + Metrics = lists:append([?BYTES_METRICS, + ?PACKET_METRICS, + ?MESSAGE_METRICS, + ?DELIVERY_METRICS, + ?CLIENT_METRICS, + ?SESSION_METRICS + ]), % Store reserved indices - lists:foreach(fun({Type, Name}) -> - Idx = reserved_idx(Name), - Metric = #metric{name = Name, type = Type, idx = Idx}, - true = ets:insert(?TAB, Metric), - ok = counters:put(CRef, Idx, 0) - end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS), + ok = lists:foreach(fun({Type, Name}) -> + Idx = reserved_idx(Name), + Metric = #metric{name = Name, type = Type, idx = Idx}, + true = ets:insert(?TAB, Metric), + ok = counters:put(CRef, Idx, 0) + end, Metrics), {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> @@ -409,58 +446,85 @@ code_change(_OldVsn, State, _Extra) -> reserved_idx('bytes.received') -> 01; reserved_idx('bytes.sent') -> 02; -reserved_idx('packets.received') -> 03; -reserved_idx('packets.sent') -> 04; -reserved_idx('packets.connect.received') -> 05; -reserved_idx('packets.connack.sent') -> 06; -reserved_idx('packets.connack.error') -> 07; -reserved_idx('packets.connack.auth_error') -> 08; -reserved_idx('packets.publish.received') -> 09; -reserved_idx('packets.publish.sent') -> 10; -reserved_idx('packets.publish.error') -> 11; -reserved_idx('packets.publish.auth_error') -> 12; -reserved_idx('packets.puback.received') -> 13; -reserved_idx('packets.puback.sent') -> 14; -reserved_idx('packets.puback.missed') -> 15; -reserved_idx('packets.pubrec.received') -> 16; -reserved_idx('packets.pubrec.sent') -> 17; -reserved_idx('packets.pubrec.missed') -> 18; -reserved_idx('packets.pubrel.received') -> 19; -reserved_idx('packets.pubrel.sent') -> 20; -reserved_idx('packets.pubrel.missed') -> 21; -reserved_idx('packets.pubcomp.received') -> 22; -reserved_idx('packets.pubcomp.sent') -> 23; -reserved_idx('packets.pubcomp.missed') -> 24; -reserved_idx('packets.subscribe.received') -> 25; -reserved_idx('packets.subscribe.error') -> 26; -reserved_idx('packets.subscribe.auth_error') -> 27; -reserved_idx('packets.suback.sent') -> 28; -reserved_idx('packets.unsubscribe.received') -> 29; -reserved_idx('packets.unsubscribe.error') -> 30; -reserved_idx('packets.unsuback.sent') -> 31; -reserved_idx('packets.pingreq.received') -> 32; -reserved_idx('packets.pingresp.sent') -> 33; -reserved_idx('packets.disconnect.received') -> 34; -reserved_idx('packets.disconnect.sent') -> 35; -reserved_idx('packets.auth.received') -> 36; -reserved_idx('packets.auth.sent') -> 37; -reserved_idx('messages.received') -> 38; -reserved_idx('messages.sent') -> 39; -reserved_idx('messages.qos0.received') -> 40; -reserved_idx('messages.qos0.sent') -> 41; -reserved_idx('messages.qos1.received') -> 42; -reserved_idx('messages.qos1.sent') -> 43; -reserved_idx('messages.qos2.received') -> 44; -reserved_idx('messages.qos2.expired') -> 45; -reserved_idx('messages.qos2.sent') -> 46; -reserved_idx('messages.qos2.dropped') -> 47; -reserved_idx('messages.retained') -> 48; -reserved_idx('messages.dropped') -> 49; -reserved_idx('messages.expired') -> 50; -reserved_idx('messages.forward') -> 51; -reserved_idx('auth.mqtt.anonymous') -> 52; -reserved_idx('channel.gc') -> 53; -reserved_idx('packets.pubrec.inuse') -> 54; -reserved_idx('packets.pubcomp.inuse') -> 55; +%% Reserved indices of packet's metrics +reserved_idx('packets.received') -> 10; +reserved_idx('packets.sent') -> 11; +reserved_idx('packets.connect.received') -> 12; +reserved_idx('packets.connack.sent') -> 13; +reserved_idx('packets.connack.error') -> 14; +reserved_idx('packets.connack.auth_error') -> 15; +reserved_idx('packets.publish.received') -> 16; +reserved_idx('packets.publish.sent') -> 17; +reserved_idx('packets.publish.error') -> 18; +reserved_idx('packets.publish.auth_error') -> 19; +reserved_idx('packets.puback.received') -> 20; +reserved_idx('packets.puback.sent') -> 21; +reserved_idx('packets.puback.inuse') -> 22; +reserved_idx('packets.puback.missed') -> 23; +reserved_idx('packets.pubrec.received') -> 24; +reserved_idx('packets.pubrec.sent') -> 25; +reserved_idx('packets.pubrec.inuse') -> 26; +reserved_idx('packets.pubrec.missed') -> 27; +reserved_idx('packets.pubrel.received') -> 28; +reserved_idx('packets.pubrel.sent') -> 29; +reserved_idx('packets.pubrel.missed') -> 30; +reserved_idx('packets.pubcomp.received') -> 31; +reserved_idx('packets.pubcomp.sent') -> 32; +reserved_idx('packets.pubcomp.inuse') -> 33; +reserved_idx('packets.pubcomp.missed') -> 34; +reserved_idx('packets.subscribe.received') -> 35; +reserved_idx('packets.subscribe.error') -> 36; +reserved_idx('packets.subscribe.auth_error') -> 37; +reserved_idx('packets.suback.sent') -> 38; +reserved_idx('packets.unsubscribe.received') -> 39; +reserved_idx('packets.unsubscribe.error') -> 40; +reserved_idx('packets.unsuback.sent') -> 41; +reserved_idx('packets.pingreq.received') -> 42; +reserved_idx('packets.pingresp.sent') -> 43; +reserved_idx('packets.disconnect.received') -> 44; +reserved_idx('packets.disconnect.sent') -> 45; +reserved_idx('packets.auth.received') -> 46; +reserved_idx('packets.auth.sent') -> 47; +reserved_idx('packets.publish.dropped') -> 48; +%% Reserved indices of message's metrics +reserved_idx('messages.received') -> 100; +reserved_idx('messages.sent') -> 101; +reserved_idx('messages.qos0.received') -> 102; +reserved_idx('messages.qos0.sent') -> 103; +reserved_idx('messages.qos1.received') -> 104; +reserved_idx('messages.qos1.sent') -> 105; +reserved_idx('messages.qos2.received') -> 106; +reserved_idx('messages.qos2.sent') -> 107; +reserved_idx('messages.publish') -> 108; +reserved_idx('messages.dropped') -> 109; +reserved_idx('messages.dropped.expired') -> 110; +reserved_idx('messages.dropped.no_subscribers') -> 111; +reserved_idx('messages.forward') -> 112; +reserved_idx('messages.retained') -> 113; +reserved_idx('messages.delayed') -> 114; +reserved_idx('messages.delivered') -> 115; +reserved_idx('messages.acked') -> 116; +reserved_idx('delivery.expired') -> 117; +reserved_idx('delivery.dropped') -> 118; +reserved_idx('delivery.dropped.no_local') -> 119; +reserved_idx('delivery.dropped.too_large') -> 120; +reserved_idx('delivery.dropped.qos0_msg') -> 121; +reserved_idx('delivery.dropped.queue_full') -> 122; +reserved_idx('delivery.dropped.expired') -> 123; + +reserved_idx('client.connected') -> 200; +reserved_idx('client.authenticate') -> 201; +reserved_idx('client.auth.anonymous') -> 202; +reserved_idx('client.check_acl') -> 203; +reserved_idx('client.subscribe') -> 204; +reserved_idx('client.unsubscribe') -> 205; +reserved_idx('client.disconnected') -> 206; + +reserved_idx('session.created') -> 220; +reserved_idx('session.resumed') -> 221; +reserved_idx('session.takeovered') -> 222; +reserved_idx('session.discarded') -> 223; +reserved_idx('session.terminated') -> 224; + reserved_idx(_) -> undefined. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 398bc2964..69d813cd6 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -118,41 +118,21 @@ do_check_sub(_Flags, _Caps) -> ok. -spec(get_caps(emqx_zone:zone()) -> caps()). get_caps(Zone) -> - with_env(Zone, '$mqtt_caps', fun all_caps/1). + maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS). -spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). get_caps(Zone, publish) -> - with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); + filter_caps(?PUBCAP_KEYS, get_caps(Zone)); get_caps(Zone, subscribe) -> - with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). + filter_caps(?SUBCAP_KEYS, get_caps(Zone)). -spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()). get_caps(Zone, Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def). -pub_caps(Zone) -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)). - -sub_caps(Zone) -> - filter_caps(?SUBCAP_KEYS, get_caps(Zone)). - -all_caps(Zone) -> - maps:map(fun(Cap, Def) -> - emqx_zone:get_env(Zone, Cap, Def) - end, ?DEFAULT_CAPS). - filter_caps(Keys, Caps) -> maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). -with_env(Zone, Key, InitFun) -> - case emqx_zone:get_env(Zone, Key) of - undefined -> - Caps = InitFun(Zone), - ok = emqx_zone:set_env(Zone, Key, Caps), - Caps; - Caps -> Caps - end. - -spec(default() -> caps()). default() -> ?DEFAULT_CAPS. diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 12154c184..f02304f24 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -53,7 +53,10 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). --export([init/1]). +-export([ init/1 + , info/1 + , info/2 + ]). -export([ is_empty/1 , len/1 @@ -86,6 +89,7 @@ -define(LOWEST_PRIORITY, 0). -define(HIGHEST_PRIORITY, infinity). -define(MAX_LEN_INFINITY, 0). +-define(INFO_KEYS, [store_qos0, max_len, len, dropped]). -record(mqueue, { store_qos0 = false :: boolean(), @@ -111,6 +115,20 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> default_p = get_priority_opt(Opts) }. +-spec(info(mqueue()) -> emqx_types:infos()). +info(MQ) -> + maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]). + +-spec(info(atom(), mqueue()) -> term()). +info(store_qos0, #mqueue{store_qos0 = True}) -> + True; +info(max_len, #mqueue{max_len = MaxLen}) -> + MaxLen; +info(len, #mqueue{len = Len}) -> + Len; +info(dropped, #mqueue{dropped = Dropped}) -> + Dropped. + is_empty(#mqueue{len = Len}) -> Len =:= 0. len(#mqueue{len = Len}) -> Len. diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index d17053894..fb80152f5 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -213,7 +213,7 @@ start_app(App, SuccFun) -> ?LOG(info, "Started plugins: ~p", [Started]), ?LOG(info, "Load plugin ~s successfully", [App]), SuccFun(App), - {ok, Started}; + ok; {error, {ErrApp, Reason}} -> ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9561efef9..f9e304aa0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -398,18 +398,17 @@ dequeue(0, Msgs, Q) -> dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> dequeue(0, Msgs, Q); - {{value, Msg = #message{qos = ?QOS_0}}, Q1} -> - dequeue(Cnt, acc_msg(Msg, Msgs), Q1); {{value, Msg}, Q1} -> - dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1) + case emqx_message:is_expired(Msg) of + true -> ok = inc_expired_cnt(delivery), + dequeue(Cnt, Msgs, Q1); + false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) + end end. --compile({inline, [acc_msg/2]}). -acc_msg(Msg, Msgs) -> - case emqx_message:is_expired(Msg) of - true -> Msgs; - false -> [Msg|Msgs] - end. +-compile({inline, [acc_cnt/2]}). +acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt; +acc_cnt(_Msg, Cnt) -> Cnt - 1. %%-------------------------------------------------------------------- %% Broker -> Client: Deliver @@ -467,13 +466,20 @@ enqueue(Delivers, Session) when is_list(Delivers) -> enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - if is_record(Dropped, message) -> - ?LOG(warning, "Dropped msg due to mqueue is full: ~s", - [emqx_message:format(Dropped)]); - true -> ok - end, + (Dropped =/= undefined) andalso log_dropped(Dropped, Session), Session#session{mqueue = NewQ}. +log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> + case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of + true -> + ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), + ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); + false -> + ok = emqx_metrics:inc('delivery.dropped.queue_full'), + ?LOG(warning, "Dropped msg due to mqueue is full: ~s", + [emqx_message:format(Msg)]) + end. + enrich_fun(Session = #session{subscriptions = Subs}) -> fun({deliver, Topic, Msg}) -> enrich_subopts(get_subopts(Topic, Subs), Msg, Session) @@ -555,6 +561,7 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> + ok = inc_expired_cnt(delivery), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), @@ -581,6 +588,8 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), + ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), + (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt), NSession = Session#session{awaiting_rel = AwaitingRel1}, case maps:size(AwaitingRel1) of 0 -> {ok, NSession}; @@ -617,10 +626,28 @@ replay(Inflight) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)). +%%-------------------------------------------------------------------- +%% Inc message/delivery expired counter +%%-------------------------------------------------------------------- + +-compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}). + +inc_expired_cnt(K) -> inc_expired_cnt(K, 1). + +inc_expired_cnt(delivery, N) -> + ok = emqx_metrics:inc('delivery.dropped', N), + emqx_metrics:inc('delivery.dropped.expired', N); + +inc_expired_cnt(message, N) -> + ok = emqx_metrics:inc('messages.dropped', N), + emqx_metrics:inc('messages.dropped.expired', N). + %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- +-compile({inline, [next_pkt_id/1]}). + next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> Session#session{next_pkt_id = 1}; diff --git a/src/emqx_time.erl b/src/emqx_time.erl deleted file mode 100644 index 211ea3d9e..000000000 --- a/src/emqx_time.erl +++ /dev/null @@ -1,51 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_time). - --export([ seed/0 - , now_secs/0 - , now_secs/1 - , now_ms/0 - , now_ms/1 - ]). - --compile({inline, - [ seed/0 - , now_secs/0 - , now_secs/1 - , now_ms/0 - , now_ms/1 - ]}). - -seed() -> - rand:seed(exsplus, erlang:timestamp()). - --spec(now_secs() -> pos_integer()). -now_secs() -> - erlang:system_time(second). - --spec(now_secs(erlang:timestamp()) -> pos_integer()). -now_secs({MegaSecs, Secs, _MicroSecs}) -> - MegaSecs * 1000000 + Secs. - --spec(now_ms() -> pos_integer()). -now_ms() -> - erlang:system_time(millisecond). - --spec(now_ms(erlang:timestamp()) -> pos_integer()). -now_ms({MegaSecs, Secs, MicroSecs}) -> - (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). \ No newline at end of file diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 7b887faf8..b5caa49d7 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -124,6 +124,7 @@ -type(clientinfo() :: #{zone := zone(), protocol := protocol(), peerhost := peerhost(), + sockport := non_neg_integer(), clientid := clientid(), username := username(), peercert := esockd_peercert:peercert(), diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 16f87ff4a..c9be74c80 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -427,8 +427,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> run_gc(Stats, State = #state{gc_state = GcSt}) -> case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of - {IsGC, GcSt1} -> - IsGC andalso emqx_metrics:inc('channel.gc'), + {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}; false -> State end. @@ -521,6 +520,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> case Serialize(Packet) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), + ok = emqx_metrics:inc('delivery.dropped.too_large'), + ok = emqx_metrics:inc('delivery.dropped'), <<>>; Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index e2cb977df..aa8dfb743 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -53,13 +53,13 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - ok = meck:unload(emqx_access_control), - ok = meck:unload(emqx_metrics), - ok = meck:unload(emqx_session), - ok = meck:unload(emqx_broker), - ok = meck:unload(emqx_hooks), - ok = meck:unload(emqx_cm), - ok. + meck:unload([emqx_access_control, + emqx_metrics, + emqx_session, + emqx_broker, + emqx_hooks, + emqx_cm + ]). init_per_testcase(_TestCase, Config) -> Config. @@ -334,15 +334,15 @@ t_handle_out_publish(_) -> t_handle_out_publish_1(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} = - emqx_channel:handle_out(publish, [{1, Msg}], channel()). + {ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan} + = emqx_channel:handle_out(publish, [{1, Msg}], channel()). t_handle_out_publish_nl(_) -> ClientInfo = clientinfo(#{clientid => <<"clientid">>}), Channel = channel(#{clientinfo => ClientInfo}), Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), Pubs = [{1, emqx_message:set_flag(nl, Msg)}], - {ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). + {ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). t_handle_out_connack_sucess(_) -> {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9f87ce3b2..d0c3f23f5 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -35,6 +35,10 @@ init_per_suite(Config) -> ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + %% Meck Limiter + ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), + %% Meck Pd + ok = meck:new(emqx_pd, [passthrough, no_history, no_link]), %% Meck Metrics ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), @@ -46,6 +50,8 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_transport), ok = meck:unload(emqx_channel), ok = meck:unload(emqx_cm), + ok = meck:unload(emqx_limiter), + ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), ok. @@ -72,6 +78,233 @@ end_per_testcase(_TestCase, Config) -> %% Test cases %%-------------------------------------------------------------------- +t_info(_) -> + CPid = spawn(fun() -> + receive + {'$gen_call', From, info} -> + gen_server:reply(From, emqx_connection:info(st())) + after + 0 -> error("error") + end + end), + #{sockinfo := SockInfo} = emqx_connection:info(CPid), + ?assertMatch(#{active_n := 100, + peername := {{127,0,0,1},3456}, + sockname := {{127,0,0,1},1883}, + sockstate := idle, + socktype := tcp}, SockInfo). + +t_info_limiter(_) -> + St = st(#{limiter => emqx_limiter:init([])}), + ?assertEqual(undefined, emqx_connection:info(limiter, St)). + +t_stats(_) -> + CPid = spawn(fun() -> + receive + {'$gen_call', From, stats} -> + gen_server:reply(From, emqx_connection:stats(st())) + after + 0 -> error("error") + end + end), + Stats = emqx_connection:stats(CPid), + ?assertMatch([{recv_oct,0}, + {recv_cnt,0}, + {send_oct,0}, + {send_cnt,0}, + {send_pend,0}| _] , Stats). + +t_process_msg(_) -> + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, + fun(_Packet, Channel) -> + {ok, Channel} + end), + CPid ! {incoming, ?PACKET(?PINGREQ)}, + CPid ! {incoming, undefined}, + CPid ! {tcp_passive, sock}, + CPid ! {tcp_closed, sock}, + timer:sleep(100), + ok = trap_exit(CPid, {shutdown, tcp_closed}) + end, #{trap_exit => true}). + +t_ensure_stats_timer(_) -> + NStats = emqx_connection:ensure_stats_timer(100, st()), + Stats_timer = emqx_connection:info(stats_timer, NStats), + ?assert(is_reference(Stats_timer)), + ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)). + +t_cancel_stats_timer(_) -> + NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})), + Stats_timer = emqx_connection:info(stats_timer, NStats), + ?assertEqual(undefined, Stats_timer), + ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)). + +t_append_msg(_) -> + ?assertEqual([msg], emqx_connection:append_msg([], [msg])), + ?assertEqual([msg], emqx_connection:append_msg([], msg)), + ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], [msg])), + ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], msg)). + +t_handle_msg(_) -> + From = {make_ref(), self()}, + ?assertMatch({ok, _St}, emqx_connection:handle_msg({'$gen_call', From, for_testing}, st())), + ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())), + ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())), + ?assertMatch({ok, [], _St}, emqx_connection:handle_msg({tcp, From, <<"for_testing">>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg(for_testing, st())). + +t_handle_msg_incoming(_) -> + ?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())), + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, undefined}, st())). + +t_handle_msg_outgoing(_) -> + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())). + +t_handle_msg_tcp_error(_) -> + ?assertMatch({stop, {shutdown, econnreset}, _St}, emqx_connection:handle_msg({tcp_error, sock, econnreset}, st())). + +t_handle_msg_tcp_closed(_) -> + ?assertMatch({stop, {shutdown, tcp_closed}, _St}, emqx_connection:handle_msg({tcp_closed, sock}, st())). + +t_handle_msg_passive(_) -> + ?assertMatch({ok, _Event, _St}, emqx_connection:handle_msg({tcp_passive, sock}, st())). + +t_handle_msg_deliver(_) -> + ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({deliver, topic, msg}, st())). + +t_handle_msg_inet_reply(_) -> + ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))), + ?assertEqual(ok, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))), + ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). + +t_handle_msg_connack(_) -> + ?assertEqual(ok, emqx_connection:handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())). + +t_handle_msg_close(_) -> + ?assertMatch({stop, {shutdown, normal}, _St}, emqx_connection:handle_msg({close, normal}, st())). + +t_handle_msg_event(_) -> + ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end), + ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), + ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), + ?assertEqual(ok, emqx_connection:handle_msg({event, connected}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, disconnected}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, st())). + +t_handle_msg_timeout(_) -> + ?assertMatch({ok, _St}, emqx_connection:handle_msg({timeout, make_ref(), for_testing}, st())). + +t_handle_msg_shutdown(_) -> + ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({shutdown, for_testing}, st())). + +t_handle_call(_) -> + St = st(), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)), + ?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)), + ?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)), + ?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)), + ?assertEqual({stop, {shutdown,kicked}, ok, St}, emqx_connection:handle_call(self(), kick, St)). + +t_handle_timeout(_) -> + TRef = make_ref(), + State = st(#{idle_timer => TRef, limit_timer => TRef, stats_timer => TRef}), + ?assertMatch({stop, {shutdown,idle_timeout}, _NState}, emqx_connection:handle_timeout(TRef, idle_timeout, State)), + ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_timeout(TRef, limit_timeout, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, emit_stats, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)), + + ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), + ?assertMatch({stop, {shutdown,for_testing}, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)). + +t_parse_incoming(_) -> + ?assertMatch({ok, [], _NState}, emqx_connection:parse_incoming(<<>>, st())), + ?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<"for_testing">>, [], st())). + +t_next_incoming_msgs(_) -> + ?assertEqual({incoming, packet}, emqx_connection:next_incoming_msgs([packet])), + ?assertEqual([{incoming, packet2}, {incoming, packet1}], emqx_connection:next_incoming_msgs([packet1, packet2])). + +t_handle_incoming(_) -> + ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())), + ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())). + +t_with_channel(_) -> + State = st(), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end), + ?assertEqual({ok, State}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, Channel} end), + ?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end), + ?assertMatch({ok, _Out, _NChannel}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end), + ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel} end), + ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)). + +t_handle_outgoing(_) -> + ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), + ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). + +t_handle_info(_) -> + ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())), + ?assertMatch({stop, {shutdown, for_testing}, _NStats}, emqx_connection:handle_info({sock_error, for_testing}, st())), + ?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())). + +t_ensure_rate_limit(_) -> + State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})), + ?assertEqual(undefined, emqx_connection:info(limiter, State)), + + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end), + State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), + ?assertEqual(undefined, emqx_connection:info(limiter, State1)), + + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end), + State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), + ?assertEqual(undefined, emqx_connection:info(limiter, State2)), + ?assertEqual(blocked, emqx_connection:info(sockstate, State2)). + +t_activate_socket(_) -> + State = st(), + {ok, NStats} = emqx_connection:activate_socket(State), + ?assertEqual(running, emqx_connection:info(sockstate, NStats)), + + State1 = st(#{sockstate => blocked}), + ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)), + + State2 = st(#{sockstate => closed}), + ?assertEqual({ok, State2}, emqx_connection:activate_socket(State2)). + +t_close_socket(_) -> + State = emqx_connection:close_socket(st(#{sockstate => closed})), + ?assertEqual(closed, emqx_connection:info(sockstate, State)), + State1 = emqx_connection:close_socket(st()), + ?assertEqual(closed, emqx_connection:info(sockstate, State1)). + +t_system_code_change(_) -> + State = st(), + ?assertEqual({ok, State}, emqx_connection:system_code_change(State, [], [], [])). + +t_next_msgs(_) -> + ?assertEqual({outgoing, ?CONNECT_PACKET()}, emqx_connection:next_msgs(?CONNECT_PACKET())), + ?assertEqual({}, emqx_connection:next_msgs({})), + ?assertEqual([], emqx_connection:next_msgs([])). + t_start_link_ok(_) -> with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end). @@ -99,262 +332,6 @@ t_get_conn_info(_) -> }, SockInfo) end). -t_handle_call_discard(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}). - -t_handle_call_takeover(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun({takeover, 'begin'}, Channel) -> - {reply, session, Channel}; - ({takeover, 'end'}, Channel) -> - {shutdown, takeovered, [], Channel} - end), - session = emqx_connection:call(CPid, {takeover, 'begin'}), - [] = emqx_connection:call(CPid, {takeover, 'end'}), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, takeovered}) - end, #{trap_exit => true}). - -t_handle_call_any(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(_Req, Channel) -> {reply, ok, Channel} end), - ok = emqx_connection:call(CPid, req) - end). - -t_handle_incoming_connect(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, - proto_name = <<"MQTT">>, - clientid = <<>>, - clean_start = true, - keepalive = 60 - }, - Frame = make_frame(?CONNECT_PACKET(ConnPkt)), - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_publish(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)), - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_subscribe(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_unsubscribe(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_undefined(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - CPid ! {incoming, undefined} - end). - -t_handle_sock_error(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({_, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), - %% TODO: fixme later - CPid ! {tcp_error, sock, econnreset}, - timer:sleep(100), - trap_exit(CPid, {shutdown, econnreset}) - end, #{trap_exit => true}). - -t_handle_sock_activate(_) -> - with_conn(fun(CPid) -> CPid ! activate_socket end). - -t_handle_sock_closed(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, ?DISCONNECT_PACKET(), Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}). - -t_handle_outgoing(_) -> - with_conn(fun(CPid) -> - Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), - CPid ! {outgoing, Publish}, - CPid ! {outgoing, ?PUBREL_PACKET(1)}, - CPid ! {outgoing, [?PUBCOMP_PACKET(1)]} - end). - -t_conn_rate_limit(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - lists:foreach(fun(I) -> - Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)), - CPid ! {tcp, sock, make_frame(Publish)} - end, [1, 2]) - end, #{active_n => 1, rate_limit => {1, 1024}}). - -t_conn_pub_limit(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - ok = lists:foreach(fun(I) -> - CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)} - end, lists:seq(1, 3)) - %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) - end, #{active_n => 1, publish_limit => {1, 2}}). - -t_conn_pingreq(_) -> - with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end). - -t_inet_reply(_) -> - ok = meck:new(emqx_pd, [passthrough, no_history]), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - CPid ! {inet_reply, for_testing, ok}, - timer:sleep(100) - end, #{active_n => 1, trap_exit => true}), - ok = meck:unload(emqx_pd), - with_conn(fun(CPid) -> - CPid ! {inet_reply, for_testing, {error, for_testing}}, - timer:sleep(100), - trap_exit(CPid, {shutdown, for_testing}) - end, #{trap_exit => true}). - -t_deliver(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_deliver, - fun(_, Channel) -> {ok, Channel} end), - CPid ! {deliver, topic, msg} - end). - -t_event_disconnected(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), - CPid ! {event, disconnected} - end). - -t_event_undefined(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {event, undefined} - end). - -t_cloes(_) -> - with_conn(fun(CPid) -> - CPid ! {close, normal}, - timer:sleep(100), - trap_exit(CPid, {shutdown, normal}) - end, #{trap_exit => true}). - -t_oom_shutdown(_) -> - with_conn(fun(CPid) -> - CPid ! {shutdown, message_queue_too_long}, - timer:sleep(100), - trap_exit(CPid, {shutdown, message_queue_too_long}) - end, #{trap_exit => true}). - -t_handle_idle_timeout(_) -> - ok = emqx_zone:set_env(external, idle_timeout, 10), - with_conn(fun(CPid) -> - timer:sleep(100), - trap_exit(CPid, {shutdown, idle_timeout}) - end, #{zone => external, trap_exit => true}). - -t_handle_emit_stats(_) -> - ok = emqx_zone:set_env(external, idle_timeout, 1000), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false, - max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 - })}, - timer:sleep(1000) - end,#{zone => external, trap_exit => true}). - -t_handle_limit_timeout(_) -> - with_conn(fun(CPid) -> - CPid ! {timeout, undefined, limit_timeout}, - timer:sleep(100), - true = erlang:is_process_alive(CPid) - end). - -t_handle_keepalive_timeout(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - trap_exit(CPid, {shutdown, keepalive_timeout}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - false = erlang:is_process_alive(CPid) - end, #{trap_exit => true}). - -t_handle_shutdown(_) -> - with_conn(fun(CPid) -> - CPid ! Shutdown = {shutdown, reason}, - timer:sleep(100), - trap_exit(CPid, Shutdown) - end, #{trap_exit => true}). - -t_exit_message(_) -> - with_conn(fun(CPid) -> - CPid ! {'EXIT', CPid, for_testing}, - timer:sleep(1000) - end, #{trap_exit => true}). - %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -406,3 +383,45 @@ make_frame(Packet) -> payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). +st() -> st(#{}). +st(InitFields) when is_map(InitFields) -> + St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]), + maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end, + emqx_connection:set_field(channel, channel(), St), + InitFields + ). + +channel() -> channel(#{}). +channel(InitFields) -> + ConnInfo = #{peername => {{127,0,0,1}, 3456}, + sockname => {{127,0,0,1}, 18083}, + conn_mod => emqx_connection, + proto_name => <<"MQTT">>, + proto_ver => ?MQTT_PROTO_V5, + clean_start => true, + keepalive => 30, + clientid => <<"clientid">>, + username => <<"username">>, + receive_maximum => 100, + expiry_interval => 0 + }, + ClientInfo = #{zone => zone, + protocol => mqtt, + peerhost => {127,0,0,1}, + clientid => <<"clientid">>, + username => <<"username">>, + is_superuser => false, + peercert => undefined, + mountpoint => undefined + }, + Session = emqx_session:init(#{zone => external}, + #{receive_maximum => 0} + ), + maps:fold(fun(Field, Value, Channel) -> + emqx_channel:set_field(Field, Value, Channel) + end, + emqx_channel:init(ConnInfo, [{zone, zone}]), + maps:merge(#{clientinfo => ClientInfo, + session => Session, + conn_state => connected + }, InitFields)). \ No newline at end of file diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 24eedccd0..a2d9b17c4 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -68,6 +68,14 @@ t_timestamp(_) -> timer:sleep(1), ?assert(erlang:system_time(millisecond) > emqx_message:timestamp(Msg)). +t_is_sys(_) -> + Msg0 = emqx_message:make(<<"t">>, <<"payload">>), + ?assertNot(emqx_message:is_sys(Msg0)), + Msg1 = emqx_message:set_flag(sys, Msg0), + ?assert(emqx_message:is_sys(Msg1)), + Msg2 = emqx_message:make(<<"$SYS/events">>, <<"payload">>), + ?assert(emqx_message:is_sys(Msg2)). + t_clean_dup(_) -> Msg = emqx_message:make(<<"topic">>, <<"payload">>), ?assertNot(emqx_message:get_flag(dup, Msg)), diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index ffdf07800..c113702c3 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -49,54 +49,66 @@ t_tcp_sock_passive(_) -> with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). t_message_expiry_interval_1(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). + ClientA = message_expiry_interval_init(), + [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], + emqtt:stop(ClientA). t_message_expiry_interval_2(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). + ClientA = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], + emqtt:stop(ClientA). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientA), - {ok, _} = emqtt:connect(ClientB), - %% subscribe and disconnect client-b - emqtt:subscribe(ClientB, <<"t/a">>, 1), - emqtt:stop(ClientB), - ClientA. + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-a">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-b">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientA), + {ok, _} = emqtt:connect(ClientB), + %% subscribe and disconnect client-b + emqtt:subscribe(ClientB, <<"t/a">>, 1), + emqtt:stop(ClientB), + ClientA. message_expiry_interval_exipred(ClientA, QoS) -> - ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), - %% publish to t/a and waiting for the message expired - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), - ct:sleep(1500), + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a and waiting for the message expired + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:sleep(1500), - %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + %% resume the session for client-b + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-b">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), - %% verify client-b could not receive the publish message - receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> - ct:fail(should_have_expired) - after 300 -> - ok - end, - emqtt:stop(ClientB1). + %% verify client-b could not receive the publish message + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + ct:fail(should_have_expired) + after 300 -> + ok + end, + emqtt:stop(ClientB1). message_expiry_interval_not_exipred(ClientA, QoS) -> - ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), - %% publish to t/a - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), - %% wait for 1s and then resume the session for client-b, the message should not expires - %% as Message-Expiry-Interval = 20s - ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + %% wait for 1s and then resume the session for client-b, the message should not expires + %% as Message-Expiry-Interval = 20s + ct:sleep(1000), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-b">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), %% verify client-b could receive the publish message and the Message-Expiry-Interval is set receive diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 961229ed0..590753709 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -28,7 +28,9 @@ t_check_pub(_) -> PubCaps = #{max_qos_allowed => ?QOS_1, retain_available => false }, - ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), + lists:foreach(fun({Key, Val}) -> + ok = emqx_zone:set_env(zone, Key, Val) + end, maps:to_list(PubCaps)), ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, @@ -37,7 +39,9 @@ t_check_pub(_) -> PubFlags2 = #{qos => ?QOS_1, retain => true}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, emqx_mqtt_caps:check_pub(zone, PubFlags2)), - true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). + lists:foreach(fun({Key, _Val}) -> + true = emqx_zone:unset_env(zone, Key) + end, maps:to_list(PubCaps)). t_check_sub(_) -> SubOpts = #{rh => 0, @@ -50,7 +54,9 @@ t_check_sub(_) -> shared_subscription => false, wildcard_subscription => false }, - ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), + lists:foreach(fun({Key, Val}) -> + ok = emqx_zone:set_env(zone, Key, Val) + end, maps:to_list(SubCaps)), ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), @@ -58,4 +64,6 @@ t_check_sub(_) -> emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), - true = emqx_zone:unset_env(zone, '$mqtt_sub_caps'). + lists:foreach(fun({Key, _Val}) -> + true = emqx_zone:unset_env(zone, Key) + end, maps:to_list(SubCaps)). diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index a421b6391..921491dff 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -28,24 +28,17 @@ all() -> emqx_ct:all(?MODULE). - -% t_init(_) -> -% error('TODO'). - -% t_is_empty(_) -> -% error('TODO'). - -% t_len(_) -> -% error('TODO'). - -% t_max_len(_) -> -% error('TODO'). - -% t_dropped(_) -> -% error('TODO'). - -% t_stats(_) -> -% error('TODO'). +t_info(_) -> + Q = ?Q:init(#{max_len => 5, store_qos0 => true}), + true = ?Q:info(store_qos0, Q), + 5 = ?Q:info(max_len, Q), + 0 = ?Q:info(len, Q), + 0 = ?Q:info(dropped, Q), + #{store_qos0 := true, + max_len := 5, + len := 0, + dropped := 0 + } = ?Q:info(Q). t_in(_) -> Opts = #{max_len => 5, store_qos0 => true}, @@ -163,3 +156,10 @@ t_length_priority_mqueue(_) -> {{value, _Val}, Q5} = ?Q:out(Q4), ?assertEqual(1, ?Q:len(Q5)). +t_dropped(_) -> + Q = ?Q:init(#{max_len => 1, store_qos0 => true}), + Msg = emqx_message:make(<<"t">>, <<"payload">>), + {undefined, Q1} = ?Q:in(Msg, Q), + {Msg, Q2} = ?Q:in(Msg, Q1), + ?assertEqual(1, ?Q:dropped(Q2)). + diff --git a/test/emqx_plugins_SUITE.erl b/test/emqx_plugins_SUITE.erl index 29d3d5964..1343076d6 100644 --- a/test/emqx_plugins_SUITE.erl +++ b/test/emqx_plugins_SUITE.erl @@ -59,7 +59,7 @@ t_load(_) -> ?assertEqual([], emqx_plugins:unload()), ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)), - ?assertMatch({ok, _}, emqx_plugins:load(emqx_mini_plugin)), + ?assertMatch(ok, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)), ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)), @@ -127,7 +127,7 @@ t_load_plugin(_) -> (App) -> {ok, App} end), ?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)), - ?assertMatch({ok, _}, emqx_plugins:load_plugin(#plugin{name = normal}, true)), + ?assertMatch(ok, emqx_plugins:load_plugin(#plugin{name = normal}, true)), ?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)), ok = meck:unload(application). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 9f9aff825..457afc71e 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -31,14 +31,15 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> %% Broker - ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), - ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]), + ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker], + [passthrough, no_history, no_link]), + ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok = meck:expect(emqx_metrics, inc, fun(_K, _V) -> ok end), ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), Config. end_per_suite(_Config) -> - ok = meck:unload(emqx_broker), - ok = meck:unload(emqx_hooks). + meck:unload([emqx_broker, emqx_hooks, emqx_metrics]). init_per_testcase(_TestCase, Config) -> Config. diff --git a/test/emqx_time_SUITE.erl b/test/emqx_time_SUITE.erl deleted file mode 100644 index 620fa1ee5..000000000 --- a/test/emqx_time_SUITE.erl +++ /dev/null @@ -1,33 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_time_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -t_seed(_) -> - ?assert(is_tuple(emqx_time:seed())). - -t_now_secs(_) -> - ?assert(emqx_time:now_secs() =< emqx_time:now_secs(os:timestamp())). - -t_now_ms(_) -> - ?assert(emqx_time:now_ms() =< emqx_time:now_ms(os:timestamp())). \ No newline at end of file