fix(bridge): prohibit deleting connectors that are in use
This commit is contained in:
parent
59e2614574
commit
808646c2a1
|
@ -259,7 +259,7 @@ update(Type, Name, {OldConf, Conf}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
recreate(Type, Name) ->
|
recreate(Type, Name) ->
|
||||||
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
|
recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
|
||||||
|
|
||||||
recreate(Type, Name, Conf) ->
|
recreate(Type, Name, Conf) ->
|
||||||
emqx_resource:recreate_local(resource_id(Type, Name),
|
emqx_resource:recreate_local(resource_id(Type, Name),
|
||||||
|
|
|
@ -37,31 +37,26 @@
|
||||||
config_key_path() ->
|
config_key_path() ->
|
||||||
[connectors].
|
[connectors].
|
||||||
|
|
||||||
|
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
|
||||||
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
||||||
ConnId = connector_id(Type, Name),
|
ConnId = connector_id(Type, Name),
|
||||||
LinkedBridgeIds = lists:foldl(fun
|
try foreach_linked_bridges(ConnId, fun(#{id := BId}) ->
|
||||||
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc)
|
throw({dependency_bridges_exist, BId})
|
||||||
when ConnId0 == ConnId ->
|
end)
|
||||||
[BId | Acc];
|
catch throw:Error -> {error, Error}
|
||||||
(_, Acc) -> Acc
|
|
||||||
end, [], emqx_bridge:list()),
|
|
||||||
case LinkedBridgeIds of
|
|
||||||
[] -> ok;
|
|
||||||
_ -> {error, {dependency_bridges_exist, LinkedBridgeIds}}
|
|
||||||
end;
|
end;
|
||||||
post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) ->
|
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
|
||||||
ConnId = connector_id(Type, Name),
|
ConnId = connector_id(Type, Name),
|
||||||
lists:foreach(fun
|
foreach_linked_bridges(ConnId,
|
||||||
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId ->
|
fun(#{id := BId}) ->
|
||||||
{BType, BName} = emqx_bridge:parse_bridge_id(BId),
|
{BType, BName} = emqx_bridge:parse_bridge_id(BId),
|
||||||
BridgeConf = emqx:get_config([bridges, BType, BName]),
|
BridgeConf = emqx:get_config([bridges, BType, BName]),
|
||||||
case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of
|
case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf},
|
||||||
{ok, _} -> ok;
|
BridgeConf#{connector => NewConf}}) of
|
||||||
|
ok -> ok;
|
||||||
{error, Reason} -> error({update_bridge_error, Reason})
|
{error, Reason} -> error({update_bridge_error, Reason})
|
||||||
end;
|
end
|
||||||
(_) ->
|
end).
|
||||||
ok
|
|
||||||
end, emqx_bridge:list()).
|
|
||||||
|
|
||||||
connector_id(Type0, Name0) ->
|
connector_id(Type0, Name0) ->
|
||||||
Type = bin(Type0),
|
Type = bin(Type0),
|
||||||
|
@ -112,3 +107,10 @@ delete(Type, Name) ->
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
||||||
|
foreach_linked_bridges(ConnId, Do) ->
|
||||||
|
lists:foreach(fun
|
||||||
|
(#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId ->
|
||||||
|
Do(Bridge);
|
||||||
|
(_) -> ok
|
||||||
|
end, emqx_bridge:list()).
|
||||||
|
|
|
@ -253,6 +253,10 @@ schema("/connectors/:id") ->
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
case emqx_connector:delete(ConnType, ConnName) of
|
case emqx_connector:delete(ConnType, ConnName) of
|
||||||
{ok, _} -> {204};
|
{ok, _} -> {204};
|
||||||
|
{error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} ->
|
||||||
|
{403, error_msg('DEPENDENCY_EXISTS',
|
||||||
|
<<"Cannot remove the connector as it's in use by a bridge: ",
|
||||||
|
BridgeID/binary>>)};
|
||||||
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
|
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
|
|
@ -108,6 +108,9 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
|
%% assert we there's no connectors and no bridges at first
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
Config.
|
Config.
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
clear_resources(),
|
clear_resources(),
|
||||||
|
@ -200,10 +203,6 @@ t_mqtt_crud_apis(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_mqtt_conn_bridge_ingress(_) ->
|
t_mqtt_conn_bridge_ingress(_) ->
|
||||||
%% assert we there's no connectors and no bridges at first
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
||||||
|
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
|
@ -272,10 +271,6 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_mqtt_conn_bridge_egress(_) ->
|
t_mqtt_conn_bridge_egress(_) ->
|
||||||
%% assert we there's no connectors and no bridges at first
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
||||||
|
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
|
@ -350,10 +345,6 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
%% - update a connector should also update all of the the bridges
|
%% - update a connector should also update all of the the bridges
|
||||||
%% - cannot delete a connector that is used by at least one bridge
|
%% - cannot delete a connector that is used by at least one bridge
|
||||||
t_mqtt_conn_update(_) ->
|
t_mqtt_conn_update(_) ->
|
||||||
%% assert we there's no connectors and no bridges at first
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
||||||
|
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
|
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
|
||||||
|
@ -396,10 +387,6 @@ t_mqtt_conn_update(_) ->
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
|
||||||
|
|
||||||
t_mqtt_conn_update2(_) ->
|
t_mqtt_conn_update2(_) ->
|
||||||
%% assert we there's no connectors and no bridges at first
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
||||||
|
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
%% but this connector is point to a unreachable server "2603"
|
%% but this connector is point to a unreachable server "2603"
|
||||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
|
@ -440,6 +427,34 @@ t_mqtt_conn_update2(_) ->
|
||||||
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
|
||||||
|
|
||||||
|
t_mqtt_conn_update3(_) ->
|
||||||
|
%% we add a mqtt connector, using POST
|
||||||
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
|
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
|
||||||
|
#{ <<"type">> => ?CONNECTR_TYPE
|
||||||
|
, <<"name">> => ?CONNECTR_NAME
|
||||||
|
}),
|
||||||
|
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||||
|
|
||||||
|
%% ... and a MQTT bridge, using POST
|
||||||
|
%% we bind this bridge to the connector created just now
|
||||||
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||||
|
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
||||||
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
|
}),
|
||||||
|
#{ <<"id">> := BridgeIDEgress
|
||||||
|
, <<"status">> := <<"connected">>
|
||||||
|
, <<"connector">> := ConnctorID
|
||||||
|
} = jsx:decode(Bridge),
|
||||||
|
|
||||||
|
%% delete the connector should fail because it is in use by a bridge
|
||||||
|
{ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
|
||||||
|
%% delete the bridge
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||||
|
%% the connector now can be deleted without problems
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
||||||
|
|
||||||
t_mqtt_conn_testing(_) ->
|
t_mqtt_conn_testing(_) ->
|
||||||
%% APIs for testing the connectivity
|
%% APIs for testing the connectivity
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
|
|
Loading…
Reference in New Issue