From 808646c2a173a3982bd7c73cd07c7b155b09c8dc Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 15:47:27 +0800 Subject: [PATCH] fix(bridge): prohibit deleting connectors that are in use --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- apps/emqx_connector/src/emqx_connector.erl | 38 ++++++++------- .../emqx_connector/src/emqx_connector_api.erl | 4 ++ .../test/emqx_connector_api_SUITE.erl | 47 ++++++++++++------- 4 files changed, 56 insertions(+), 35 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 2e610b2b9..6e014f2ec 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -259,7 +259,7 @@ update(Type, Name, {OldConf, Conf}) -> end. recreate(Type, Name) -> - recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])). + recreate(Type, Name, emqx:get_config([bridges, Type, Name])). recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 940e958e3..db1caefbb 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -37,31 +37,26 @@ config_key_path() -> [connectors]. +-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]). post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - LinkedBridgeIds = lists:foldl(fun - (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc) - when ConnId0 == ConnId -> - [BId | Acc]; - (_, Acc) -> Acc - end, [], emqx_bridge:list()), - case LinkedBridgeIds of - [] -> ok; - _ -> {error, {dependency_bridges_exist, LinkedBridgeIds}} + try foreach_linked_bridges(ConnId, fun(#{id := BId}) -> + throw({dependency_bridges_exist, BId}) + end) + catch throw:Error -> {error, Error} end; -post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) -> +post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - lists:foreach(fun - (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId -> + foreach_linked_bridges(ConnId, + fun(#{id := BId}) -> {BType, BName} = emqx_bridge:parse_bridge_id(BId), BridgeConf = emqx:get_config([bridges, BType, BName]), - case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of - {ok, _} -> ok; + case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf}, + BridgeConf#{connector => NewConf}}) of + ok -> ok; {error, Reason} -> error({update_bridge_error, Reason}) - end; - (_) -> - ok - end, emqx_bridge:list()). + end + end). connector_id(Type0, Name0) -> Type = bin(Type0), @@ -112,3 +107,10 @@ delete(Type, Name) -> bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +foreach_linked_bridges(ConnId, Do) -> + lists:foreach(fun + (#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId -> + Do(Bridge); + (_) -> ok + end, emqx_bridge:list()). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 4989cf17e..72938649c 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -253,6 +253,10 @@ schema("/connectors/:id") -> {ok, _} -> case emqx_connector:delete(ConnType, ConnName) of {ok, _} -> {204}; + {error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} -> + {403, error_msg('DEPENDENCY_EXISTS', + <<"Cannot remove the connector as it's in use by a bridge: ", + BridgeID/binary>>)}; {error, Error} -> {400, error_msg('BAD_ARG', Error)} end; {error, not_found} -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 936982e75..4caf700a9 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -108,6 +108,9 @@ end_per_suite(_Config) -> init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + %% assert we there's no connectors and no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), Config. end_per_testcase(_, _Config) -> clear_resources(), @@ -200,10 +203,6 @@ t_mqtt_crud_apis(_) -> ok. t_mqtt_conn_bridge_ingress(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -272,10 +271,6 @@ t_mqtt_conn_bridge_ingress(_) -> ok. t_mqtt_conn_bridge_egress(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -350,10 +345,6 @@ t_mqtt_conn_bridge_egress(_) -> %% - update a connector should also update all of the the bridges %% - cannot delete a connector that is used by at least one bridge t_mqtt_conn_update(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST {ok, 201, Connector} = request(post, uri(["connectors"]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) @@ -396,10 +387,6 @@ t_mqtt_conn_update(_) -> {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_update2(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST %% but this connector is point to a unreachable server "2603" {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -440,6 +427,34 @@ t_mqtt_conn_update2(_) -> {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). +t_mqtt_conn_update3(_) -> + %% we add a mqtt connector, using POST + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + %% ... and a MQTT bridge, using POST + %% we bind this bridge to the connector created just now + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), + + %% delete the connector should fail because it is in use by a bridge + {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + %% the connector now can be deleted without problems + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST