fix(bridges): ingress MQTT bridges didn't increase counters on msg received

This commit is contained in:
Shawn 2021-12-29 17:40:56 +08:00
parent c23436166b
commit aefcd6275b
3 changed files with 45 additions and 12 deletions

View File

@ -29,7 +29,7 @@
, bridges/0 , bridges/0
]). ]).
-export([on_message_received/2]). -export([on_message_received/3]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([ on_start/2
@ -105,14 +105,17 @@ drop_bridge(Name) ->
case supervisor:terminate_child(?MODULE, Name) of case supervisor:terminate_child(?MODULE, Name) of
ok -> ok ->
supervisor:delete_child(?MODULE, Name); supervisor:delete_child(?MODULE, Name);
{error, not_found} ->
ok;
{error, Error} -> {error, Error} ->
{error, Error} {error, Error}
end. end.
%% =================================================================== %% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% When use this bridge as a data source, ?MODULE:on_message_received will be called
%% if the bridge received msgs from the remote broker. %% if the bridge received msgs from the remote broker.
on_message_received(Msg, HookPoint) -> on_message_received(Msg, HookPoint, InstId) ->
_ = emqx_resource:query(InstId, {message_received, Msg}),
emqx:run_hook(HookPoint, [Msg]). emqx:run_hook(HookPoint, [Msg]).
%% =================================================================== %% ===================================================================
@ -123,8 +126,8 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{ BridgeConf = BasicConf#{
name => InstanceId, name => InstanceId,
clientid => clientid(maps:get(clientid, Conf, InstId)), clientid => clientid(InstId),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined)) forwards => make_forward_confs(maps:get(egress, Conf, undefined))
}, },
case ?MODULE:create_bridge(BridgeConf) of case ?MODULE:create_bridge(BridgeConf) of
@ -149,6 +152,9 @@ on_stop(_InstId, #{name := InstanceId}) ->
connector => InstanceId, reason => Reason}) connector => InstanceId, reason => Reason})
end. end.
on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) ->
emqx_resource:query_success(AfterQuery);
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
@ -166,15 +172,15 @@ ensure_mqtt_worker_started(InstanceId) ->
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 -> make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 ->
undefined; undefined;
make_sub_confs(undefined) -> make_sub_confs(undefined, _) ->
undefined; undefined;
make_sub_confs(SubRemoteConf) -> make_sub_confs(SubRemoteConf, InstId) ->
case maps:take(hookpoint, SubRemoteConf) of case maps:take(hookpoint, SubRemoteConf) of
error -> SubRemoteConf; error -> SubRemoteConf;
{HookPoint, SubConf} -> {HookPoint, SubConf} ->
MFA = {?MODULE, on_message_received, [HookPoint]}, MFA = {?MODULE, on_message_received, [HookPoint, InstId]},
SubConf#{on_message_received => MFA} SubConf#{on_message_received => MFA}
end. end.

View File

@ -168,7 +168,6 @@ handle_publish(Msg, undefined) ->
handle_publish(Msg, Vars) -> handle_publish(Msg, Vars) ->
?SLOG(debug, #{msg => "publish_to_local_broker", ?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}), message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
case Vars of case Vars of
#{on_message_received := {Mod, Func, Args}} -> #{on_message_received := {Mod, Func, Args}} ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);

View File

@ -61,7 +61,7 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
-> exp_msg(). -> exp_msg().
to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false), Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)),
to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
@ -78,9 +78,10 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}. Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
#{local_topic := TopicToken, payload := PayloadToken, #{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
MapMsg = format_msg_received(MapMsg0),
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),
@ -89,6 +90,33 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
flags => #{dup => Dup, retain => Retain},
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond),
node => node()
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
process_payload([], Msg) -> process_payload([], Msg) ->
emqx_json:encode(Msg); emqx_json:encode(Msg);
process_payload(Tks, Msg) -> process_payload(Tks, Msg) ->