Merge pull request #3130 from emqx/master
Auto-pull-request-by-2019-12-21
This commit is contained in:
commit
c57c037c53
|
@ -261,8 +261,6 @@ node.fullsweep_after = 1000
|
|||
## Value: Log file
|
||||
node.crash_dump = {{ platform_log_dir }}/crash.dump
|
||||
|
||||
|
||||
|
||||
## Specify SSL Options in the file if using SSL for Erlang Distribution.
|
||||
##
|
||||
## Value: File
|
||||
|
|
|
@ -43,6 +43,8 @@
|
|||
|
||||
-define(LOG(Level, Format, Args),
|
||||
begin
|
||||
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}))
|
||||
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end,
|
||||
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY},
|
||||
line => ?LINE}))
|
||||
end).
|
||||
|
||||
|
|
|
@ -35,9 +35,6 @@
|
|||
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
|
||||
authenticate(ClientInfo = #{zone := Zone}) ->
|
||||
case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
|
||||
Result = #{auth_result := success, anonymous := true} ->
|
||||
emqx_metrics:inc('auth.mqtt.anonymous'),
|
||||
{ok, Result};
|
||||
Result = #{auth_result := success} ->
|
||||
{ok, Result};
|
||||
Result ->
|
||||
|
|
|
@ -160,9 +160,9 @@ do_subscribe(Group, Topic, SubPid, SubOpts) ->
|
|||
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
||||
emqx_shared_sub:subscribe(Group, Topic, SubPid).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Unsubscribe API
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(unsubscribe(emqx_topic:topic()) -> ok).
|
||||
unsubscribe(Topic) when is_binary(Topic) ->
|
||||
|
@ -198,43 +198,46 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
|
|||
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
|
||||
publish(Msg) when is_record(Msg, message) ->
|
||||
_ = emqx_tracer:trace(publish, Msg),
|
||||
Msg1 = emqx_message:set_header(allow_publish, true,
|
||||
emqx_message:clean_dup(Msg)),
|
||||
case emqx_hooks:run_fold('message.publish', [], Msg1) of
|
||||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||
#message{headers = #{allow_publish := false}} ->
|
||||
?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg1)]),
|
||||
?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
|
||||
[];
|
||||
#message{topic = Topic} = Msg2 ->
|
||||
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg2))
|
||||
Msg1 = #message{topic = Topic} ->
|
||||
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
|
||||
end.
|
||||
|
||||
%% Called internally
|
||||
-spec(safe_publish(emqx_types:message()) -> ok | emqx_types:publish_result()).
|
||||
-spec(safe_publish(emqx_types:message()) -> emqx_types:publish_result()).
|
||||
safe_publish(Msg) when is_record(Msg, message) ->
|
||||
try
|
||||
publish(Msg)
|
||||
catch
|
||||
_:Error:Stacktrace ->
|
||||
?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
|
||||
_:Error:Stk->
|
||||
?LOG(error, "Publish error: ~p~n~s~n~p",
|
||||
[Error, emqx_message:format(Msg), Stk])
|
||||
after
|
||||
ok
|
||||
[]
|
||||
end.
|
||||
|
||||
delivery(Msg) ->
|
||||
#delivery{sender = self(), message = Msg}.
|
||||
-compile({inline, [delivery/1]}).
|
||||
delivery(Msg) -> #delivery{sender = self(), message = Msg}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Route
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()).
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
|
||||
-> emqx_types:publish_result()).
|
||||
route([], #delivery{message = Msg}) ->
|
||||
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
||||
inc_dropped_cnt(Msg#message.topic),
|
||||
ok = inc_dropped_cnt(Msg),
|
||||
[];
|
||||
|
||||
route(Routes, Delivery) ->
|
||||
lists:foldl(fun(Route, Acc) ->
|
||||
[do_route(Route, Delivery) | Acc]
|
||||
end, [], Routes).
|
||||
[do_route(Route, Delivery) | Acc]
|
||||
end, [], Routes).
|
||||
|
||||
do_route({To, Node}, Delivery) when Node =:= node() ->
|
||||
{Node, To, dispatch(To, Delivery)};
|
||||
|
@ -243,8 +246,7 @@ do_route({To, Node}, Delivery) when is_atom(Node) ->
|
|||
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
|
||||
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
|
||||
|
||||
aggre([]) ->
|
||||
[];
|
||||
aggre([]) -> [];
|
||||
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
|
||||
[{To, Node}];
|
||||
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
|
||||
|
@ -258,41 +260,40 @@ aggre(Routes) ->
|
|||
end, [], Routes).
|
||||
|
||||
%% @doc Forward message to another node.
|
||||
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
|
||||
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync|async)
|
||||
-> emqx_types:deliver_result()).
|
||||
forward(Node, To, Delivery, async) ->
|
||||
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
true -> ok;
|
||||
true -> emqx_metrics:inc('messages.forward');
|
||||
{badrpc, Reason} ->
|
||||
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||
?LOG(error, "Ansync forward msg to ~s failed due to ~p", [Node, Reason]),
|
||||
{error, badrpc}
|
||||
end;
|
||||
|
||||
forward(Node, To, Delivery, sync) ->
|
||||
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
{badrpc, Reason} ->
|
||||
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||
?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]),
|
||||
{error, badrpc};
|
||||
Result -> Result
|
||||
Result ->
|
||||
emqx_metrics:inc('messages.forward'), Result
|
||||
end.
|
||||
|
||||
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
||||
dispatch(Topic, #delivery{message = Msg}) ->
|
||||
case subscribers(Topic) of
|
||||
[] ->
|
||||
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
||||
inc_dropped_cnt(Topic),
|
||||
{error, no_subscribers};
|
||||
[] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
||||
_ = inc_dropped_cnt(Topic),
|
||||
{error, no_subscribers};
|
||||
[Sub] -> %% optimize?
|
||||
dispatch(Sub, Topic, Msg);
|
||||
Subs ->
|
||||
lists:foldl(
|
||||
fun(Sub, Res) ->
|
||||
case dispatch(Sub, Topic, Msg) of
|
||||
ok -> Res;
|
||||
Err -> Err
|
||||
end
|
||||
end, ok, Subs)
|
||||
Subs -> lists:foldl(
|
||||
fun(Sub, Res) ->
|
||||
case dispatch(Sub, Topic, Msg) of
|
||||
ok -> Res;
|
||||
Err -> Err
|
||||
end
|
||||
end, ok, Subs)
|
||||
end.
|
||||
|
||||
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
||||
|
@ -302,6 +303,7 @@ dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
|||
ok;
|
||||
false -> {error, subscriber_die}
|
||||
end;
|
||||
|
||||
dispatch({shard, I}, Topic, Msg) ->
|
||||
lists:foldl(
|
||||
fun(SubPid, Res) ->
|
||||
|
@ -311,20 +313,24 @@ dispatch({shard, I}, Topic, Msg) ->
|
|||
end
|
||||
end, ok, subscribers({shard, Topic, I})).
|
||||
|
||||
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
||||
ok;
|
||||
inc_dropped_cnt(_Topic) ->
|
||||
emqx_metrics:inc('messages.dropped').
|
||||
-compile({inline, [inc_dropped_cnt/1]}).
|
||||
inc_dropped_cnt(Msg) ->
|
||||
case emqx_message:is_sys(Msg) of
|
||||
true -> ok;
|
||||
false -> ok = emqx_metrics:inc('messages.dropped'),
|
||||
emqx_metrics:inc('messages.dropped.no_subscribers')
|
||||
end.
|
||||
|
||||
-compile({inline, [subscribers/1]}).
|
||||
-spec(subscribers(emqx_topic:topic()) -> [pid()]).
|
||||
subscribers(Topic) when is_binary(Topic) ->
|
||||
lookup_value(?SUBSCRIBER, Topic, []);
|
||||
subscribers(Shard = {shard, _Topic, _I}) ->
|
||||
lookup_value(?SUBSCRIBER, Shard, []).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Subscriber is down
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(subscriber_down(pid()) -> true).
|
||||
subscriber_down(SubPid) ->
|
||||
|
@ -345,9 +351,9 @@ subscriber_down(SubPid) ->
|
|||
end, lookup_value(?SUBSCRIPTION, SubPid, [])),
|
||||
ets:delete(?SUBSCRIPTION, SubPid).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Management APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(subscriptions(pid() | emqx_types:subid())
|
||||
-> [{emqx_topic:topic(), emqx_types:subopts()}]).
|
||||
|
@ -410,9 +416,11 @@ safe_update_stats(Tab, Stat, MaxStat) ->
|
|||
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% call, cast, pick
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-compile({inline, [call/2, cast/2, pick/1]}).
|
||||
|
||||
call(Broker, Req) ->
|
||||
gen_server:call(Broker, Req).
|
||||
|
@ -424,9 +432,9 @@ cast(Broker, Msg) ->
|
|||
pick(Topic) ->
|
||||
gproc_pool:pick_worker(broker_pool, Topic).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([Pool, Id]) ->
|
||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||
|
@ -490,7 +498,7 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -155,7 +155,8 @@ caps(#channel{clientinfo = #{zone := Zone}}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()).
|
||||
init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
||||
init(ConnInfo = #{peername := {PeerHost, _Port},
|
||||
sockname := {_Host, SockPort}}, Options) ->
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
Peercert = maps:get(peercert, ConnInfo, undefined),
|
||||
Username = case peer_cert_as_username(Options) of
|
||||
|
@ -169,6 +170,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
|||
ClientInfo = #{zone => Zone,
|
||||
protocol => Protocol,
|
||||
peerhost => PeerHost,
|
||||
sockport => SockPort,
|
||||
peercert => Peercert,
|
||||
clientid => undefined,
|
||||
username => Username,
|
||||
|
@ -221,30 +223,30 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
|||
handle_out(disconnect, ReasonCode, Channel)
|
||||
end;
|
||||
|
||||
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
||||
Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel
|
||||
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
case emqx_session:puback(PacketId, Session) of
|
||||
{ok, Msg, NSession} ->
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
ok = after_message_acked(ClientInfo, Msg),
|
||||
{ok, Channel#channel{session = NSession}};
|
||||
{ok, Msg, Publishes, NSession} ->
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
ok = after_message_acked(ClientInfo, Msg),
|
||||
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.puback.inuse'),
|
||||
{ok, Channel};
|
||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
||||
?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.puback.missed'),
|
||||
{ok, Channel}
|
||||
end;
|
||||
|
||||
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
||||
Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel
|
||||
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
case emqx_session:pubrec(PacketId, Session) of
|
||||
{ok, Msg, NSession} ->
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
ok = after_message_acked(ClientInfo, Msg),
|
||||
NChannel = Channel#channel{session = NSession},
|
||||
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||
|
@ -263,8 +265,8 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
|
|||
NChannel = Channel#channel{session = NSession},
|
||||
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||
?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.pubrel.missed'),
|
||||
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
|
||||
handle_out(pubcomp, {PacketId, RC}, Channel)
|
||||
end;
|
||||
|
||||
|
@ -344,8 +346,7 @@ handle_in(Packet, Channel) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
||||
Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
||||
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
||||
{ok, #{session := Session, present := false}} ->
|
||||
NChannel = Channel#channel{session = Session},
|
||||
|
@ -376,17 +377,17 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) ->
|
|||
fun check_pub_caps/2
|
||||
], Packet, Channel) of
|
||||
{ok, NPacket, NChannel} ->
|
||||
Msg = packet_to_msg(NPacket, NChannel),
|
||||
Msg = packet_to_message(NPacket, NChannel),
|
||||
do_publish(PacketId, Msg, NChannel);
|
||||
{error, ReasonCode, NChannel} ->
|
||||
?LOG(warning, "Cannot publish message to ~s due to ~s",
|
||||
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
handle_out(disconnect, ReasonCode, NChannel)
|
||||
end.
|
||||
|
||||
packet_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
||||
clientinfo = ClientInfo =
|
||||
#{mountpoint := MountPoint}}) ->
|
||||
packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
||||
clientinfo = ClientInfo =
|
||||
#{mountpoint := MountPoint}}) ->
|
||||
emqx_mountpoint:mount(
|
||||
MountPoint, emqx_packet:to_message(
|
||||
ClientInfo, #{proto_ver => ProtoVer}, Packet)).
|
||||
|
@ -412,8 +413,9 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|||
ok = emqx_metrics:inc('packets.publish.inuse'),
|
||||
handle_out(pubrec, {PacketId, RC}, Channel);
|
||||
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
||||
?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full.", [PacketId]),
|
||||
ok = emqx_metrics:inc('messages.qos2.dropped'),
|
||||
?LOG(warning, "Dropped the qos2 packet ~w "
|
||||
"due to awaiting_rel is full.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||
handle_out(pubrec, {PacketId, RC}, Channel)
|
||||
end.
|
||||
|
||||
|
@ -421,6 +423,11 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|||
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
||||
puback_reason_code([_|_]) -> ?RC_SUCCESS.
|
||||
|
||||
-compile({inline, [after_message_acked/2]}).
|
||||
after_message_acked(ClientInfo, Msg) ->
|
||||
ok = emqx_metrics:inc('messages.acked'),
|
||||
emqx_hooks:run('message.acked', [ClientInfo, Msg]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process Subscribe
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -567,17 +574,8 @@ handle_out(connack, {ReasonCode, _ConnPkt},
|
|||
handle_out(publish, [], Channel) ->
|
||||
{ok, Channel};
|
||||
|
||||
handle_out(publish, [{pubrel, PacketId}], Channel) ->
|
||||
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, Channel);
|
||||
|
||||
handle_out(publish, [Pub = {_PacketId, Msg}], Channel) ->
|
||||
case ignore_local(Msg, Channel) of
|
||||
true -> {ok, Channel};
|
||||
false -> {ok, pub_to_packet(Pub, Channel), Channel}
|
||||
end;
|
||||
|
||||
handle_out(publish, Publishes, Channel) ->
|
||||
Packets = handle_publish(Publishes, Channel),
|
||||
Packets = do_deliver(Publishes, Channel),
|
||||
{ok, {outgoing, Packets}, Channel};
|
||||
|
||||
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
|
||||
|
@ -639,45 +637,50 @@ return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo,
|
|||
resuming = false,
|
||||
pendings = []
|
||||
},
|
||||
Packets = handle_publish(Publishes, NChannel),
|
||||
Packets = do_deliver(Publishes, NChannel),
|
||||
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
||||
{ok, Replies ++ Outgoing, NChannel}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle out Publish
|
||||
%% Deliver publish: broker -> client
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-compile({inline, [handle_publish/2]}).
|
||||
handle_publish(Publishes, Channel) ->
|
||||
handle_publish(Publishes, [], Channel).
|
||||
%% return list(emqx_types:packet())
|
||||
do_deliver({pubrel, PacketId}, _Channel) ->
|
||||
[?PUBREL_PACKET(PacketId, ?RC_SUCCESS)];
|
||||
|
||||
%% Handle out publish
|
||||
handle_publish([], Acc, _Channel) ->
|
||||
lists:reverse(Acc);
|
||||
|
||||
handle_publish([{pubrel, PacketId}|More], Acc, Channel) ->
|
||||
Packet = ?PUBREL_PACKET(PacketId, ?RC_SUCCESS),
|
||||
handle_publish(More, [Packet|Acc], Channel);
|
||||
|
||||
handle_publish([Pub = {_PacketId, Msg}|More], Acc, Channel) ->
|
||||
case ignore_local(Msg, Channel) of
|
||||
true -> handle_publish(More, Acc, Channel);
|
||||
do_deliver({PacketId, Msg}, #channel{clientinfo = ClientInfo =
|
||||
#{mountpoint := MountPoint}}) ->
|
||||
case ignore_local(Msg, ClientInfo) of
|
||||
true ->
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = emqx_metrics:inc('delivery.dropped.no_local'),
|
||||
[];
|
||||
false ->
|
||||
Packet = pub_to_packet(Pub, Channel),
|
||||
handle_publish(More, [Packet|Acc], Channel)
|
||||
end.
|
||||
ok = emqx_metrics:inc('messages.delivered'),
|
||||
Msg1 = emqx_hooks:run_fold('message.delivered',
|
||||
[ClientInfo],
|
||||
emqx_message:update_expiry(Msg)
|
||||
),
|
||||
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
|
||||
[emqx_message:to_packet(PacketId, Msg2)]
|
||||
end;
|
||||
|
||||
pub_to_packet({PacketId, Msg}, #channel{clientinfo = ClientInfo}) ->
|
||||
Msg1 = emqx_hooks:run_fold('message.delivered', [ClientInfo],
|
||||
emqx_message:update_expiry(Msg)),
|
||||
Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, ClientInfo), Msg1),
|
||||
emqx_message:to_packet(PacketId, Msg2).
|
||||
do_deliver([Publish], Channel) ->
|
||||
do_deliver(Publish, Channel);
|
||||
|
||||
do_deliver(Publishes, Channel) when is_list(Publishes) ->
|
||||
lists:reverse(
|
||||
lists:foldl(
|
||||
fun(Publish, Acc) ->
|
||||
lists:append(do_deliver(Publish, Channel), Acc)
|
||||
end, [], Publishes)).
|
||||
|
||||
ignore_local(#message{flags = #{nl := true}, from = ClientId},
|
||||
#channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
#{clientid := ClientId}) ->
|
||||
true;
|
||||
ignore_local(_Msg, _Channel) -> false.
|
||||
ignore_local(_Msg, _ClientInfo) -> false.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle out suback
|
||||
|
@ -1008,6 +1011,8 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId,
|
|||
#channel{clientinfo = ClientInfo} = Channel) ->
|
||||
case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
|
||||
{ok, AuthResult} ->
|
||||
is_anonymous(AuthResult) andalso
|
||||
emqx_metrics:inc('client.auth.anonymous'),
|
||||
NClientInfo = maps:merge(ClientInfo, AuthResult),
|
||||
{ok, Channel#channel{clientinfo = NClientInfo}};
|
||||
{error, Reason} ->
|
||||
|
@ -1016,6 +1021,9 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId,
|
|||
{error, emqx_reason_codes:connack_error(Reason)}
|
||||
end.
|
||||
|
||||
is_anonymous(#{anonymous := true}) -> true;
|
||||
is_anonymous(_AuthResult) -> false.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process Topic Alias
|
||||
|
||||
|
|
|
@ -136,6 +136,8 @@ info(sockstate, #state{sockstate = SockSt}) ->
|
|||
SockSt;
|
||||
info(active_n, #state{active_n = ActiveN}) ->
|
||||
ActiveN;
|
||||
info(stats_timer, #state{stats_timer = Stats_timer}) ->
|
||||
Stats_timer;
|
||||
info(limiter, #state{limiter = Limiter}) ->
|
||||
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
||||
|
||||
|
@ -558,6 +560,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
case Serialize(Packet) of
|
||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
||||
[emqx_packet:format(Packet)]),
|
||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
<<>>;
|
||||
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
|
@ -625,8 +629,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||
false -> State;
|
||||
{IsGC, GcSt1} ->
|
||||
IsGC andalso emqx_metrics:inc('channel.gc'),
|
||||
{_IsGC, GcSt1} ->
|
||||
State#state{gc_state = GcSt1}
|
||||
end.
|
||||
|
||||
|
|
|
@ -38,7 +38,8 @@
|
|||
]).
|
||||
|
||||
%% Flags
|
||||
-export([ clean_dup/1
|
||||
-export([ is_sys/1
|
||||
, clean_dup/1
|
||||
, get_flag/2
|
||||
, get_flag/3
|
||||
, get_flags/1
|
||||
|
@ -112,6 +113,13 @@ payload(#message{payload = Payload}) -> Payload.
|
|||
-spec(timestamp(emqx_types:message()) -> integer()).
|
||||
timestamp(#message{timestamp = TS}) -> TS.
|
||||
|
||||
-spec(is_sys(emqx_types:message()) -> boolean()).
|
||||
is_sys(#message{flags = #{sys := true}}) ->
|
||||
true;
|
||||
is_sys(#message{topic = <<"$SYS/", _/binary>>}) ->
|
||||
true;
|
||||
is_sys(_Msg) -> false.
|
||||
|
||||
-spec(clean_dup(emqx_types:message()) -> emqx_types:message()).
|
||||
clean_dup(Msg = #message{flags = Flags = #{dup := true}}) ->
|
||||
Msg#message{flags = Flags#{dup => false}};
|
||||
|
|
|
@ -62,88 +62,118 @@
|
|||
|
||||
-export_type([metric_idx/0]).
|
||||
|
||||
-compile({inline, [inc/1, inc/2, dec/1, dec/2]}).
|
||||
-compile({inline, [inc_recv/1, inc_sent/1]}).
|
||||
|
||||
-opaque(metric_idx() :: 1..1024).
|
||||
|
||||
-type(metric_name() :: atom() | string() | binary()).
|
||||
|
||||
-define(MAX_SIZE, 1024).
|
||||
-define(RESERVED_IDX, 256).
|
||||
-define(RESERVED_IDX, 512).
|
||||
-define(TAB, ?MODULE).
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
%% Bytes sent and received of broker
|
||||
-define(BYTES_METRICS, [
|
||||
{counter, 'bytes.received'}, % Total bytes received
|
||||
{counter, 'bytes.sent'} % Total bytes sent
|
||||
]).
|
||||
%% Bytes sent and received
|
||||
-define(BYTES_METRICS,
|
||||
[{counter, 'bytes.received'}, % Total bytes received
|
||||
{counter, 'bytes.sent'} % Total bytes sent
|
||||
]).
|
||||
|
||||
%% Packets sent and received of broker
|
||||
-define(PACKET_METRICS, [
|
||||
{counter, 'packets.received'}, % All Packets received
|
||||
{counter, 'packets.sent'}, % All Packets sent
|
||||
{counter, 'packets.connect.received'}, % CONNECT Packets received
|
||||
{counter, 'packets.connack.sent'}, % CONNACK Packets sent
|
||||
{counter, 'packets.connack.error'}, % CONNACK error sent
|
||||
{counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent
|
||||
{counter, 'packets.publish.received'}, % PUBLISH packets received
|
||||
{counter, 'packets.publish.sent'}, % PUBLISH packets sent
|
||||
{counter, 'packets.publish.error'}, % PUBLISH failed for error
|
||||
{counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error
|
||||
{counter, 'packets.puback.received'}, % PUBACK packets received
|
||||
{counter, 'packets.puback.sent'}, % PUBACK packets sent
|
||||
{counter, 'packets.puback.missed'}, % PUBACK packets missed
|
||||
{counter, 'packets.pubrec.received'}, % PUBREC packets received
|
||||
{counter, 'packets.pubrec.sent'}, % PUBREC packets sent
|
||||
{counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse
|
||||
{counter, 'packets.pubrec.missed'}, % PUBREC packets missed
|
||||
{counter, 'packets.pubrel.received'}, % PUBREL packets received
|
||||
{counter, 'packets.pubrel.sent'}, % PUBREL packets sent
|
||||
{counter, 'packets.pubrel.missed'}, % PUBREL packets missed
|
||||
{counter, 'packets.pubcomp.received'}, % PUBCOMP packets received
|
||||
{counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent
|
||||
{counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse
|
||||
{counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed
|
||||
{counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received
|
||||
{counter, 'packets.subscribe.error'}, % SUBSCRIBE error
|
||||
{counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth
|
||||
{counter, 'packets.suback.sent'}, % SUBACK packets sent
|
||||
{counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received
|
||||
{counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error
|
||||
{counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent
|
||||
{counter, 'packets.pingreq.received'}, % PINGREQ packets received
|
||||
{counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent
|
||||
{counter, 'packets.disconnect.received'}, % DISCONNECT Packets received
|
||||
{counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent
|
||||
{counter, 'packets.auth.received'}, % Auth Packets received
|
||||
{counter, 'packets.auth.sent'} % Auth Packets sent
|
||||
]).
|
||||
%% Packets sent and received
|
||||
-define(PACKET_METRICS,
|
||||
[{counter, 'packets.received'}, % All Packets received
|
||||
{counter, 'packets.sent'}, % All Packets sent
|
||||
{counter, 'packets.connect.received'}, % CONNECT Packets received
|
||||
{counter, 'packets.connack.sent'}, % CONNACK Packets sent
|
||||
{counter, 'packets.connack.error'}, % CONNACK error sent
|
||||
{counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent
|
||||
{counter, 'packets.publish.received'}, % PUBLISH packets received
|
||||
{counter, 'packets.publish.sent'}, % PUBLISH packets sent
|
||||
{counter, 'packets.publish.error'}, % PUBLISH failed for error
|
||||
{counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error
|
||||
{counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped
|
||||
{counter, 'packets.puback.received'}, % PUBACK packets received
|
||||
{counter, 'packets.puback.sent'}, % PUBACK packets sent
|
||||
{counter, 'packets.puback.inuse'}, % PUBACK packet_id inuse
|
||||
{counter, 'packets.puback.missed'}, % PUBACK packets missed
|
||||
{counter, 'packets.pubrec.received'}, % PUBREC packets received
|
||||
{counter, 'packets.pubrec.sent'}, % PUBREC packets sent
|
||||
{counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse
|
||||
{counter, 'packets.pubrec.missed'}, % PUBREC packets missed
|
||||
{counter, 'packets.pubrel.received'}, % PUBREL packets received
|
||||
{counter, 'packets.pubrel.sent'}, % PUBREL packets sent
|
||||
{counter, 'packets.pubrel.missed'}, % PUBREL packets missed
|
||||
{counter, 'packets.pubcomp.received'}, % PUBCOMP packets received
|
||||
{counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent
|
||||
{counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse
|
||||
{counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed
|
||||
{counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received
|
||||
{counter, 'packets.subscribe.error'}, % SUBSCRIBE error
|
||||
{counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth
|
||||
{counter, 'packets.suback.sent'}, % SUBACK packets sent
|
||||
{counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received
|
||||
{counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error
|
||||
{counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent
|
||||
{counter, 'packets.pingreq.received'}, % PINGREQ packets received
|
||||
{counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent
|
||||
{counter, 'packets.disconnect.received'}, % DISCONNECT Packets received
|
||||
{counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent
|
||||
{counter, 'packets.auth.received'}, % Auth Packets received
|
||||
{counter, 'packets.auth.sent'} % Auth Packets sent
|
||||
]).
|
||||
|
||||
%% Messages sent and received of broker
|
||||
-define(MESSAGE_METRICS, [
|
||||
{counter, 'messages.received'}, % All Messages received
|
||||
{counter, 'messages.sent'}, % All Messages sent
|
||||
{counter, 'messages.qos0.received'}, % QoS0 Messages received
|
||||
{counter, 'messages.qos0.sent'}, % QoS0 Messages sent
|
||||
{counter, 'messages.qos1.received'}, % QoS1 Messages received
|
||||
{counter, 'messages.qos1.sent'}, % QoS1 Messages sent
|
||||
{counter, 'messages.qos2.received'}, % QoS2 Messages received
|
||||
{counter, 'messages.qos2.expired'}, % QoS2 Messages expired
|
||||
{counter, 'messages.qos2.sent'}, % QoS2 Messages sent
|
||||
{counter, 'messages.qos2.dropped'}, % QoS2 Messages dropped
|
||||
{gauge, 'messages.retained'}, % Messagea retained
|
||||
{counter, 'messages.dropped'}, % Messages dropped
|
||||
{counter, 'messages.expired'}, % Messages expired
|
||||
{counter, 'messages.forward'} % Messages forward
|
||||
]).
|
||||
%% Messages sent/received and pubsub
|
||||
-define(MESSAGE_METRICS,
|
||||
[{counter, 'messages.received'}, % All Messages received
|
||||
{counter, 'messages.sent'}, % All Messages sent
|
||||
{counter, 'messages.qos0.received'}, % QoS0 Messages received
|
||||
{counter, 'messages.qos0.sent'}, % QoS0 Messages sent
|
||||
{counter, 'messages.qos1.received'}, % QoS1 Messages received
|
||||
{counter, 'messages.qos1.sent'}, % QoS1 Messages sent
|
||||
{counter, 'messages.qos2.received'}, % QoS2 Messages received
|
||||
{counter, 'messages.qos2.sent'}, % QoS2 Messages sent
|
||||
%% PubSub Metrics
|
||||
{counter, 'messages.publish'}, % Messages Publish
|
||||
{counter, 'messages.dropped'}, % Messages dropped due to no subscribers
|
||||
{counter, 'messages.dropped.expired'}, % QoS2 Messages expired
|
||||
{counter, 'messages.dropped.no_subscribers'}, % Messages dropped
|
||||
{counter, 'messages.forward'}, % Messages forward
|
||||
{gauge, 'messages.retained'}, % Messages retained
|
||||
{gauge, 'messages.delayed'}, % Messages delayed
|
||||
{counter, 'messages.delivered'}, % Messages delivered
|
||||
{counter, 'messages.acked'} % Messages acked
|
||||
]).
|
||||
|
||||
-define(CHAN_METRICS, [
|
||||
{counter, 'channel.gc'}
|
||||
]).
|
||||
%% Delivery metrics
|
||||
-define(DELIVERY_METRICS,
|
||||
[{counter, 'delivery.dropped'},
|
||||
{counter, 'delivery.dropped.no_local'},
|
||||
{counter, 'delivery.dropped.too_large'},
|
||||
{counter, 'delivery.dropped.qos0_msg'},
|
||||
{counter, 'delivery.dropped.queue_full'},
|
||||
{counter, 'delivery.dropped.expired'}
|
||||
]).
|
||||
|
||||
-define(MQTT_METRICS, [
|
||||
{counter, 'auth.mqtt.anonymous'}
|
||||
]).
|
||||
%% Client Lifecircle metrics
|
||||
-define(CLIENT_METRICS,
|
||||
[{counter, 'client.connected'},
|
||||
{counter, 'client.authenticate'},
|
||||
{counter, 'client.auth.anonymous'},
|
||||
{counter, 'client.check_acl'},
|
||||
{counter, 'client.subscribe'},
|
||||
{counter, 'client.unsubscribe'},
|
||||
{counter, 'client.disconnected'}
|
||||
]).
|
||||
|
||||
%% Session Lifecircle metrics
|
||||
-define(SESSION_METRICS,
|
||||
[{counter, 'session.created'},
|
||||
{counter, 'session.resumed'},
|
||||
{counter, 'session.takeovered'},
|
||||
{counter, 'session.discarded'},
|
||||
{counter, 'session.terminated'}
|
||||
]).
|
||||
|
||||
-record(state, {next_idx = 1}).
|
||||
|
||||
|
@ -155,8 +185,7 @@ start_link() ->
|
|||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
-spec(stop() -> ok).
|
||||
stop() ->
|
||||
gen_server:stop(?SERVER).
|
||||
stop() -> gen_server:stop(?SERVER).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Metrics API
|
||||
|
@ -265,7 +294,7 @@ update_counter(Name, Value) ->
|
|||
counters:add(CRef, CIdx, Value).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Inc Received/Sent metrics
|
||||
%% Inc received/sent metrics
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Inc packets received.
|
||||
|
@ -276,13 +305,13 @@ inc_recv(Packet) ->
|
|||
|
||||
do_inc_recv(?PACKET(?CONNECT)) ->
|
||||
inc('packets.connect.received');
|
||||
do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) ->
|
||||
do_inc_recv(?PUBLISH_PACKET(QoS)) ->
|
||||
inc('messages.received'),
|
||||
case QoS of
|
||||
?QOS_0 -> inc('messages.qos0.received');
|
||||
?QOS_1 -> inc('messages.qos1.received');
|
||||
?QOS_2 -> inc('messages.qos2.received');
|
||||
_ -> ok
|
||||
_other -> ok
|
||||
end,
|
||||
inc('packets.publish.received');
|
||||
do_inc_recv(?PACKET(?PUBACK)) ->
|
||||
|
@ -319,12 +348,13 @@ do_inc_sent(?CONNACK_PACKET(ReasonCode)) ->
|
|||
(ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'),
|
||||
inc('packets.connack.sent');
|
||||
|
||||
do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) ->
|
||||
do_inc_sent(?PUBLISH_PACKET(QoS)) ->
|
||||
inc('messages.sent'),
|
||||
case QoS of
|
||||
?QOS_0 -> inc('messages.qos0.sent');
|
||||
?QOS_1 -> inc('messages.qos1.sent');
|
||||
?QOS_2 -> inc('messages.qos2.sent')
|
||||
?QOS_2 -> inc('messages.qos2.sent');
|
||||
_other -> ok
|
||||
end,
|
||||
inc('packets.publish.sent');
|
||||
do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) ->
|
||||
|
@ -360,14 +390,21 @@ init([]) ->
|
|||
CRef = counters:new(?MAX_SIZE, [write_concurrency]),
|
||||
ok = persistent_term:put(?MODULE, CRef),
|
||||
% Create index mapping table
|
||||
ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]),
|
||||
ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]),
|
||||
Metrics = lists:append([?BYTES_METRICS,
|
||||
?PACKET_METRICS,
|
||||
?MESSAGE_METRICS,
|
||||
?DELIVERY_METRICS,
|
||||
?CLIENT_METRICS,
|
||||
?SESSION_METRICS
|
||||
]),
|
||||
% Store reserved indices
|
||||
lists:foreach(fun({Type, Name}) ->
|
||||
Idx = reserved_idx(Name),
|
||||
Metric = #metric{name = Name, type = Type, idx = Idx},
|
||||
true = ets:insert(?TAB, Metric),
|
||||
ok = counters:put(CRef, Idx, 0)
|
||||
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS),
|
||||
ok = lists:foreach(fun({Type, Name}) ->
|
||||
Idx = reserved_idx(Name),
|
||||
Metric = #metric{name = Name, type = Type, idx = Idx},
|
||||
true = ets:insert(?TAB, Metric),
|
||||
ok = counters:put(CRef, Idx, 0)
|
||||
end, Metrics),
|
||||
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
|
||||
|
||||
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
|
||||
|
@ -409,58 +446,85 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
reserved_idx('bytes.received') -> 01;
|
||||
reserved_idx('bytes.sent') -> 02;
|
||||
reserved_idx('packets.received') -> 03;
|
||||
reserved_idx('packets.sent') -> 04;
|
||||
reserved_idx('packets.connect.received') -> 05;
|
||||
reserved_idx('packets.connack.sent') -> 06;
|
||||
reserved_idx('packets.connack.error') -> 07;
|
||||
reserved_idx('packets.connack.auth_error') -> 08;
|
||||
reserved_idx('packets.publish.received') -> 09;
|
||||
reserved_idx('packets.publish.sent') -> 10;
|
||||
reserved_idx('packets.publish.error') -> 11;
|
||||
reserved_idx('packets.publish.auth_error') -> 12;
|
||||
reserved_idx('packets.puback.received') -> 13;
|
||||
reserved_idx('packets.puback.sent') -> 14;
|
||||
reserved_idx('packets.puback.missed') -> 15;
|
||||
reserved_idx('packets.pubrec.received') -> 16;
|
||||
reserved_idx('packets.pubrec.sent') -> 17;
|
||||
reserved_idx('packets.pubrec.missed') -> 18;
|
||||
reserved_idx('packets.pubrel.received') -> 19;
|
||||
reserved_idx('packets.pubrel.sent') -> 20;
|
||||
reserved_idx('packets.pubrel.missed') -> 21;
|
||||
reserved_idx('packets.pubcomp.received') -> 22;
|
||||
reserved_idx('packets.pubcomp.sent') -> 23;
|
||||
reserved_idx('packets.pubcomp.missed') -> 24;
|
||||
reserved_idx('packets.subscribe.received') -> 25;
|
||||
reserved_idx('packets.subscribe.error') -> 26;
|
||||
reserved_idx('packets.subscribe.auth_error') -> 27;
|
||||
reserved_idx('packets.suback.sent') -> 28;
|
||||
reserved_idx('packets.unsubscribe.received') -> 29;
|
||||
reserved_idx('packets.unsubscribe.error') -> 30;
|
||||
reserved_idx('packets.unsuback.sent') -> 31;
|
||||
reserved_idx('packets.pingreq.received') -> 32;
|
||||
reserved_idx('packets.pingresp.sent') -> 33;
|
||||
reserved_idx('packets.disconnect.received') -> 34;
|
||||
reserved_idx('packets.disconnect.sent') -> 35;
|
||||
reserved_idx('packets.auth.received') -> 36;
|
||||
reserved_idx('packets.auth.sent') -> 37;
|
||||
reserved_idx('messages.received') -> 38;
|
||||
reserved_idx('messages.sent') -> 39;
|
||||
reserved_idx('messages.qos0.received') -> 40;
|
||||
reserved_idx('messages.qos0.sent') -> 41;
|
||||
reserved_idx('messages.qos1.received') -> 42;
|
||||
reserved_idx('messages.qos1.sent') -> 43;
|
||||
reserved_idx('messages.qos2.received') -> 44;
|
||||
reserved_idx('messages.qos2.expired') -> 45;
|
||||
reserved_idx('messages.qos2.sent') -> 46;
|
||||
reserved_idx('messages.qos2.dropped') -> 47;
|
||||
reserved_idx('messages.retained') -> 48;
|
||||
reserved_idx('messages.dropped') -> 49;
|
||||
reserved_idx('messages.expired') -> 50;
|
||||
reserved_idx('messages.forward') -> 51;
|
||||
reserved_idx('auth.mqtt.anonymous') -> 52;
|
||||
reserved_idx('channel.gc') -> 53;
|
||||
reserved_idx('packets.pubrec.inuse') -> 54;
|
||||
reserved_idx('packets.pubcomp.inuse') -> 55;
|
||||
%% Reserved indices of packet's metrics
|
||||
reserved_idx('packets.received') -> 10;
|
||||
reserved_idx('packets.sent') -> 11;
|
||||
reserved_idx('packets.connect.received') -> 12;
|
||||
reserved_idx('packets.connack.sent') -> 13;
|
||||
reserved_idx('packets.connack.error') -> 14;
|
||||
reserved_idx('packets.connack.auth_error') -> 15;
|
||||
reserved_idx('packets.publish.received') -> 16;
|
||||
reserved_idx('packets.publish.sent') -> 17;
|
||||
reserved_idx('packets.publish.error') -> 18;
|
||||
reserved_idx('packets.publish.auth_error') -> 19;
|
||||
reserved_idx('packets.puback.received') -> 20;
|
||||
reserved_idx('packets.puback.sent') -> 21;
|
||||
reserved_idx('packets.puback.inuse') -> 22;
|
||||
reserved_idx('packets.puback.missed') -> 23;
|
||||
reserved_idx('packets.pubrec.received') -> 24;
|
||||
reserved_idx('packets.pubrec.sent') -> 25;
|
||||
reserved_idx('packets.pubrec.inuse') -> 26;
|
||||
reserved_idx('packets.pubrec.missed') -> 27;
|
||||
reserved_idx('packets.pubrel.received') -> 28;
|
||||
reserved_idx('packets.pubrel.sent') -> 29;
|
||||
reserved_idx('packets.pubrel.missed') -> 30;
|
||||
reserved_idx('packets.pubcomp.received') -> 31;
|
||||
reserved_idx('packets.pubcomp.sent') -> 32;
|
||||
reserved_idx('packets.pubcomp.inuse') -> 33;
|
||||
reserved_idx('packets.pubcomp.missed') -> 34;
|
||||
reserved_idx('packets.subscribe.received') -> 35;
|
||||
reserved_idx('packets.subscribe.error') -> 36;
|
||||
reserved_idx('packets.subscribe.auth_error') -> 37;
|
||||
reserved_idx('packets.suback.sent') -> 38;
|
||||
reserved_idx('packets.unsubscribe.received') -> 39;
|
||||
reserved_idx('packets.unsubscribe.error') -> 40;
|
||||
reserved_idx('packets.unsuback.sent') -> 41;
|
||||
reserved_idx('packets.pingreq.received') -> 42;
|
||||
reserved_idx('packets.pingresp.sent') -> 43;
|
||||
reserved_idx('packets.disconnect.received') -> 44;
|
||||
reserved_idx('packets.disconnect.sent') -> 45;
|
||||
reserved_idx('packets.auth.received') -> 46;
|
||||
reserved_idx('packets.auth.sent') -> 47;
|
||||
reserved_idx('packets.publish.dropped') -> 48;
|
||||
%% Reserved indices of message's metrics
|
||||
reserved_idx('messages.received') -> 100;
|
||||
reserved_idx('messages.sent') -> 101;
|
||||
reserved_idx('messages.qos0.received') -> 102;
|
||||
reserved_idx('messages.qos0.sent') -> 103;
|
||||
reserved_idx('messages.qos1.received') -> 104;
|
||||
reserved_idx('messages.qos1.sent') -> 105;
|
||||
reserved_idx('messages.qos2.received') -> 106;
|
||||
reserved_idx('messages.qos2.sent') -> 107;
|
||||
reserved_idx('messages.publish') -> 108;
|
||||
reserved_idx('messages.dropped') -> 109;
|
||||
reserved_idx('messages.dropped.expired') -> 110;
|
||||
reserved_idx('messages.dropped.no_subscribers') -> 111;
|
||||
reserved_idx('messages.forward') -> 112;
|
||||
reserved_idx('messages.retained') -> 113;
|
||||
reserved_idx('messages.delayed') -> 114;
|
||||
reserved_idx('messages.delivered') -> 115;
|
||||
reserved_idx('messages.acked') -> 116;
|
||||
reserved_idx('delivery.expired') -> 117;
|
||||
reserved_idx('delivery.dropped') -> 118;
|
||||
reserved_idx('delivery.dropped.no_local') -> 119;
|
||||
reserved_idx('delivery.dropped.too_large') -> 120;
|
||||
reserved_idx('delivery.dropped.qos0_msg') -> 121;
|
||||
reserved_idx('delivery.dropped.queue_full') -> 122;
|
||||
reserved_idx('delivery.dropped.expired') -> 123;
|
||||
|
||||
reserved_idx('client.connected') -> 200;
|
||||
reserved_idx('client.authenticate') -> 201;
|
||||
reserved_idx('client.auth.anonymous') -> 202;
|
||||
reserved_idx('client.check_acl') -> 203;
|
||||
reserved_idx('client.subscribe') -> 204;
|
||||
reserved_idx('client.unsubscribe') -> 205;
|
||||
reserved_idx('client.disconnected') -> 206;
|
||||
|
||||
reserved_idx('session.created') -> 220;
|
||||
reserved_idx('session.resumed') -> 221;
|
||||
reserved_idx('session.takeovered') -> 222;
|
||||
reserved_idx('session.discarded') -> 223;
|
||||
reserved_idx('session.terminated') -> 224;
|
||||
|
||||
reserved_idx(_) -> undefined.
|
||||
|
||||
|
|
|
@ -118,41 +118,21 @@ do_check_sub(_Flags, _Caps) -> ok.
|
|||
|
||||
-spec(get_caps(emqx_zone:zone()) -> caps()).
|
||||
get_caps(Zone) ->
|
||||
with_env(Zone, '$mqtt_caps', fun all_caps/1).
|
||||
maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS).
|
||||
|
||||
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
|
||||
get_caps(Zone, publish) ->
|
||||
with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1);
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone));
|
||||
get_caps(Zone, subscribe) ->
|
||||
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
-spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
|
||||
get_caps(Zone, Cap, Def) ->
|
||||
emqx_zone:get_env(Zone, Cap, Def).
|
||||
|
||||
pub_caps(Zone) ->
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
sub_caps(Zone) ->
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
all_caps(Zone) ->
|
||||
maps:map(fun(Cap, Def) ->
|
||||
emqx_zone:get_env(Zone, Cap, Def)
|
||||
end, ?DEFAULT_CAPS).
|
||||
|
||||
filter_caps(Keys, Caps) ->
|
||||
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
||||
|
||||
with_env(Zone, Key, InitFun) ->
|
||||
case emqx_zone:get_env(Zone, Key) of
|
||||
undefined ->
|
||||
Caps = InitFun(Zone),
|
||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||
Caps;
|
||||
Caps -> Caps
|
||||
end.
|
||||
|
||||
-spec(default() -> caps()).
|
||||
default() -> ?DEFAULT_CAPS.
|
||||
|
||||
|
|
|
@ -53,7 +53,10 @@
|
|||
-include("types.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-export([init/1]).
|
||||
-export([ init/1
|
||||
, info/1
|
||||
, info/2
|
||||
]).
|
||||
|
||||
-export([ is_empty/1
|
||||
, len/1
|
||||
|
@ -86,6 +89,7 @@
|
|||
-define(LOWEST_PRIORITY, 0).
|
||||
-define(HIGHEST_PRIORITY, infinity).
|
||||
-define(MAX_LEN_INFINITY, 0).
|
||||
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
|
||||
|
||||
-record(mqueue, {
|
||||
store_qos0 = false :: boolean(),
|
||||
|
@ -111,6 +115,20 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
|||
default_p = get_priority_opt(Opts)
|
||||
}.
|
||||
|
||||
-spec(info(mqueue()) -> emqx_types:infos()).
|
||||
info(MQ) ->
|
||||
maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]).
|
||||
|
||||
-spec(info(atom(), mqueue()) -> term()).
|
||||
info(store_qos0, #mqueue{store_qos0 = True}) ->
|
||||
True;
|
||||
info(max_len, #mqueue{max_len = MaxLen}) ->
|
||||
MaxLen;
|
||||
info(len, #mqueue{len = Len}) ->
|
||||
Len;
|
||||
info(dropped, #mqueue{dropped = Dropped}) ->
|
||||
Dropped.
|
||||
|
||||
is_empty(#mqueue{len = Len}) -> Len =:= 0.
|
||||
|
||||
len(#mqueue{len = Len}) -> Len.
|
||||
|
|
|
@ -213,7 +213,7 @@ start_app(App, SuccFun) ->
|
|||
?LOG(info, "Started plugins: ~p", [Started]),
|
||||
?LOG(info, "Load plugin ~s successfully", [App]),
|
||||
SuccFun(App),
|
||||
{ok, Started};
|
||||
ok;
|
||||
{error, {ErrApp, Reason}} ->
|
||||
?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]),
|
||||
{error, {ErrApp, Reason}}
|
||||
|
|
|
@ -398,18 +398,17 @@ dequeue(0, Msgs, Q) ->
|
|||
dequeue(Cnt, Msgs, Q) ->
|
||||
case emqx_mqueue:out(Q) of
|
||||
{empty, _Q} -> dequeue(0, Msgs, Q);
|
||||
{{value, Msg = #message{qos = ?QOS_0}}, Q1} ->
|
||||
dequeue(Cnt, acc_msg(Msg, Msgs), Q1);
|
||||
{{value, Msg}, Q1} ->
|
||||
dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1)
|
||||
case emqx_message:is_expired(Msg) of
|
||||
true -> ok = inc_expired_cnt(delivery),
|
||||
dequeue(Cnt, Msgs, Q1);
|
||||
false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
|
||||
end
|
||||
end.
|
||||
|
||||
-compile({inline, [acc_msg/2]}).
|
||||
acc_msg(Msg, Msgs) ->
|
||||
case emqx_message:is_expired(Msg) of
|
||||
true -> Msgs;
|
||||
false -> [Msg|Msgs]
|
||||
end.
|
||||
-compile({inline, [acc_cnt/2]}).
|
||||
acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
|
||||
acc_cnt(_Msg, Cnt) -> Cnt - 1.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Broker -> Client: Deliver
|
||||
|
@ -467,13 +466,20 @@ enqueue(Delivers, Session) when is_list(Delivers) ->
|
|||
|
||||
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
|
||||
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
||||
if is_record(Dropped, message) ->
|
||||
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
|
||||
[emqx_message:format(Dropped)]);
|
||||
true -> ok
|
||||
end,
|
||||
(Dropped =/= undefined) andalso log_dropped(Dropped, Session),
|
||||
Session#session{mqueue = NewQ}.
|
||||
|
||||
log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
|
||||
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
|
||||
true ->
|
||||
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
||||
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
|
||||
false ->
|
||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
|
||||
[emqx_message:format(Msg)])
|
||||
end.
|
||||
|
||||
enrich_fun(Session = #session{subscriptions = Subs}) ->
|
||||
fun({deliver, Topic, Msg}) ->
|
||||
enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
|
||||
|
@ -555,6 +561,7 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
|
|||
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
|
||||
case emqx_message:is_expired(Msg) of
|
||||
true ->
|
||||
ok = inc_expired_cnt(delivery),
|
||||
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
||||
false ->
|
||||
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
||||
|
@ -581,6 +588,8 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
|
|||
await_rel_timeout = Timeout}) ->
|
||||
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
|
||||
AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
|
||||
ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
|
||||
(ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt),
|
||||
NSession = Session#session{awaiting_rel = AwaitingRel1},
|
||||
case maps:size(AwaitingRel1) of
|
||||
0 -> {ok, NSession};
|
||||
|
@ -617,10 +626,28 @@ replay(Inflight) ->
|
|||
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
||||
end, emqx_inflight:to_list(Inflight)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Inc message/delivery expired counter
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
|
||||
|
||||
inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
|
||||
|
||||
inc_expired_cnt(delivery, N) ->
|
||||
ok = emqx_metrics:inc('delivery.dropped', N),
|
||||
emqx_metrics:inc('delivery.dropped.expired', N);
|
||||
|
||||
inc_expired_cnt(message, N) ->
|
||||
ok = emqx_metrics:inc('messages.dropped', N),
|
||||
emqx_metrics:inc('messages.dropped.expired', N).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Next Packet Id
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-compile({inline, [next_pkt_id/1]}).
|
||||
|
||||
next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) ->
|
||||
Session#session{next_pkt_id = 1};
|
||||
|
||||
|
|
|
@ -1,51 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_time).
|
||||
|
||||
-export([ seed/0
|
||||
, now_secs/0
|
||||
, now_secs/1
|
||||
, now_ms/0
|
||||
, now_ms/1
|
||||
]).
|
||||
|
||||
-compile({inline,
|
||||
[ seed/0
|
||||
, now_secs/0
|
||||
, now_secs/1
|
||||
, now_ms/0
|
||||
, now_ms/1
|
||||
]}).
|
||||
|
||||
seed() ->
|
||||
rand:seed(exsplus, erlang:timestamp()).
|
||||
|
||||
-spec(now_secs() -> pos_integer()).
|
||||
now_secs() ->
|
||||
erlang:system_time(second).
|
||||
|
||||
-spec(now_secs(erlang:timestamp()) -> pos_integer()).
|
||||
now_secs({MegaSecs, Secs, _MicroSecs}) ->
|
||||
MegaSecs * 1000000 + Secs.
|
||||
|
||||
-spec(now_ms() -> pos_integer()).
|
||||
now_ms() ->
|
||||
erlang:system_time(millisecond).
|
||||
|
||||
-spec(now_ms(erlang:timestamp()) -> pos_integer()).
|
||||
now_ms({MegaSecs, Secs, MicroSecs}) ->
|
||||
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
|
|
@ -124,6 +124,7 @@
|
|||
-type(clientinfo() :: #{zone := zone(),
|
||||
protocol := protocol(),
|
||||
peerhost := peerhost(),
|
||||
sockport := non_neg_integer(),
|
||||
clientid := clientid(),
|
||||
username := username(),
|
||||
peercert := esockd_peercert:peercert(),
|
||||
|
|
|
@ -427,8 +427,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|||
|
||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||
{IsGC, GcSt1} ->
|
||||
IsGC andalso emqx_metrics:inc('channel.gc'),
|
||||
{_IsGC, GcSt1} ->
|
||||
State#state{gc_state = GcSt1};
|
||||
false -> State
|
||||
end.
|
||||
|
@ -521,6 +520,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
case Serialize(Packet) of
|
||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
|
||||
[emqx_packet:format(Packet)]),
|
||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
<<>>;
|
||||
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
|
|
|
@ -53,13 +53,13 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok = meck:unload(emqx_access_control),
|
||||
ok = meck:unload(emqx_metrics),
|
||||
ok = meck:unload(emqx_session),
|
||||
ok = meck:unload(emqx_broker),
|
||||
ok = meck:unload(emqx_hooks),
|
||||
ok = meck:unload(emqx_cm),
|
||||
ok.
|
||||
meck:unload([emqx_access_control,
|
||||
emqx_metrics,
|
||||
emqx_session,
|
||||
emqx_broker,
|
||||
emqx_hooks,
|
||||
emqx_cm
|
||||
]).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
@ -334,15 +334,15 @@ t_handle_out_publish(_) ->
|
|||
|
||||
t_handle_out_publish_1(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} =
|
||||
emqx_channel:handle_out(publish, [{1, Msg}], channel()).
|
||||
{ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan}
|
||||
= emqx_channel:handle_out(publish, [{1, Msg}], channel()).
|
||||
|
||||
t_handle_out_publish_nl(_) ->
|
||||
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
|
||||
Channel = channel(#{clientinfo => ClientInfo}),
|
||||
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
|
||||
Pubs = [{1, emqx_message:set_flag(nl, Msg)}],
|
||||
{ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
|
||||
{ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
|
||||
|
||||
t_handle_out_connack_sucess(_) ->
|
||||
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
|
||||
|
|
|
@ -35,6 +35,10 @@ init_per_suite(Config) ->
|
|||
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
|
||||
%% Meck Cm
|
||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||
%% Meck Limiter
|
||||
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
|
||||
%% Meck Pd
|
||||
ok = meck:new(emqx_pd, [passthrough, no_history, no_link]),
|
||||
%% Meck Metrics
|
||||
ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
|
||||
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||
|
@ -46,6 +50,8 @@ end_per_suite(_Config) ->
|
|||
ok = meck:unload(emqx_transport),
|
||||
ok = meck:unload(emqx_channel),
|
||||
ok = meck:unload(emqx_cm),
|
||||
ok = meck:unload(emqx_limiter),
|
||||
ok = meck:unload(emqx_pd),
|
||||
ok = meck:unload(emqx_metrics),
|
||||
ok.
|
||||
|
||||
|
@ -72,6 +78,233 @@ end_per_testcase(_TestCase, Config) ->
|
|||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_info(_) ->
|
||||
CPid = spawn(fun() ->
|
||||
receive
|
||||
{'$gen_call', From, info} ->
|
||||
gen_server:reply(From, emqx_connection:info(st()))
|
||||
after
|
||||
0 -> error("error")
|
||||
end
|
||||
end),
|
||||
#{sockinfo := SockInfo} = emqx_connection:info(CPid),
|
||||
?assertMatch(#{active_n := 100,
|
||||
peername := {{127,0,0,1},3456},
|
||||
sockname := {{127,0,0,1},1883},
|
||||
sockstate := idle,
|
||||
socktype := tcp}, SockInfo).
|
||||
|
||||
t_info_limiter(_) ->
|
||||
St = st(#{limiter => emqx_limiter:init([])}),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, St)).
|
||||
|
||||
t_stats(_) ->
|
||||
CPid = spawn(fun() ->
|
||||
receive
|
||||
{'$gen_call', From, stats} ->
|
||||
gen_server:reply(From, emqx_connection:stats(st()))
|
||||
after
|
||||
0 -> error("error")
|
||||
end
|
||||
end),
|
||||
Stats = emqx_connection:stats(CPid),
|
||||
?assertMatch([{recv_oct,0},
|
||||
{recv_cnt,0},
|
||||
{send_oct,0},
|
||||
{send_cnt,0},
|
||||
{send_pend,0}| _] , Stats).
|
||||
|
||||
t_process_msg(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in,
|
||||
fun(_Packet, Channel) ->
|
||||
{ok, Channel}
|
||||
end),
|
||||
CPid ! {incoming, ?PACKET(?PINGREQ)},
|
||||
CPid ! {incoming, undefined},
|
||||
CPid ! {tcp_passive, sock},
|
||||
CPid ! {tcp_closed, sock},
|
||||
timer:sleep(100),
|
||||
ok = trap_exit(CPid, {shutdown, tcp_closed})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_ensure_stats_timer(_) ->
|
||||
NStats = emqx_connection:ensure_stats_timer(100, st()),
|
||||
Stats_timer = emqx_connection:info(stats_timer, NStats),
|
||||
?assert(is_reference(Stats_timer)),
|
||||
?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
|
||||
|
||||
t_cancel_stats_timer(_) ->
|
||||
NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
|
||||
Stats_timer = emqx_connection:info(stats_timer, NStats),
|
||||
?assertEqual(undefined, Stats_timer),
|
||||
?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)).
|
||||
|
||||
t_append_msg(_) ->
|
||||
?assertEqual([msg], emqx_connection:append_msg([], [msg])),
|
||||
?assertEqual([msg], emqx_connection:append_msg([], msg)),
|
||||
?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], [msg])),
|
||||
?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], msg)).
|
||||
|
||||
t_handle_msg(_) ->
|
||||
From = {make_ref(), self()},
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({'$gen_call', From, for_testing}, st())),
|
||||
?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())),
|
||||
?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())),
|
||||
?assertMatch({ok, [], _St}, emqx_connection:handle_msg({tcp, From, <<"for_testing">>}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg(for_testing, st())).
|
||||
|
||||
t_handle_msg_incoming(_) ->
|
||||
?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())),
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, undefined}, st())).
|
||||
|
||||
t_handle_msg_outgoing(_) ->
|
||||
?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
|
||||
|
||||
t_handle_msg_tcp_error(_) ->
|
||||
?assertMatch({stop, {shutdown, econnreset}, _St}, emqx_connection:handle_msg({tcp_error, sock, econnreset}, st())).
|
||||
|
||||
t_handle_msg_tcp_closed(_) ->
|
||||
?assertMatch({stop, {shutdown, tcp_closed}, _St}, emqx_connection:handle_msg({tcp_closed, sock}, st())).
|
||||
|
||||
t_handle_msg_passive(_) ->
|
||||
?assertMatch({ok, _Event, _St}, emqx_connection:handle_msg({tcp_passive, sock}, st())).
|
||||
|
||||
t_handle_msg_deliver(_) ->
|
||||
ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({deliver, topic, msg}, st())).
|
||||
|
||||
t_handle_msg_inet_reply(_) ->
|
||||
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
|
||||
?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
|
||||
|
||||
t_handle_msg_connack(_) ->
|
||||
?assertEqual(ok, emqx_connection:handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
|
||||
|
||||
t_handle_msg_close(_) ->
|
||||
?assertMatch({stop, {shutdown, normal}, _St}, emqx_connection:handle_msg({close, normal}, st())).
|
||||
|
||||
t_handle_msg_event(_) ->
|
||||
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({event, connected}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({event, disconnected}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, st())).
|
||||
|
||||
t_handle_msg_timeout(_) ->
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({timeout, make_ref(), for_testing}, st())).
|
||||
|
||||
t_handle_msg_shutdown(_) ->
|
||||
?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({shutdown, for_testing}, st())).
|
||||
|
||||
t_handle_call(_) ->
|
||||
St = st(),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)),
|
||||
?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)),
|
||||
?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)),
|
||||
?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)),
|
||||
?assertEqual({stop, {shutdown,kicked}, ok, St}, emqx_connection:handle_call(self(), kick, St)).
|
||||
|
||||
t_handle_timeout(_) ->
|
||||
TRef = make_ref(),
|
||||
State = st(#{idle_timer => TRef, limit_timer => TRef, stats_timer => TRef}),
|
||||
?assertMatch({stop, {shutdown,idle_timeout}, _NState}, emqx_connection:handle_timeout(TRef, idle_timeout, State)),
|
||||
?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_timeout(TRef, limit_timeout, State)),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, emit_stats, State)),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)),
|
||||
|
||||
ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
|
||||
?assertMatch({stop, {shutdown,for_testing}, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)).
|
||||
|
||||
t_parse_incoming(_) ->
|
||||
?assertMatch({ok, [], _NState}, emqx_connection:parse_incoming(<<>>, st())),
|
||||
?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<"for_testing">>, [], st())).
|
||||
|
||||
t_next_incoming_msgs(_) ->
|
||||
?assertEqual({incoming, packet}, emqx_connection:next_incoming_msgs([packet])),
|
||||
?assertEqual([{incoming, packet2}, {incoming, packet1}], emqx_connection:next_incoming_msgs([packet1, packet2])).
|
||||
|
||||
t_handle_incoming(_) ->
|
||||
?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())),
|
||||
?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())).
|
||||
|
||||
t_with_channel(_) ->
|
||||
State = st(),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end),
|
||||
?assertEqual({ok, State}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, Channel} end),
|
||||
?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end),
|
||||
?assertMatch({ok, _Out, _NChannel}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end),
|
||||
?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel} end),
|
||||
?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)).
|
||||
|
||||
t_handle_outgoing(_) ->
|
||||
?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
|
||||
?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
|
||||
|
||||
t_handle_info(_) ->
|
||||
?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())),
|
||||
?assertMatch({stop, {shutdown, for_testing}, _NStats}, emqx_connection:handle_info({sock_error, for_testing}, st())),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())).
|
||||
|
||||
t_ensure_rate_limit(_) ->
|
||||
State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, State)),
|
||||
|
||||
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end),
|
||||
State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, State1)),
|
||||
|
||||
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end),
|
||||
State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, State2)),
|
||||
?assertEqual(blocked, emqx_connection:info(sockstate, State2)).
|
||||
|
||||
t_activate_socket(_) ->
|
||||
State = st(),
|
||||
{ok, NStats} = emqx_connection:activate_socket(State),
|
||||
?assertEqual(running, emqx_connection:info(sockstate, NStats)),
|
||||
|
||||
State1 = st(#{sockstate => blocked}),
|
||||
?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)),
|
||||
|
||||
State2 = st(#{sockstate => closed}),
|
||||
?assertEqual({ok, State2}, emqx_connection:activate_socket(State2)).
|
||||
|
||||
t_close_socket(_) ->
|
||||
State = emqx_connection:close_socket(st(#{sockstate => closed})),
|
||||
?assertEqual(closed, emqx_connection:info(sockstate, State)),
|
||||
State1 = emqx_connection:close_socket(st()),
|
||||
?assertEqual(closed, emqx_connection:info(sockstate, State1)).
|
||||
|
||||
t_system_code_change(_) ->
|
||||
State = st(),
|
||||
?assertEqual({ok, State}, emqx_connection:system_code_change(State, [], [], [])).
|
||||
|
||||
t_next_msgs(_) ->
|
||||
?assertEqual({outgoing, ?CONNECT_PACKET()}, emqx_connection:next_msgs(?CONNECT_PACKET())),
|
||||
?assertEqual({}, emqx_connection:next_msgs({})),
|
||||
?assertEqual([], emqx_connection:next_msgs([])).
|
||||
|
||||
t_start_link_ok(_) ->
|
||||
with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end).
|
||||
|
||||
|
@ -99,262 +332,6 @@ t_get_conn_info(_) ->
|
|||
}, SockInfo)
|
||||
end).
|
||||
|
||||
t_handle_call_discard(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_call,
|
||||
fun(discard, Channel) ->
|
||||
{shutdown, discarded, ok, Channel}
|
||||
end),
|
||||
ok = emqx_connection:call(CPid, discard),
|
||||
timer:sleep(100),
|
||||
ok = trap_exit(CPid, {shutdown, discarded})
|
||||
end, #{trap_exit => true}),
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_call,
|
||||
fun(discard, Channel) ->
|
||||
{shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel}
|
||||
end),
|
||||
ok = emqx_connection:call(CPid, discard),
|
||||
timer:sleep(100),
|
||||
ok = trap_exit(CPid, {shutdown, discarded})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_handle_call_takeover(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_call,
|
||||
fun({takeover, 'begin'}, Channel) ->
|
||||
{reply, session, Channel};
|
||||
({takeover, 'end'}, Channel) ->
|
||||
{shutdown, takeovered, [], Channel}
|
||||
end),
|
||||
session = emqx_connection:call(CPid, {takeover, 'begin'}),
|
||||
[] = emqx_connection:call(CPid, {takeover, 'end'}),
|
||||
timer:sleep(100),
|
||||
ok = trap_exit(CPid, {shutdown, takeovered})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_handle_call_any(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_call,
|
||||
fun(_Req, Channel) -> {reply, ok, Channel} end),
|
||||
ok = emqx_connection:call(CPid, req)
|
||||
end).
|
||||
|
||||
t_handle_incoming_connect(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
||||
proto_name = <<"MQTT">>,
|
||||
clientid = <<>>,
|
||||
clean_start = true,
|
||||
keepalive = 60
|
||||
},
|
||||
Frame = make_frame(?CONNECT_PACKET(ConnPkt)),
|
||||
CPid ! {tcp, sock, Frame}
|
||||
end).
|
||||
|
||||
t_handle_incoming_publish(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)),
|
||||
CPid ! {tcp, sock, Frame}
|
||||
end).
|
||||
|
||||
t_handle_incoming_subscribe(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
Frame = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
|
||||
CPid ! {tcp, sock, Frame}
|
||||
end).
|
||||
|
||||
t_handle_incoming_unsubscribe(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
Frame = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
|
||||
CPid ! {tcp, sock, Frame}
|
||||
end).
|
||||
|
||||
t_handle_incoming_undefined(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
CPid ! {incoming, undefined}
|
||||
end).
|
||||
|
||||
t_handle_sock_error(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_info,
|
||||
fun({_, Reason}, Channel) ->
|
||||
{shutdown, Reason, Channel}
|
||||
end),
|
||||
%% TODO: fixme later
|
||||
CPid ! {tcp_error, sock, econnreset},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, econnreset})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_handle_sock_activate(_) ->
|
||||
with_conn(fun(CPid) -> CPid ! activate_socket end).
|
||||
|
||||
t_handle_sock_closed(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_info,
|
||||
fun({sock_closed, Reason}, Channel) ->
|
||||
{shutdown, Reason, Channel}
|
||||
end),
|
||||
CPid ! {tcp_closed, sock},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, tcp_closed})
|
||||
end, #{trap_exit => true}),
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_info,
|
||||
fun({sock_closed, Reason}, Channel) ->
|
||||
{shutdown, Reason, ?DISCONNECT_PACKET(), Channel}
|
||||
end),
|
||||
CPid ! {tcp_closed, sock},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, tcp_closed})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_handle_outgoing(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
|
||||
CPid ! {outgoing, Publish},
|
||||
CPid ! {outgoing, ?PUBREL_PACKET(1)},
|
||||
CPid ! {outgoing, [?PUBCOMP_PACKET(1)]}
|
||||
end).
|
||||
|
||||
t_conn_rate_limit(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
|
||||
lists:foreach(fun(I) ->
|
||||
Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)),
|
||||
CPid ! {tcp, sock, make_frame(Publish)}
|
||||
end, [1, 2])
|
||||
end, #{active_n => 1, rate_limit => {1, 1024}}).
|
||||
|
||||
t_conn_pub_limit(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
|
||||
ok = lists:foreach(fun(I) ->
|
||||
CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)}
|
||||
end, lists:seq(1, 3))
|
||||
%%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
|
||||
end, #{active_n => 1, publish_limit => {1, 2}}).
|
||||
|
||||
t_conn_pingreq(_) ->
|
||||
with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end).
|
||||
|
||||
t_inet_reply(_) ->
|
||||
ok = meck:new(emqx_pd, [passthrough, no_history]),
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
|
||||
CPid ! {inet_reply, for_testing, ok},
|
||||
timer:sleep(100)
|
||||
end, #{active_n => 1, trap_exit => true}),
|
||||
ok = meck:unload(emqx_pd),
|
||||
with_conn(fun(CPid) ->
|
||||
CPid ! {inet_reply, for_testing, {error, for_testing}},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, for_testing})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_deliver(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_deliver,
|
||||
fun(_, Channel) -> {ok, Channel} end),
|
||||
CPid ! {deliver, topic, msg}
|
||||
end).
|
||||
|
||||
t_event_disconnected(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
|
||||
CPid ! {event, disconnected}
|
||||
end).
|
||||
|
||||
t_event_undefined(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
|
||||
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
|
||||
CPid ! {event, undefined}
|
||||
end).
|
||||
|
||||
t_cloes(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
CPid ! {close, normal},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, normal})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_oom_shutdown(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
CPid ! {shutdown, message_queue_too_long},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, message_queue_too_long})
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_handle_idle_timeout(_) ->
|
||||
ok = emqx_zone:set_env(external, idle_timeout, 10),
|
||||
with_conn(fun(CPid) ->
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, idle_timeout})
|
||||
end, #{zone => external, trap_exit => true}).
|
||||
|
||||
t_handle_emit_stats(_) ->
|
||||
ok = emqx_zone:set_env(external, idle_timeout, 1000),
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
|
||||
CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false,
|
||||
max_size => ?MAX_PACKET_SIZE,
|
||||
version => ?MQTT_PROTO_V4
|
||||
})},
|
||||
timer:sleep(1000)
|
||||
end,#{zone => external, trap_exit => true}).
|
||||
|
||||
t_handle_limit_timeout(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
CPid ! {timeout, undefined, limit_timeout},
|
||||
timer:sleep(100),
|
||||
true = erlang:is_process_alive(CPid)
|
||||
end).
|
||||
|
||||
t_handle_keepalive_timeout(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_channel, handle_timeout,
|
||||
fun(_TRef, _TMsg, Channel) ->
|
||||
{shutdown, keepalive_timeout, Channel}
|
||||
end),
|
||||
CPid ! {timeout, make_ref(), keepalive},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, {shutdown, keepalive_timeout})
|
||||
end, #{trap_exit => true}),
|
||||
with_conn(fun(CPid) ->
|
||||
ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
|
||||
ok = meck:expect(emqx_channel, handle_timeout,
|
||||
fun(_TRef, _TMsg, Channel) ->
|
||||
{shutdown, keepalive_timeout, Channel}
|
||||
end),
|
||||
CPid ! {timeout, make_ref(), keepalive},
|
||||
timer:sleep(100),
|
||||
false = erlang:is_process_alive(CPid)
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_handle_shutdown(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
CPid ! Shutdown = {shutdown, reason},
|
||||
timer:sleep(100),
|
||||
trap_exit(CPid, Shutdown)
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
t_exit_message(_) ->
|
||||
with_conn(fun(CPid) ->
|
||||
CPid ! {'EXIT', CPid, for_testing},
|
||||
timer:sleep(1000)
|
||||
end, #{trap_exit => true}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -406,3 +383,45 @@ make_frame(Packet) ->
|
|||
|
||||
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
|
||||
|
||||
st() -> st(#{}).
|
||||
st(InitFields) when is_map(InitFields) ->
|
||||
St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]),
|
||||
maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end,
|
||||
emqx_connection:set_field(channel, channel(), St),
|
||||
InitFields
|
||||
).
|
||||
|
||||
channel() -> channel(#{}).
|
||||
channel(InitFields) ->
|
||||
ConnInfo = #{peername => {{127,0,0,1}, 3456},
|
||||
sockname => {{127,0,0,1}, 18083},
|
||||
conn_mod => emqx_connection,
|
||||
proto_name => <<"MQTT">>,
|
||||
proto_ver => ?MQTT_PROTO_V5,
|
||||
clean_start => true,
|
||||
keepalive => 30,
|
||||
clientid => <<"clientid">>,
|
||||
username => <<"username">>,
|
||||
receive_maximum => 100,
|
||||
expiry_interval => 0
|
||||
},
|
||||
ClientInfo = #{zone => zone,
|
||||
protocol => mqtt,
|
||||
peerhost => {127,0,0,1},
|
||||
clientid => <<"clientid">>,
|
||||
username => <<"username">>,
|
||||
is_superuser => false,
|
||||
peercert => undefined,
|
||||
mountpoint => undefined
|
||||
},
|
||||
Session = emqx_session:init(#{zone => external},
|
||||
#{receive_maximum => 0}
|
||||
),
|
||||
maps:fold(fun(Field, Value, Channel) ->
|
||||
emqx_channel:set_field(Field, Value, Channel)
|
||||
end,
|
||||
emqx_channel:init(ConnInfo, [{zone, zone}]),
|
||||
maps:merge(#{clientinfo => ClientInfo,
|
||||
session => Session,
|
||||
conn_state => connected
|
||||
}, InitFields)).
|
|
@ -68,6 +68,14 @@ t_timestamp(_) ->
|
|||
timer:sleep(1),
|
||||
?assert(erlang:system_time(millisecond) > emqx_message:timestamp(Msg)).
|
||||
|
||||
t_is_sys(_) ->
|
||||
Msg0 = emqx_message:make(<<"t">>, <<"payload">>),
|
||||
?assertNot(emqx_message:is_sys(Msg0)),
|
||||
Msg1 = emqx_message:set_flag(sys, Msg0),
|
||||
?assert(emqx_message:is_sys(Msg1)),
|
||||
Msg2 = emqx_message:make(<<"$SYS/events">>, <<"payload">>),
|
||||
?assert(emqx_message:is_sys(Msg2)).
|
||||
|
||||
t_clean_dup(_) ->
|
||||
Msg = emqx_message:make(<<"topic">>, <<"payload">>),
|
||||
?assertNot(emqx_message:get_flag(dup, Msg)),
|
||||
|
|
|
@ -49,54 +49,66 @@ t_tcp_sock_passive(_) ->
|
|||
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
|
||||
|
||||
t_message_expiry_interval_1(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
|
||||
t_message_expiry_interval_2(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
|
||||
message_expiry_interval_init() ->
|
||||
{ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientA),
|
||||
{ok, _} = emqtt:connect(ClientB),
|
||||
%% subscribe and disconnect client-b
|
||||
emqtt:subscribe(ClientB, <<"t/a">>, 1),
|
||||
emqtt:stop(ClientB),
|
||||
ClientA.
|
||||
{ok, ClientA} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-a">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, ClientB} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-b">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientA),
|
||||
{ok, _} = emqtt:connect(ClientB),
|
||||
%% subscribe and disconnect client-b
|
||||
emqtt:subscribe(ClientB, <<"t/a">>, 1),
|
||||
emqtt:stop(ClientB),
|
||||
ClientA.
|
||||
|
||||
message_expiry_interval_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(1500),
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(1500),
|
||||
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-b">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
|
||||
%% verify client-b could not receive the publish message
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||
ct:fail(should_have_expired)
|
||||
after 300 ->
|
||||
ok
|
||||
end,
|
||||
emqtt:stop(ClientB1).
|
||||
%% verify client-b could not receive the publish message
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||
ct:fail(should_have_expired)
|
||||
after 300 ->
|
||||
ok
|
||||
end,
|
||||
emqtt:stop(ClientB1).
|
||||
|
||||
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
|
||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||
%% as Message-Expiry-Interval = 20s
|
||||
ct:sleep(1000),
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||
%% as Message-Expiry-Interval = 20s
|
||||
ct:sleep(1000),
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-b">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
|
||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||
receive
|
||||
|
|
|
@ -28,7 +28,9 @@ t_check_pub(_) ->
|
|||
PubCaps = #{max_qos_allowed => ?QOS_1,
|
||||
retain_available => false
|
||||
},
|
||||
ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
|
||||
lists:foreach(fun({Key, Val}) ->
|
||||
ok = emqx_zone:set_env(zone, Key, Val)
|
||||
end, maps:to_list(PubCaps)),
|
||||
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
|
||||
retain => false}),
|
||||
PubFlags1 = #{qos => ?QOS_2, retain => false},
|
||||
|
@ -37,7 +39,9 @@ t_check_pub(_) ->
|
|||
PubFlags2 = #{qos => ?QOS_1, retain => true},
|
||||
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_pub(zone, PubFlags2)),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').
|
||||
lists:foreach(fun({Key, _Val}) ->
|
||||
true = emqx_zone:unset_env(zone, Key)
|
||||
end, maps:to_list(PubCaps)).
|
||||
|
||||
t_check_sub(_) ->
|
||||
SubOpts = #{rh => 0,
|
||||
|
@ -50,7 +54,9 @@ t_check_sub(_) ->
|
|||
shared_subscription => false,
|
||||
wildcard_subscription => false
|
||||
},
|
||||
ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
|
||||
lists:foreach(fun({Key, Val}) ->
|
||||
ok = emqx_zone:set_env(zone, Key, Val)
|
||||
end, maps:to_list(SubCaps)),
|
||||
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
|
||||
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
|
||||
|
@ -58,4 +64,6 @@ t_check_sub(_) ->
|
|||
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_sub_caps').
|
||||
lists:foreach(fun({Key, _Val}) ->
|
||||
true = emqx_zone:unset_env(zone, Key)
|
||||
end, maps:to_list(SubCaps)).
|
||||
|
|
|
@ -28,24 +28,17 @@
|
|||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
|
||||
% t_init(_) ->
|
||||
% error('TODO').
|
||||
|
||||
% t_is_empty(_) ->
|
||||
% error('TODO').
|
||||
|
||||
% t_len(_) ->
|
||||
% error('TODO').
|
||||
|
||||
% t_max_len(_) ->
|
||||
% error('TODO').
|
||||
|
||||
% t_dropped(_) ->
|
||||
% error('TODO').
|
||||
|
||||
% t_stats(_) ->
|
||||
% error('TODO').
|
||||
t_info(_) ->
|
||||
Q = ?Q:init(#{max_len => 5, store_qos0 => true}),
|
||||
true = ?Q:info(store_qos0, Q),
|
||||
5 = ?Q:info(max_len, Q),
|
||||
0 = ?Q:info(len, Q),
|
||||
0 = ?Q:info(dropped, Q),
|
||||
#{store_qos0 := true,
|
||||
max_len := 5,
|
||||
len := 0,
|
||||
dropped := 0
|
||||
} = ?Q:info(Q).
|
||||
|
||||
t_in(_) ->
|
||||
Opts = #{max_len => 5, store_qos0 => true},
|
||||
|
@ -163,3 +156,10 @@ t_length_priority_mqueue(_) ->
|
|||
{{value, _Val}, Q5} = ?Q:out(Q4),
|
||||
?assertEqual(1, ?Q:len(Q5)).
|
||||
|
||||
t_dropped(_) ->
|
||||
Q = ?Q:init(#{max_len => 1, store_qos0 => true}),
|
||||
Msg = emqx_message:make(<<"t">>, <<"payload">>),
|
||||
{undefined, Q1} = ?Q:in(Msg, Q),
|
||||
{Msg, Q2} = ?Q:in(Msg, Q1),
|
||||
?assertEqual(1, ?Q:dropped(Q2)).
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ t_load(_) ->
|
|||
?assertEqual([], emqx_plugins:unload()),
|
||||
|
||||
?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)),
|
||||
?assertMatch({ok, _}, emqx_plugins:load(emqx_mini_plugin)),
|
||||
?assertMatch(ok, emqx_plugins:load(emqx_mini_plugin)),
|
||||
?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)),
|
||||
?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)),
|
||||
?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)),
|
||||
|
@ -127,7 +127,7 @@ t_load_plugin(_) ->
|
|||
(App) -> {ok, App} end),
|
||||
|
||||
?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)),
|
||||
?assertMatch({ok, _}, emqx_plugins:load_plugin(#plugin{name = normal}, true)),
|
||||
?assertMatch(ok, emqx_plugins:load_plugin(#plugin{name = normal}, true)),
|
||||
?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)),
|
||||
|
||||
ok = meck:unload(application).
|
||||
|
|
|
@ -31,14 +31,15 @@ all() -> emqx_ct:all(?MODULE).
|
|||
|
||||
init_per_suite(Config) ->
|
||||
%% Broker
|
||||
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
||||
ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
|
||||
ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
|
||||
[passthrough, no_history, no_link]),
|
||||
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||
ok = meck:expect(emqx_metrics, inc, fun(_K, _V) -> ok end),
|
||||
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok = meck:unload(emqx_broker),
|
||||
ok = meck:unload(emqx_hooks).
|
||||
meck:unload([emqx_broker, emqx_hooks, emqx_metrics]).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_time_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_seed(_) ->
|
||||
?assert(is_tuple(emqx_time:seed())).
|
||||
|
||||
t_now_secs(_) ->
|
||||
?assert(emqx_time:now_secs() =< emqx_time:now_secs(os:timestamp())).
|
||||
|
||||
t_now_ms(_) ->
|
||||
?assert(emqx_time:now_ms() =< emqx_time:now_ms(os:timestamp())).
|
Loading…
Reference in New Issue