fix(rule): rules not triggered after the ingress mqtt bridge received some msg
This commit is contained in:
parent
9ba454a63d
commit
e299d8d138
|
@ -90,7 +90,14 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
||||||
send_message(BridgeId, Message) ->
|
send_message(BridgeId, Message) ->
|
||||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
||||||
emqx_resource:query(ResId, {send_message, Message}).
|
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
|
||||||
|
not_found ->
|
||||||
|
throw({bridge_not_found, BridgeId});
|
||||||
|
#{enable := true} ->
|
||||||
|
emqx_resource:query(ResId, {send_message, Message});
|
||||||
|
#{enable := false} ->
|
||||||
|
throw({bridge_stopped, BridgeId})
|
||||||
|
end.
|
||||||
|
|
||||||
config_key_path() ->
|
config_key_path() ->
|
||||||
[bridges].
|
[bridges].
|
||||||
|
@ -279,6 +286,8 @@ get_matched_bridges(Topic) ->
|
||||||
end, Acc0, Conf)
|
end, Acc0, Conf)
|
||||||
end, [], Bridges).
|
end, [], Bridges).
|
||||||
|
|
||||||
|
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
|
||||||
|
Acc;
|
||||||
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
|
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
|
||||||
case emqx_topic:match(Topic, Filter) of
|
case emqx_topic:match(Topic, Filter) of
|
||||||
true -> [bridge_id(BType, BName) | Acc];
|
true -> [bridge_id(BType, BName) | Acc];
|
||||||
|
@ -309,21 +318,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
|
||||||
{Type, ConnName} ->
|
{Type, ConnName} ->
|
||||||
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
||||||
make_resource_confs(Direction, ConnectorConfs,
|
make_resource_confs(Direction, ConnectorConfs,
|
||||||
maps:without([connector, direction], Conf), Name);
|
maps:without([connector, direction], Conf), Type, Name);
|
||||||
{_ConnType, _ConnName} ->
|
{_ConnType, _ConnName} ->
|
||||||
error({cannot_use_connector_with_different_type, ConnId})
|
error({cannot_use_connector_with_different_type, ConnId})
|
||||||
end;
|
end;
|
||||||
parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
|
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
|
||||||
when is_map(ConnectorConfs) ->
|
when is_map(ConnectorConfs) ->
|
||||||
make_resource_confs(Direction, ConnectorConfs,
|
make_resource_confs(Direction, ConnectorConfs,
|
||||||
maps:without([connector, direction], Conf), Name).
|
maps:without([connector, direction], Conf), Type, Name).
|
||||||
|
|
||||||
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) ->
|
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
|
||||||
BName = bin(Name),
|
BName = bridge_id(Type, Name),
|
||||||
ConnectorConfs#{
|
ConnectorConfs#{
|
||||||
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
||||||
};
|
};
|
||||||
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) ->
|
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
|
||||||
ConnectorConfs#{
|
ConnectorConfs#{
|
||||||
egress => BridgeConf
|
egress => BridgeConf
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -101,7 +101,7 @@ do_apply_rule(#{
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
||||||
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
||||||
{ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
|
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
|
||||||
false ->
|
false ->
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end;
|
end;
|
||||||
|
@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId,
|
||||||
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
||||||
{ok, handle_output_list(Outputs, Selected, Input)};
|
{ok, handle_output_list(RuleId, Outputs, Selected, Input)};
|
||||||
false ->
|
false ->
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end.
|
end.
|
||||||
|
@ -231,14 +231,15 @@ number(Bin) ->
|
||||||
catch error:badarg -> binary_to_float(Bin)
|
catch error:badarg -> binary_to_float(Bin)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_output_list(Outputs, Selected, Envs) ->
|
handle_output_list(RuleId, Outputs, Selected, Envs) ->
|
||||||
[handle_output(Out, Selected, Envs) || Out <- Outputs].
|
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
|
||||||
|
|
||||||
handle_output(OutId, Selected, Envs) ->
|
handle_output(RuleId, OutId, Selected, Envs) ->
|
||||||
try
|
try
|
||||||
do_handle_output(OutId, Selected, Envs)
|
do_handle_output(OutId, Selected, Envs)
|
||||||
catch
|
catch
|
||||||
Err:Reason:ST ->
|
Err:Reason:ST ->
|
||||||
|
ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
|
||||||
?SLOG(error, #{msg => "output_failed",
|
?SLOG(error, #{msg => "output_failed",
|
||||||
output => OutId,
|
output => OutId,
|
||||||
exception => Err,
|
exception => Err,
|
||||||
|
|
Loading…
Reference in New Issue