Merge pull request #6603 from terry-xiaoyu/bridge_bug_fixes_3

fix(bridge): prohibit deleting connectors that are in use
This commit is contained in:
Shawn 2022-01-02 12:38:09 +08:00 committed by GitHub
commit e5d9d4c83e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 35 deletions

View File

@ -259,7 +259,7 @@ update(Type, Name, {OldConf, Conf}) ->
end.
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name),

View File

@ -37,31 +37,26 @@
config_key_path() ->
[connectors].
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
LinkedBridgeIds = lists:foldl(fun
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc)
when ConnId0 == ConnId ->
[BId | Acc];
(_, Acc) -> Acc
end, [], emqx_bridge:list()),
case LinkedBridgeIds of
[] -> ok;
_ -> {error, {dependency_bridges_exist, LinkedBridgeIds}}
try foreach_linked_bridges(ConnId, fun(#{id := BId}) ->
throw({dependency_bridges_exist, BId})
end)
catch throw:Error -> {error, Error}
end;
post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) ->
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
lists:foreach(fun
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId ->
foreach_linked_bridges(ConnId,
fun(#{id := BId}) ->
{BType, BName} = emqx_bridge:parse_bridge_id(BId),
BridgeConf = emqx:get_config([bridges, BType, BName]),
case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of
{ok, _} -> ok;
case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf},
BridgeConf#{connector => NewConf}}) of
ok -> ok;
{error, Reason} -> error({update_bridge_error, Reason})
end;
(_) ->
ok
end, emqx_bridge:list()).
end
end).
connector_id(Type0, Name0) ->
Type = bin(Type0),
@ -112,3 +107,10 @@ delete(Type, Name) ->
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
foreach_linked_bridges(ConnId, Do) ->
lists:foreach(fun
(#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId ->
Do(Bridge);
(_) -> ok
end, emqx_bridge:list()).

View File

@ -253,6 +253,10 @@ schema("/connectors/:id") ->
{ok, _} ->
case emqx_connector:delete(ConnType, ConnName) of
{ok, _} -> {204};
{error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} ->
{403, error_msg('DEPENDENCY_EXISTS',
<<"Cannot remove the connector as it's in use by a bridge: ",
BridgeID/binary>>)};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end;
{error, not_found} ->

View File

@ -108,6 +108,9 @@ end_per_suite(_Config) ->
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
@ -200,10 +203,6 @@ t_mqtt_crud_apis(_) ->
ok.
t_mqtt_conn_bridge_ingress(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
@ -272,10 +271,6 @@ t_mqtt_conn_bridge_ingress(_) ->
ok.
t_mqtt_conn_bridge_egress(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
@ -350,10 +345,6 @@ t_mqtt_conn_bridge_egress(_) ->
%% - update a connector should also update all of the the bridges
%% - cannot delete a connector that is used by at least one bridge
t_mqtt_conn_update(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
@ -396,10 +387,6 @@ t_mqtt_conn_update(_) ->
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
%% but this connector is point to a unreachable server "2603"
{ok, 201, Connector} = request(post, uri(["connectors"]),
@ -440,6 +427,34 @@ t_mqtt_conn_update2(_) ->
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update3(_) ->
%% we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
#{ <<"id">> := BridgeIDEgress
, <<"status">> := <<"connected">>
, <<"connector">> := ConnctorID
} = jsx:decode(Bridge),
%% delete the connector should fail because it is in use by a bridge
{ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
%% the connector now can be deleted without problems
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity
%% then we add a mqtt connector, using POST