fix(bridges): keep multiple bridges from affecting each other on crash
This commit is contained in:
parent
f65eca4c47
commit
658f819aab
|
@ -80,26 +80,35 @@ unload_hook() ->
|
||||||
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
||||||
case maps:get(sys, Flags, false) of
|
case maps:get(sys, Flags, false) of
|
||||||
false ->
|
false ->
|
||||||
lists:foreach(fun (Id) ->
|
Msg = emqx_rule_events:eventmsg_publish(Message),
|
||||||
send_message(Id, emqx_rule_events:eventmsg_publish(Message))
|
send_to_egress_matched_bridges(Topic, Msg);
|
||||||
end, get_matched_bridges(Topic));
|
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
|
send_to_egress_matched_bridges(Topic, Msg) ->
|
||||||
|
lists:foreach(fun (Id) ->
|
||||||
|
try send_message(Id, Msg) of
|
||||||
|
ok -> ok;
|
||||||
|
Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed",
|
||||||
|
bridge => Id, error => Error})
|
||||||
|
catch Err:Reason:ST ->
|
||||||
|
?SLOG(error, #{msg => "send_message_to_bridge_crash",
|
||||||
|
bridge => Id, error => Err, reason => Reason,
|
||||||
|
stacktrace => ST})
|
||||||
|
end
|
||||||
|
end, get_matched_bridges(Topic)).
|
||||||
|
|
||||||
send_message(BridgeId, Message) ->
|
send_message(BridgeId, Message) ->
|
||||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
||||||
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
|
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
|
||||||
not_found ->
|
not_found ->
|
||||||
throw({bridge_not_found, BridgeId});
|
{error, {bridge_not_found, BridgeId}};
|
||||||
#{enable := true} ->
|
#{enable := true} ->
|
||||||
case emqx_resource:query(ResId, {send_message, Message}) of
|
emqx_resource:query(ResId, {send_message, Message});
|
||||||
{error, {emqx_resource, Reason}} -> throw({bridge_not_ready, Reason});
|
|
||||||
Result -> Result
|
|
||||||
end;
|
|
||||||
#{enable := false} ->
|
#{enable := false} ->
|
||||||
throw({bridge_stopped, BridgeId})
|
{error, {bridge_stopped, BridgeId}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
config_key_path() ->
|
config_key_path() ->
|
||||||
|
|
Loading…
Reference in New Issue