fix(mqtt_bridge): refine the message format from a mqtt bridge source

This commit is contained in:
Shawn 2022-03-10 15:57:33 +08:00
parent 67e39150d0
commit 247b14c95f
2 changed files with 49 additions and 21 deletions

View File

@ -213,15 +213,14 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi
format_msg_received(#{dup := Dup, payload := Payload, properties := Props, format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) -> qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt', #{ id => emqx_guid:to_hexstr(emqx_guid:gen())
id => emqx_guid:to_hexstr(emqx_guid:gen()), , payload => Payload
payload => Payload, , topic => Topic
topic => Topic, , qos => QoS
qos => QoS, , dup => Dup
dup => Dup, , retain => Retain
retain => Retain, , pub_props => printable_maps(Props)
pub_props => printable_maps(Props), , message_received_at => erlang:system_time(millisecond)
timestamp => erlang:system_time(millisecond)
}. }.
printable_maps(undefined) -> #{}; printable_maps(undefined) -> #{};

View File

@ -74,10 +74,10 @@ reload() ->
load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) -> load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) ->
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
[#{bridge_topic => BridgeTopic}]}); [#{bridge_event_name => BridgeTopic}]});
load(Topic) -> load(Topic) ->
HookPoint = event_name(Topic), HookPoint = event_name(Topic),
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}). emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{}]}).
unload() -> unload() ->
lists:foreach(fun(HookPoint) -> lists:foreach(fun(HookPoint) ->
@ -91,12 +91,6 @@ unload(Topic) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Callbacks %% Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) ->
case emqx_rule_engine:get_rules_for_topic(BridgeTopic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, Message)
end.
on_message_publish(Message = #message{topic = Topic}, _Env) -> on_message_publish(Message = #message{topic = Topic}, _Env) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> ok; true -> ok;
@ -108,6 +102,9 @@ on_message_publish(Message = #message{topic = Topic}, _Env) ->
end, end,
{ok, Message}. {ok, Message}.
on_bridge_message_received(Message, Env = #{bridge_event_name := BridgeTopic}) ->
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
on_client_connected(ClientInfo, ConnInfo, Env) -> on_client_connected(ClientInfo, ConnInfo, Env) ->
apply_event('client.connected', apply_event('client.connected',
fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env). fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env).
@ -364,7 +361,9 @@ apply_event(EventName, GenEventMsg, _Env) ->
EventTopic = event_topic(EventName), EventTopic = event_topic(EventName),
case emqx_rule_engine:get_rules_for_topic(EventTopic) of case emqx_rule_engine:get_rules_for_topic(EventTopic) of
[] -> ok; [] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) Rules ->
%% delay the generating of eventmsg after we have found some rules to apply
emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -383,6 +382,7 @@ event_info() ->
, event_info_session_subscribed() , event_info_session_subscribed()
, event_info_session_unsubscribed() , event_info_session_unsubscribed()
, event_info_delivery_dropped() , event_info_delivery_dropped()
, event_info_bridge_mqtt()
]. ].
event_info_message_publish() -> event_info_message_publish() ->
@ -449,6 +449,13 @@ event_info_session_unsubscribed() ->
{<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>},
<<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>
). ).
event_info_bridge_mqtt()->
event_info_common(
<<"$bridges/mqtt">>,
{<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>},
{<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>},
<<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">>
).
event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
#{event => event_topic(Event), #{event => event_topic(Event),
@ -500,6 +507,11 @@ test_columns('session.subscribed') ->
, {<<"username">>, <<"u_emqx">>} , {<<"username">>, <<"u_emqx">>}
, {<<"topic">>, <<"t/a">>} , {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1} , {<<"qos">>, 1}
];
test_columns(<<"$bridges/mqtt">>) ->
[ {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
]. ].
columns_with_exam('message.publish') -> columns_with_exam('message.publish') ->
@ -584,7 +596,22 @@ columns_with_exam('client.disconnected') ->
columns_with_exam('session.subscribed') -> columns_with_exam('session.subscribed') ->
columns_message_sub_unsub('session.subscribed'); columns_message_sub_unsub('session.subscribed');
columns_with_exam('session.unsubscribed') -> columns_with_exam('session.unsubscribed') ->
columns_message_sub_unsub('session.unsubscribed'). columns_message_sub_unsub('session.unsubscribed');
columns_with_exam(<<"$bridges/mqtt">>) ->
[ {<<"event">>, <<"$bridges/mqtt">>}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"dup">>, false}
, {<<"retain">>, false}
%% the time that we receiced the message from remote broker
, {<<"message_received_at">>, erlang:system_time(millisecond)}
%% the time that the rule is triggered
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
].
columns_message_sub_unsub(EventName) -> columns_message_sub_unsub(EventName) ->
[ {<<"event">>, EventName} [ {<<"event">>, EventName}
@ -646,6 +673,7 @@ event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
event_name(<<"$bridges/", _/binary>> = Topic) -> Topic;
event_name(_) -> 'message.publish'. event_name(_) -> 'message.publish'.
event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.connected') -> <<"$events/client_connected">>;
@ -656,7 +684,8 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>;
event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.acked') -> <<"$events/message_acked">>;
event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('message.dropped') -> <<"$events/message_dropped">>;
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
event_topic('message.publish') -> <<"$events/message_publish">>. event_topic('message.publish') -> <<"$events/message_publish">>;
event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic.
printable_maps(undefined) -> #{}; printable_maps(undefined) -> #{};
printable_maps(Headers) -> printable_maps(Headers) ->