From 247b14c95f697b29277351e47f0cc4242d7870a3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 10 Mar 2022 15:57:33 +0800 Subject: [PATCH] fix(mqtt_bridge): refine the message format from a mqtt bridge source --- .../src/mqtt/emqx_connector_mqtt_mod.erl | 19 ++++--- .../emqx_rule_engine/src/emqx_rule_events.erl | 51 +++++++++++++++---- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index f02836b22..591bfa8c3 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -213,16 +213,15 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi format_msg_received(#{dup := Dup, payload := Payload, properties := Props, qos := QoS, retain := Retain, topic := Topic}) -> - #{event => '$bridges/mqtt', - id => emqx_guid:to_hexstr(emqx_guid:gen()), - payload => Payload, - topic => Topic, - qos => QoS, - dup => Dup, - retain => Retain, - pub_props => printable_maps(Props), - timestamp => erlang:system_time(millisecond) - }. + #{ id => emqx_guid:to_hexstr(emqx_guid:gen()) + , payload => Payload + , topic => Topic + , qos => QoS + , dup => Dup + , retain => Retain + , pub_props => printable_maps(Props) + , message_received_at => erlang:system_time(millisecond) + }. printable_maps(undefined) -> #{}; printable_maps(Headers) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a0a33470b..492caabc8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -74,10 +74,10 @@ reload() -> load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) -> emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, - [#{bridge_topic => BridgeTopic}]}); + [#{bridge_event_name => BridgeTopic}]}); load(Topic) -> HookPoint = event_name(Topic), - emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}). + emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{}]}). unload() -> lists:foreach(fun(HookPoint) -> @@ -91,12 +91,6 @@ unload(Topic) -> %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- -on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) -> - case emqx_rule_engine:get_rules_for_topic(BridgeTopic) of - [] -> ok; - Rules -> emqx_rule_runtime:apply_rules(Rules, Message) - end. - on_message_publish(Message = #message{topic = Topic}, _Env) -> case ignore_sys_message(Message) of true -> ok; @@ -108,6 +102,9 @@ on_message_publish(Message = #message{topic = Topic}, _Env) -> end, {ok, Message}. +on_bridge_message_received(Message, Env = #{bridge_event_name := BridgeTopic}) -> + apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env). + on_client_connected(ClientInfo, ConnInfo, Env) -> apply_event('client.connected', fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env). @@ -364,7 +361,9 @@ apply_event(EventName, GenEventMsg, _Env) -> EventTopic = event_topic(EventName), case emqx_rule_engine:get_rules_for_topic(EventTopic) of [] -> ok; - Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) + Rules -> + %% delay the generating of eventmsg after we have found some rules to apply + emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) end. %%-------------------------------------------------------------------- @@ -383,6 +382,7 @@ event_info() -> , event_info_session_subscribed() , event_info_session_unsubscribed() , event_info_delivery_dropped() + , event_info_bridge_mqtt() ]. event_info_message_publish() -> @@ -449,6 +449,13 @@ event_info_session_unsubscribed() -> {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> ). +event_info_bridge_mqtt()-> + event_info_common( + <<"$bridges/mqtt">>, + {<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>}, + {<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>}, + <<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">> + ). event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> #{event => event_topic(Event), @@ -500,6 +507,11 @@ test_columns('session.subscribed') -> , {<<"username">>, <<"u_emqx">>} , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} + ]; +test_columns(<<"$bridges/mqtt">>) -> + [ {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} ]. columns_with_exam('message.publish') -> @@ -584,7 +596,22 @@ columns_with_exam('client.disconnected') -> columns_with_exam('session.subscribed') -> columns_message_sub_unsub('session.subscribed'); columns_with_exam('session.unsubscribed') -> - columns_message_sub_unsub('session.unsubscribed'). + columns_message_sub_unsub('session.unsubscribed'); +columns_with_exam(<<"$bridges/mqtt">>) -> + [ {<<"event">>, <<"$bridges/mqtt">>} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"dup">>, false} + , {<<"retain">>, false} + %% the time that we receiced the message from remote broker + , {<<"message_received_at">>, erlang:system_time(millisecond)} + %% the time that the rule is triggered + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]. columns_message_sub_unsub(EventName) -> [ {<<"event">>, EventName} @@ -646,6 +673,7 @@ event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; +event_name(<<"$bridges/", _/binary>> = Topic) -> Topic; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -656,7 +684,8 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; -event_topic('message.publish') -> <<"$events/message_publish">>. +event_topic('message.publish') -> <<"$events/message_publish">>; +event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic. printable_maps(undefined) -> #{}; printable_maps(Headers) ->