From 36640263ba16a2ac47a4d77c6a0b8697966b157d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 2 Nov 2023 18:28:22 +0100 Subject: [PATCH] fix(bridge_v2): start operation should return an error when unsuccessful The bridge V2 HTTP API start operation should return a 400 error if the start is unsuccessful. The bridge V1 HTTP API compatibility layer for Bridge V2 should return a 400 error if the start or restart operation is unsuccessful. This commit fixes the above and adds tests that checks this for the V2 HTTP API. Fixes: https://emqx.atlassian.net/browse/EMQX-11304 --- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 58 ++++++++++++++----- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 4 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 38 ++++++++++++ .../test/emqx_bridge_v2_api_SUITE.erl | 26 +++++++++ 5 files changed, 115 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index f2cc7163f..11f2949c3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -1079,7 +1079,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> {error, {unhealthy_target, Message}} -> ?BAD_REQUEST(Message); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - ?BAD_REQUEST(redact(Reason)) + ?BAD_REQUEST(redact(Reason)); + {error, Reason} -> + ?BAD_REQUEST(Reason) end. maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 0a080bc3e..3498b690b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -434,39 +434,66 @@ disable_enable(Action, BridgeType, BridgeName) when %% Manually start connector. This function can speed up reconnection when %% waiting for auto reconnection. The function forwards the start request to -%% its connector. +%% its connector. Returns ok if the status of the bridge is connected after +%% starting the connector. Returns {error, Reason} if the status of the bridge +%% is something else than connected after starting the connector or if an +%% error occurred when the connector was started. +-spec start(term(), term()) -> ok | {error, Reason :: term()}. start(BridgeV2Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun). + connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, true). -connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun) -> +connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> connector_operation_helper_with_conf( BridgeV2Type, + Name, lookup_conf(BridgeV2Type, Name), - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ). connector_operation_helper_with_conf( _BridgeV2Type, + _Name, {error, bridge_not_found} = Error, - _ConnectorOpFun + _ConnectorOpFun, + _DoHealthCheck ) -> Error; connector_operation_helper_with_conf( _BridgeV2Type, + _Name, #{enable := false}, - _ConnectorOpFun + _ConnectorOpFun, + _DoHealthCheck ) -> ok; connector_operation_helper_with_conf( BridgeV2Type, + Name, #{connector := ConnectorName}, - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ) -> ConnectorType = connector_type(BridgeV2Type), - ConnectorOpFun(ConnectorType, ConnectorName). + ConnectorOpFunResult = ConnectorOpFun(ConnectorType, ConnectorName), + case {DoHealthCheck, ConnectorOpFunResult} of + {false, _} -> + ConnectorOpFunResult; + {true, {error, Reason}} -> + {error, Reason}; + {true, ok} -> + case health_check(BridgeV2Type, Name) of + #{status := connected} -> + ok; + {error, Reason} -> + {error, Reason}; + NonConnectedStatus -> + {error, NonConnectedStatus} + end + end. reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). @@ -513,6 +540,9 @@ do_send_msg_with_enabled_config( BridgeV2Id = id(BridgeType, BridgeName), emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). +-spec health_check(BridgeType :: term(), BridgeName :: term()) -> + #{status := term(), error := term()} | {error, Reason :: term()}. + health_check(BridgeType, BridgeName) -> case lookup_conf(BridgeType, BridgeName) of #{ @@ -1365,28 +1395,30 @@ bridge_v1_restart(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:restart(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true). bridge_v1_stop(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:stop(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, false). bridge_v1_start(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true). -bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) -> +bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of true -> connector_operation_helper_with_conf( BridgeV2Type, + Name, lookup_conf(BridgeV2Type, Name), - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ); false -> {error, not_bridge_v1_compatible} diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index f495a770b..6f1ff272b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -609,7 +609,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> {error, {unhealthy_target, Message}} -> ?BAD_REQUEST(Message); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - ?BAD_REQUEST(redact(Reason)) + ?BAD_REQUEST(redact(Reason)); + {error, Reason} -> + ?BAD_REQUEST(Reason) end. do_bpapi_call(all, Call, Args) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 837425888..c6d3d0566 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -806,6 +806,44 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> ), ok. +t_start_operation_when_on_add_channel_gives_error(_Config) -> + Conf = bridge_config(), + BridgeName = my_test_bridge, + emqx_common_test_helpers:with_mock( + emqx_bridge_v2_test_connector, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"some_error">>} end, + fun() -> + %% We can crete the bridge event though on_add_channel returns error + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)), + ?assertMatch( + #{ + status := disconnected, + error := <<"some_error">> + }, + emqx_bridge_v2:health_check(bridge_type(), BridgeName) + ), + ?assertMatch( + {ok, #{ + status := disconnected, + error := <<"some_error">> + }}, + emqx_bridge_v2:lookup(bridge_type(), BridgeName) + ), + %% emqx_bridge_v2:start/2 should return ok if bridge if connected after + %% start and otherwise and error + ?assertMatch({error, _}, emqx_bridge_v2:start(bridge_type(), BridgeName)), + %% Let us change on_add_channel to be successful and try again + ok = meck:expect( + emqx_bridge_v2_test_connector, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, #{}} end + ), + ?assertMatch(ok, emqx_bridge_v2:start(bridge_type(), BridgeName)) + end + ), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 484a1a325..96eccf169 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -280,6 +280,9 @@ init_mocks() -> meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. clear_resources() -> @@ -504,6 +507,29 @@ do_start_bridge(TestType, Config) -> {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), + %% Make start bridge fail + ok = meck:expect( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end + ), + + ok = emqx_connector_resource:stop(?BRIDGE_TYPE, ?CONNECTOR_NAME), + ok = emqx_connector_resource:start(?BRIDGE_TYPE, ?CONNECTOR_NAME), + + {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config), + + %% Make start bridge succeed + + ok = meck:expect( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, connector_state} end + ), + + %% try to start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), {ok, 200, []} = request_json(get, uri([?ROOT]), Config),