diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 591bfa8c3..fcad0ef7a 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -30,7 +30,7 @@ %% callbacks for emqtt -export([ handle_puback/2 - , handle_publish/2 + , handle_publish/3 , handle_disconnected/2 ]). @@ -52,7 +52,7 @@ start(Config) -> Mountpoint = maps:get(receive_mountpoint, Config, undefined), Subscriptions = maps:get(subscriptions, Config, undefined), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), - Handlers = make_hdlr(Parent, Vars), + Handlers = make_hdlr(Parent, Vars, #{server => ip_port_to_server(Host, Port)}), Config1 = Config#{ msg_handler => Handlers, host => Host, @@ -161,12 +161,12 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> ?SLOG(warning, #{msg => "publish_to_remote_node_falied", packet_id => PktId, reason_code => RC}). -handle_publish(Msg, undefined) -> +handle_publish(Msg, undefined, _Opts) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(#{properties := Props} = Msg0, Vars) -> - Msg = format_msg_received(Msg0), +handle_publish(#{properties := Props} = Msg0, Vars, Opts) -> + Msg = format_msg_received(Msg0, Opts), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), case Vars of @@ -179,9 +179,9 @@ handle_publish(#{properties := Props} = Msg0, Vars) -> handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. -make_hdlr(Parent, Vars) -> +make_hdlr(Parent, Vars, Opts) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, - publish => {fun ?MODULE:handle_publish/2, [Vars]}, + publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} }. @@ -212,8 +212,9 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi end. format_msg_received(#{dup := Dup, payload := Payload, properties := Props, - qos := QoS, retain := Retain, topic := Topic}) -> + qos := QoS, retain := Retain, topic := Topic}, #{server := Server}) -> #{ id => emqx_guid:to_hexstr(emqx_guid:gen()) + , server => Server , payload => Payload , topic => Topic , qos => QoS @@ -236,3 +237,10 @@ printable_maps(Headers) -> }; (K, V0, AccIn) -> AccIn#{K => V0} end, #{}, Headers). + +ip_port_to_server(Host, Port) -> + HostStr = case inet:ntoa(Host) of + {error, einval} -> Host; + IPStr -> IPStr + end, + list_to_binary(io_lib:format("~s:~w", [HostStr, Port])). diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 66064ac24..142a98487 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -72,6 +72,7 @@ fields("rule_test") -> , ref("ctx_dropped") , ref("ctx_connected") , ref("ctx_disconnected") + , ref("ctx_bridge_mqtt") ]), #{desc => "The context of the event for testing", default => #{}})} @@ -204,7 +205,20 @@ fields("ctx_disconnected") -> , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})} , {"disconnected_at", sc(integer(), #{ desc => "The Time that this Client is Disconnected"})} - ]. + ]; + +fields("ctx_bridge_mqtt") -> + [ {"event_type", sc('$bridges/mqtt:*', #{desc => "Event Type", required => true})} + , {"id", sc(binary(), #{desc => "Message ID"})} + , {"payload", sc(binary(), #{desc => "The Message Payload"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"server", sc(binary(), #{desc => "The IP address (or hostname) and port of the MQTT broker," + " in IP:Port format"})} + , {"dup", sc(binary(), #{desc => "The DUP flag of the MQTT message"})} + , {"retain", sc(binary(), #{desc => "If is a retain message"})} + , {"message_received_at", sc(integer(), #{ + desc => "The Time that this Message is Received"})} + ] ++ [qos()]. qos() -> {"qos", sc(emqx_schema:qos(), #{desc => "The Message QoS"})}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 1b45ddb0f..10e3e41ef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -607,7 +607,7 @@ columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) -> [ {<<"event">>, EventName} , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"server">>, <<"192.168.0.10:1883">>} , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} , {<<"dup">>, false}