From a0e81226d7ffbf85db7a1d1c0e3f99ab6effba2a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 22 Nov 2021 19:10:00 +0800 Subject: [PATCH] fix(elvis): improve some code format --- apps/emqx/src/emqx_shared_sub.erl | 2 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 79 +++++++++---------- 2 files changed, 38 insertions(+), 43 deletions(-) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 2c91bbbcc..434f38694 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -292,7 +292,7 @@ subscribers(Group, Topic) -> %%-------------------------------------------------------------------- init([]) -> - mria:wait_for_tables([?TAB]), + ok = mria:wait_for_tables([?TAB]), {ok, _} = mnesia:subscribe({table, ?TAB, simple}), {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index e80eb2e5f..0aff9f018 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -114,11 +114,15 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> apply_event('session.subscribed', - fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env). + fun() -> + eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) + end, Env). on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) -> apply_event('session.unsubscribed', - fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env). + fun() -> + eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) + end, Env). on_message_dropped(Message, _, Reason, Env) -> case ignore_sys_message(Message) of @@ -151,7 +155,8 @@ on_message_acked(ClientInfo, Message, Env) -> %% Event Messages %%-------------------------------------------------------------------- -eventmsg_publish(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) -> +eventmsg_publish(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, + topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) -> with_basic_columns('message.publish', #{id => emqx_guid:to_hexstr(Id), clientid => ClientId, @@ -236,7 +241,8 @@ eventmsg_sub_or_unsub(Event, _ClientInfo = #{ qos => QoS }). -eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}, Reason) -> +eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, + topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}, Reason) -> with_basic_columns('message.dropped', #{id => emqx_guid:to_hexstr(Id), reason => Reason, @@ -257,7 +263,9 @@ eventmsg_delivered(_ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, username := ReceiverUsername - }, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) -> + }, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, + topic = Topic, headers = Headers, payload = Payload, + timestamp = Timestamp}) -> with_basic_columns('message.delivered', #{id => emqx_guid:to_hexstr(Id), from_clientid => ClientId, @@ -279,7 +287,10 @@ eventmsg_acked(_ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, username := ReceiverUsername - }, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) -> + }, + Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, + topic = Topic, headers = Headers, payload = Payload, + timestamp = Timestamp}) -> with_basic_columns('message.acked', #{id => emqx_guid:to_hexstr(Id), from_clientid => ClientId, @@ -455,37 +466,9 @@ columns_with_exam('message.publish') -> , {<<"node">>, node()} ]; columns_with_exam('message.delivered') -> - [ {<<"event">>, 'message.delivered'} - , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} - , {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"flags">>, #{}} - , {<<"publish_received_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; + columns_message_ack_delivered('message.delivered'); columns_with_exam('message.acked') -> - [ {<<"event">>, 'message.acked'} - , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} - , {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"flags">>, #{}} - , {<<"publish_received_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; + columns_message_ack_delivered('message.acked'); columns_with_exam('message.dropped') -> [ {<<"event">>, 'message.dropped'} , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} @@ -530,7 +513,12 @@ columns_with_exam('client.disconnected') -> , {<<"node">>, node()} ]; columns_with_exam('session.subscribed') -> - [ {<<"event">>, 'session.subscribed'} + columns_message_sub_unsub('session.subscribed'); +columns_with_exam('session.unsubscribed') -> + columns_message_sub_unsub('session.unsubscribed'). + +columns_message_sub_unsub(EventName) -> + [ {<<"event">>, EventName} , {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} , {<<"peerhost">>, <<"192.168.0.10">>} @@ -538,14 +526,21 @@ columns_with_exam('session.subscribed') -> , {<<"qos">>, 1} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} - ]; -columns_with_exam('session.unsubscribed') -> - [ {<<"event">>, 'session.unsubscribed'} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} + ]. + +columns_message_ack_delivered(EventName) -> + [ {<<"event">>, EventName} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} , {<<"peerhost">>, <<"192.168.0.10">>} , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ].