feat(event-topic): fix check dialyzer

This commit is contained in:
Turtle 2021-07-29 23:53:29 +08:00 committed by turtleDeng
parent 459d2154c7
commit d6c375ab5b
2 changed files with 40 additions and 55 deletions

View File

@ -71,7 +71,7 @@ disable() ->
<<"$event/session_subscribed">> ->
emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed});
<<"$event/session_unsubscribed">> ->
emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed});
emqx_hooks:del('session.unsubscribed', {?MODULE, on_session_unsubscribed});
<<"$event/message_delivered">> ->
emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered});
<<"$event/message_acked">> ->
@ -88,66 +88,52 @@ disable() ->
%%--------------------------------------------------------------------
on_client_connected(ClientInfo, ConnInfo) ->
Payload0 = connected_payload(ClientInfo, ConnInfo),
emqx_broker:safe_publish(
make_msg(<<"$event/client_connected">>,
emqx_json:encode(Payload0))),
ok.
Payload = connected_payload(ClientInfo, ConnInfo),
publish_event_msg(<<"$event/client_connected">>, Payload).
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) ->
Payload0 = #{clientid => ClientId,
username => Username,
reason => reason(Reason),
disconnected_at => DisconnectedAt,
ts => erlang:system_time(millisecond)
},
emqx_broker:safe_publish(
make_msg(<<"$event/client_disconnected">>,
emqx_json:encode(Payload0))),
ok.
Payload = #{clientid => ClientId,
username => Username,
reason => reason(Reason),
disconnected_at => DisconnectedAt,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/client_disconnected">>, Payload).
on_session_subscribed(_ClientInfo = #{clientid := ClientId,
username := Username},
Topic, SubOpts) ->
Payload0 = #{clientid => ClientId,
username => Username,
topic => Topic,
subopts => SubOpts,
ts => erlang:system_time(millisecond)
},
emqx_broker:safe_publish(
make_msg(<<"$event/session_subscribed">>,
emqx_json:encode(Payload0))),
ok.
Payload = #{clientid => ClientId,
username => Username,
topic => Topic,
subopts => SubOpts,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/session_subscribed">>, Payload).
on_session_unsubscribed(_ClientInfo = #{clientid := ClientId,
username := Username},
Topic, _SubOpts) ->
Payload0 = #{clientid => ClientId,
username => Username,
topic => Topic,
ts => erlang:system_time(millisecond)
},
emqx_broker:safe_publish(
make_msg(<<"$event/session_unsubscribed">>,
emqx_json:encode(Payload0))),
ok.
Payload = #{clientid => ClientId,
username => Username,
topic => Topic,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/session_unsubscribed">>, Payload).
on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
case ignore_sys_message(Message) of
true -> ok;
false ->
Payload0 = base_message(Message),
Payload1 = Payload0#{
Payload = Payload0#{
reason => Reason,
clientid => ClientId,
username => emqx_message:get_header(username, Message, undefined),
peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined))
},
emqx_broker:safe_publish(
make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1))),
ok
publish_event_msg(<<"$event/message_dropped">>, Payload)
end,
{ok, Message}.
@ -160,16 +146,14 @@ on_message_delivered(_ClientInfo = #{
true -> ok;
false ->
Payload0 = base_message(Message),
Payload1 = Payload0#{
Payload = Payload0#{
from_clientid => ClientId,
from_username => emqx_message:get_header(username, Message, undefined),
clientid => ReceiverCId,
username => ReceiverUsername,
peerhost => ntoa(PeerHost)
},
emqx_broker:safe_publish(
make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1))),
ok
publish_event_msg(<<"$event/message_delivered">>, Payload)
end,
{ok, Message}.
@ -182,16 +166,14 @@ on_message_acked(_ClientInfo = #{
true -> ok;
false ->
Payload0 = base_message(Message),
Payload1 = Payload0#{
Payload = Payload0#{
from_clientid => ClientId,
from_username => emqx_message:get_header(username, Message, undefined),
clientid => ReceiverCId,
username => ReceiverUsername,
peerhost => ntoa(PeerHost)
},
emqx_broker:safe_publish(
make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1))),
ok
publish_event_msg(<<"$event/message_acked">>, Payload)
end,
{ok, Message}.
@ -280,3 +262,7 @@ base_message(Message) ->
ignore_sys_message(#message{flags = Flags}) ->
maps:get(sys, Flags, false).
publish_event_msg(Topic, Payload) ->
_ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))),
ok.

View File

@ -64,8 +64,10 @@ t_event_topic(_) ->
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1),
_ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)),
recv_message_publish(<<"clientid">>),
recv_message_delivered(<<"clientid">>),
{ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100),
{ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100),
recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1),
recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2),
recv_message_acked(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
@ -107,12 +109,9 @@ recv_message_dropped(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/message_dropped">>, Topic).
recv_message_delivered(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/message_delivered">>, Topic).
recv_message_publish(_ClientId) ->
{ok, #{qos := ?QOS_1, topic := Topic}} = receive_publish(100),
recv_message_publish_or_delivered(_ClientId, 0, Topic) ->
?assertMatch(<<"$event/message_delivered">>, Topic);
recv_message_publish_or_delivered(_ClientId, 1, Topic) ->
?assertMatch(<<"test_sub">>, Topic).
recv_message_acked(_ClientId) ->