From d6c375ab5b460cf0ff4c0e999eb226464c0d2052 Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 29 Jul 2021 23:53:29 +0800 Subject: [PATCH] feat(event-topic): fix check dialyzer --- apps/emqx_modules/src/emqx_event_topic.erl | 80 ++++++++----------- .../test/emqx_event_topic_SUITE.erl | 15 ++-- 2 files changed, 40 insertions(+), 55 deletions(-) diff --git a/apps/emqx_modules/src/emqx_event_topic.erl b/apps/emqx_modules/src/emqx_event_topic.erl index 41dc76746..93b77a1a8 100644 --- a/apps/emqx_modules/src/emqx_event_topic.erl +++ b/apps/emqx_modules/src/emqx_event_topic.erl @@ -71,7 +71,7 @@ disable() -> <<"$event/session_subscribed">> -> emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}); <<"$event/session_unsubscribed">> -> - emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed}); + emqx_hooks:del('session.unsubscribed', {?MODULE, on_session_unsubscribed}); <<"$event/message_delivered">> -> emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}); <<"$event/message_acked">> -> @@ -88,66 +88,52 @@ disable() -> %%-------------------------------------------------------------------- on_client_connected(ClientInfo, ConnInfo) -> - Payload0 = connected_payload(ClientInfo, ConnInfo), - emqx_broker:safe_publish( - make_msg(<<"$event/client_connected">>, - emqx_json:encode(Payload0))), - ok. + Payload = connected_payload(ClientInfo, ConnInfo), + publish_event_msg(<<"$event/client_connected">>, Payload). on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username}, Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) -> - Payload0 = #{clientid => ClientId, - username => Username, - reason => reason(Reason), - disconnected_at => DisconnectedAt, - ts => erlang:system_time(millisecond) - }, - emqx_broker:safe_publish( - make_msg(<<"$event/client_disconnected">>, - emqx_json:encode(Payload0))), - ok. + Payload = #{clientid => ClientId, + username => Username, + reason => reason(Reason), + disconnected_at => DisconnectedAt, + ts => erlang:system_time(millisecond) + }, + publish_event_msg(<<"$event/client_disconnected">>, Payload). on_session_subscribed(_ClientInfo = #{clientid := ClientId, username := Username}, Topic, SubOpts) -> - Payload0 = #{clientid => ClientId, - username => Username, - topic => Topic, - subopts => SubOpts, - ts => erlang:system_time(millisecond) - }, - emqx_broker:safe_publish( - make_msg(<<"$event/session_subscribed">>, - emqx_json:encode(Payload0))), - ok. + Payload = #{clientid => ClientId, + username => Username, + topic => Topic, + subopts => SubOpts, + ts => erlang:system_time(millisecond) + }, + publish_event_msg(<<"$event/session_subscribed">>, Payload). on_session_unsubscribed(_ClientInfo = #{clientid := ClientId, username := Username}, Topic, _SubOpts) -> - Payload0 = #{clientid => ClientId, - username => Username, - topic => Topic, - ts => erlang:system_time(millisecond) - }, - emqx_broker:safe_publish( - make_msg(<<"$event/session_unsubscribed">>, - emqx_json:encode(Payload0))), - ok. + Payload = #{clientid => ClientId, + username => Username, + topic => Topic, + ts => erlang:system_time(millisecond) + }, + publish_event_msg(<<"$event/session_unsubscribed">>, Payload). on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> case ignore_sys_message(Message) of true -> ok; false -> Payload0 = base_message(Message), - Payload1 = Payload0#{ + Payload = Payload0#{ reason => Reason, clientid => ClientId, username => emqx_message:get_header(username, Message, undefined), peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)) }, - emqx_broker:safe_publish( - make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1))), - ok + publish_event_msg(<<"$event/message_dropped">>, Payload) end, {ok, Message}. @@ -160,16 +146,14 @@ on_message_delivered(_ClientInfo = #{ true -> ok; false -> Payload0 = base_message(Message), - Payload1 = Payload0#{ + Payload = Payload0#{ from_clientid => ClientId, from_username => emqx_message:get_header(username, Message, undefined), clientid => ReceiverCId, username => ReceiverUsername, peerhost => ntoa(PeerHost) }, - emqx_broker:safe_publish( - make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1))), - ok + publish_event_msg(<<"$event/message_delivered">>, Payload) end, {ok, Message}. @@ -182,16 +166,14 @@ on_message_acked(_ClientInfo = #{ true -> ok; false -> Payload0 = base_message(Message), - Payload1 = Payload0#{ + Payload = Payload0#{ from_clientid => ClientId, from_username => emqx_message:get_header(username, Message, undefined), clientid => ReceiverCId, username => ReceiverUsername, peerhost => ntoa(PeerHost) }, - emqx_broker:safe_publish( - make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1))), - ok + publish_event_msg(<<"$event/message_acked">>, Payload) end, {ok, Message}. @@ -280,3 +262,7 @@ base_message(Message) -> ignore_sys_message(#message{flags = Flags}) -> maps:get(sys, Flags, false). + +publish_event_msg(Topic, Payload) -> + _ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))), + ok. diff --git a/apps/emqx_modules/test/emqx_event_topic_SUITE.erl b/apps/emqx_modules/test/emqx_event_topic_SUITE.erl index 26898730a..39c34879c 100644 --- a/apps/emqx_modules/test/emqx_event_topic_SUITE.erl +++ b/apps/emqx_modules/test/emqx_event_topic_SUITE.erl @@ -64,8 +64,10 @@ t_event_topic(_) -> {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1), _ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)), - recv_message_publish(<<"clientid">>), - recv_message_delivered(<<"clientid">>), + {ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100), + {ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100), + recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1), + recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2), recv_message_acked(<<"clientid">>), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1), @@ -107,12 +109,9 @@ recv_message_dropped(_ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), ?assertMatch(<<"$event/message_dropped">>, Topic). -recv_message_delivered(_ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/message_delivered">>, Topic). - -recv_message_publish(_ClientId) -> - {ok, #{qos := ?QOS_1, topic := Topic}} = receive_publish(100), +recv_message_publish_or_delivered(_ClientId, 0, Topic) -> + ?assertMatch(<<"$event/message_delivered">>, Topic); +recv_message_publish_or_delivered(_ClientId, 1, Topic) -> ?assertMatch(<<"test_sub">>, Topic). recv_message_acked(_ClientId) ->